精华内容
下载资源
问答
  • Spark读写csv,txt,json,xlsx,xml,avro文件

    千次阅读 2019-10-30 14:51:00
    Spark读取txt文件3. Spark读取json文件4. Spark读取excel文件5. Spark读取xml文件 Spark读取csv,txt,json,xlsx,xml文件 下文讲述spark从hdfs读取解析常见的几种文本文件的方式。 1. Spark读取csv文件 需引入的外部...

    Spark读取文本文件时,面对繁多的文件格式,是一件很让人头疼的事情,幸好databricks提供了丰富的api来进行解析,我们只需要引入相应的依赖包,使用Spark SqlContext来进行读取和解析,即可得到格式化好的数据。

    下面我们讲述spark从hdfs读写解析常见的几种文本文件的方式。

    1. Spark读写csv文件

    • 需引入的外部jar包
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>
    
    • 读取csv文件核心代码
    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)
    sqlContext.read
        .format("com.databricks.spark.csv")
        .option("delimiter", ",") // 字段分割符
        .option("header", "true") // 是否将第一行作为表头header
        .option("inferSchema", "false") //是否自动推段内容的类型
        .option("codec", "none") // 压缩类型
        .load(csvFile) // csv文件所在hdfs路径 + 文件名
    
    • DataFrame加载为csv文件核心代码
    df.write.format("com.databricks.spark.csv")
    	.option("header", "true")
    	.option("codec", "none")
    	.save(tempHdfsPath) //落地文件hdfs路径,需提前创建路径
    
    • spark-csv项目源码

    https://github.com/databricks/spark-csv

    2. Spark读写txt文件

    • hdfs目录下/home/test/testTxt.txt文件内容
    a,b,c,d
    123,345,789,5
    34,45,90,9878
    
    • 读取txt文件核心代码
    scala> sqlContext.read.text("/home/test/testTxt.txt").show
    +-------------+
    |        value|
    +-------------+
    |      a,b,c,d|
    |123,345,789,5|
    |34,45,90,9878|
    +-------------+
    

    txt文件为按整行读取,如果需要获得相关字段,则需要对DataFrame的列进行拆分。详情见下一篇博文《Spark DataFrame列拆分与合并》

    • DataFrame加载为txt文件核心代码
    //获取dateframe所有的字段名
    val columnArr = df.columns.map { colName =>  
    	df.col(colName)
    }
    
    df.select(concat_ws(",", columnArr: _*) //将各列数据使用分隔符连接
    	.cast(StringType))
    	.write.format("text")
    	.save(tempHdfsPath) //落地文件hdfs路径,需提前创建路径
    

    3. Spark读写json文件

    • hdfs目录下/home/test/testJson.json文件内容
    {"a":"1747","b":"id抽取_SDK_按小时","c":1,"d":"2018112713"}
    {"a":"456","b":"232","c":10,"d":"203227324"}
    
    • 读取json文件核心代码
    scala> sqlContext.read.format("json").load("/home/test/testJson.json").show
    +----+------------+---+----------+                                              
    |   a|           b|  c|         d|
    +----+------------+---+----------+
    |1747|id抽取_SDK_按小时|  1|2018112713|
    | 456|         232| 10| 203227324|
    +----+------------+---+----------+
    
    • DataFrame加载为json文件核心代码
    df.write.format("json")
    	.save(tempHdfsPath) //落地文件hdfs路径,需提前创建路径
    

    4. Spark读写excel文件

    • 需引入的外部jar包
    <dependency>
    	<groupId>com.crealytics</groupId>
    	<artifactId>spark-excel_2.11</artifactId>
    	<version>0.12.2</version>
    </dependency>
    
    • 读取xlsx|xls文件核心代码
    import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    val df = spark.read
        .format("com.crealytics.spark.excel")
        .option("useHeader", "true") // 是否将第一行作为表头
        .option("inferSchema", "false") // 是否推断schema
        .option("workbookPassword", "None") // excel文件的打开密码
        .load(excelFile) //excel文件路径 + 文件名
    
    • DataFrame加载为xlsx|xls文件核心代码
    df.write.format("com.crealytics.spark.excel")
    	.option("useHeader", "true")
    	.option("timestampFormat", "MM-dd-yyyy HH:mm:ss")
    	.option("inferSchema", "false")
    	.option("workbookPassword", "None") 
    	.save(tempHdfsPath) //落地文件hdfs路径,需提前创建路径
    
    • spark-excel项目源码

    https://github.com/crealytics/spark-excel

    5. Spark读写xml文件

    • test.xml文件
    <catalog>
    	<testXml id="tx101">
    		<a>Tove</a>
    		<b>Jani</b>
    		<c>Reminder</c>
    		<d>Don't forget me this weekend!</d>
    	</testXml>
    	<testXml id="tx102">
    		<a>ksdhf</a>
    		<b>Jasfdi</b>
    		<c>Re</c>
    		<d>Don't forget me</d>
    	</testXml>
    </catalog>
    
    • 需引入的外部jar包
    <dependency>
    	<groupId>com.databricks</groupId>
    	<artifactId>spark-xml_2.11</artifactId>
    	<version>0.6.0</version>
    </dependency>
    
    • 读取xml文件核心代码
    import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    val df = spark.read
        .format("com.databricks.spark.xml")
        .option("rowTag", "testXml") // xml文件rowTag,分行标识,"testXml"即为上文rowTag
        .load(xmlFile) //xml文件路径+文件名
    
    • DataFrame加载为xml文件核心代码
    df.write.format("com.databricks.spark.xml")
    	.option("rowTag", "testXml")
    	.option("rootTag", "catalog")
    	.save(tempHdfsPath) //落地文件hdfs路径,需提前创建路径
    
    • spark-xml项目源码

    https://github.com/databricks/spark-xml

    6. Spark读取avro文件

    • 需引入的外部jar包
    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-avro_2.11</artifactId>
    	<version>2.4.4</version>
    </dependency>
    
    • 读取avro文件核心代码
    import org.apache.spark.sql._
    
    val spark: SparkSession = ???
    spark.conf.set("spark.sql.avro.compression.codec", "deflate")   //设置avro文件压缩方式
    spark.conf.set("spark.sql.avro.deflate.level", "2")
    
    val df: DataFrame = spark.read
    	.format("avro")
    	.option("avroSchema", "/.../.../test.avsc")  //设置avsc格式的avro文件字段信息
    	.load(avroFilePath)  //指定avro文件路径
    
    展开全文
  • I am writing a spark/scala program to read in ZIP files, unzip them and write the contents to a set of new files. I can get this to work for writing to the local file system but wondered if there was ...

    I am writing a spark/scala program to read in ZIP files, unzip them and write the contents to a set of new files. I can get this to work for writing to the local file system but wondered if there was a way to to write the output files to a distributed file system such as HDFS. Code is shown below`

    import java.util.zip.ZipInputStream

    import org.apache.spark.input.PortableDataStream

    import java.io._

    var i =1

    sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {

    val zipStream = new ZipInputStream(file._2.open)

    val entry = zipStream.getNextEntry

    val iter = scala.io.Source.fromInputStream(zipStream).getLines

    val fname = f"/d/tmp/myfile$i.txt"

    i = i + 1

    val xx = iter.mkString

    val writer = new PrintWriter(new File(fname))

    writer.write(xx)

    writer.close()

    iter

    }).collect()

    `

    解决方案

    You can easy write data to HDFS using hadoop-common library (if you are using sbt as dependency manangement tool, add thath library to your dependency). With that you can create a FileSystem object :

    private val fs = {

    val conf = new Configuration()

    FileSystem.get(conf)

    }

    Be sure to configure the FileSystem with your hadoop cluster information (core-site.xml, etc)

    Then you can write, for example a String to path (in your case you should deal with streams), on HDFS as following:

    @throws[IOException]

    def writeAsString(hdfsPath: String, content: String) {

    val path: Path = new Path(hdfsPath)

    if (fs.exists(path)) {

    fs.delete(path, true)

    }

    val dataOutputStream: FSDataOutputStream = fs.create(path)

    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))

    bw.write(content)

    bw.close

    }

    展开全文
  • Spark 对很多种文件格式的读取和保存方式都很简单。从诸如文本文件的非结构化的文件, 到诸如 JSON 格式的半结构化的文件,再到诸如 ...一、读取1.1 读取txt文件只需要使用文件路径作为参数调用 SparkContext 中的 ...

    Spark 对很多种文件格式的读取和保存方式都很简单。从诸如文本文件的非结构化的文件, 到诸如 JSON 格式的半结构化的文件,再到诸如 SequenceFile 这样的结构化的文件, Spark 都可以支持(见下表) 。 Spark 会根据文件扩展名选择对应的处理方式。这一过程是封装 好的,对用户透明。

    29bd6aa94d981648ffb9cf1f96bf03b5.png

    一、读取

    1.1 读取txt文件

    只需要使用文件路径作为参数调用 SparkContext 中的 textFile() 函数,就可以读取一个文本文件,读取的一行作为一个元素。如果要控制分区数的话,可以指定 minPartitions 。

    read_txt 

    1.2 读取csv文件

    csv 文件的读取和 txt 相同。

    read_csv 

    1.3 读取json文件

    将数据作为文本文件读取, 然后对 JSON 数据进行解析, 这样的方法可以在所有支持的 编程语言中使用。 这种方法假设文件中的每一行都是一条 JSON 记录。 如果你有跨行的 JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。

    import 

    1.4 读取HDFS文件

    读取HDFS文件和前面的格式相同,只是路径有一些变化

    read_hdfs 

    二、保存

    2.1保存到本地txt

    保存到本地一般是比较小的数据,可以先 collect 再保存到本地,如果直接保存存储的会是各个分区的数据。

    # 直接保存,会保存成文件夹
    

    2.2 保存到本地json

    这个时候需要先对 json 进行 dumps 处理。

    save_json 
    展开全文
  • Spark读取压缩文件

    万次阅读 2018-06-05 22:40:02
    本文讲如何用spark读取gz类型的压缩文件,以及如何解决我遇到的各种问题。 1、文件压缩 下面这一部分摘自Spark快速大数据分析:   在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销...

    我的原创地址:https://dongkelun.com/2018/05/30/sparkGZ/

    前言

    本文讲如何用spark读取gz类型的压缩文件,以及如何解决我遇到的各种问题。

    1、文件压缩

    下面这一部分摘自Spark快速大数据分析:
      在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数Hadoop输出格式来说,我们可以指定一种压缩编解码器来压缩数据。
      选择一个输出压缩编解码器可能会对这些数据以后的用户产生巨大影响。对于像Spark 这样的分布式系统,我们通常会尝试从多个不同机器上一起读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式会使这变得不可能,而必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。可以很容易地从多个节点上并行读取的格式被称为“可分割”的格式。下表列出了可用的压缩选项。

    格式 可分割 平均压缩速度 文本文件压缩效率 Hadoop压缩编解码器 纯Java实现 原生 备注
    gzip org.apache.hadoop.io.compress.GzipCodec
    lzo 是(取决于所使用的库) 非常快 中等 com.hadoop.compression.lzo.LzoCodec 需要在每个节点上安装LZO
    bzip2 非常高 org.apache.hadoop.io.compress.Bzip2Codec 为可分割版本使用纯Java
    zlib 中等 org.apache.hadoop.io.compress.DefaultCodec Hadoop 的默认压缩编解码器
    Snappy 非常快 org.apache.hadoop.io.compress.SnappyCodec Snappy 有纯Java的移植版,但是在Spark/Hadoop中不能用

      尽管Spark 的textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable。因此,如果你要读取单个压缩过的输入,最好不要考虑使用Spark 的封装,而是使用newAPIHadoopFile 或者hadoopFile,并指定正确的压缩编解码器。

    关于上面一段话的个人测试:选取一个大文件txt,大小为1.5G,写spark程序读取hdfs上的该文件然后写入hive,经测试在多个分区的情况下,txt执行时间最短,因为在多个机器并行执行,而gz文件是不可分割的,即使指定分区数目,但依然是一个分区,一个task,即在一个机器上执行,bzip2格式的文件虽然是可分割的,即可以按照指定的分区分为不同的task在多个机器上执行,但是执行时间长,比gz时间还长,经过四次改变bzip2的分区,发现最快的时间和gz时间是一样的,如果指定一个分区的话,比gz要慢很多,我想这样就可以更好的理解:”尽管Spark 的textFile() 方法可以处理压缩过的输入,但即使输入数据被以可分割读取的方式压缩,Spark 也不会打开splittable”这句话了。


    后续测试:根据集群的cpu合理分配executor的个数的情况下,txt的时间缩短到1分钟,bzip2缩短到1.3分钟,而对gz重新分区(reparation)缩短到2分钟,可以看到在合理分配资源的情况下,bzip2比gz快不少,但依然赶不上txt,当然这也的结果可能受文件大小和集群资源的限制,所以根据自己的实际需求测试再决定用哪个即可。

    2、代码

    代码很简单,用textFile()即可,假设,我的数据名为data.txt.gz,我把它放在hdfs上的/tmp/dkl路径下那么代码为:

    val path = "hdfs://ambari.master.com:8020/tmp/dkl/data.txt.gz"
    val data = sc.textFile(path)

    注:把数据放在hdfs的命令为

    hadoop fs -put data.tar.gz /tml/dkl

    3、一些小问题

    3.1 数据

    首先造几个数据吧,先创建一个txt,名字为data.txt,内容如下

    1            张三            上海        2018-05-25
    2            张三            上海        2018-05-25
    3            张三            上海        2018-05-25
    4            张三            上海        2018-05-25
    5            张三            上海        2018-05-25

    3.2 如何压缩

    那么如如何打包为gz格式的压缩文件呢,分两种
    一、 在windows上打包,如果不想在Linux服务器上用命令打包,那么可以直接用windows上的软件打包(win上常见的zip,rar格式,spark是不支持的),我用7-zip软件压缩,大家可百度7-zip或直接在https://www.7-zip.org/下载安装,压缩格式选gzip即可。
    二、 在Linux上压缩,可通过下面的命令
    1、保留原文件

    gzip –c data.txt > data.txt.gz

    2、不保留原文件,默认生成的文件名为原文件名.gz,即data.txt.gz

    gzip data.txt

    压缩完了之后,跑一下程序测试一下

    data.take(3).foreach(println)
    1            张三            上海        2018-05-25
    2            张三            上海        2018-05-25
    3            张三            上海        2018-05-25

    根据结果看没问题。
    三、 说明
    在Linux上用tar命令压缩,spark虽然可以读,但是第一行会有文件信息

    tar -zcvf data.tar.gz data.txt

    3.3 文件编码问题

    别人给我的原文件是.rar,那我需要将其解压之后得到txt,然后按照上述方式压缩为.gz,然后上传到hdfs,进行代码测试,打印前几条发现乱码,查了一下发现原文件是gbk编码的,且sc.textFile()不能指定编码,只能读取utf8格式,其他格式就会乱码。

    注意:因为实际情况下解压后的txt文件很大,windows是直接打不开的,所以不能通过打开文件修改编码的方法去解决。

    3.3.1 构建测试gbk格式的文件

    1、windows上可以用记事本打开,另存为,编码选择ANSI即可

    2、Linux可以通过下面的命令修改

    iconv -f utf8 -t gbk data.txt > data_gbk.txt

    测试一下输出,发现确实乱码了(直接测试txt即可)

    1            ����            �Ϻ�        2018-05-25
    2            ����            �Ϻ�        2018-05-25
    3            ����            �Ϻ�        2018-05-25

    3.3.2 代码解决

    通过如下代码测试即可
    定义方法

    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext
    import org.apache.hadoop.io.LongWritable
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.hadoop.io.Text
    def transfer(sc: SparkContext, path: String): RDD[String] = {
      sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
        .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
    }

    测试方法

    transfer(sc, path3).take(3).foreach(println)

    参考:Spark Scala 读取GBK文件的方法

    3.3.3 Linux命令

    可直接通过Linux命令转换txt的编码格式,再压缩,这样代码就不用修改
    其实在3.2.1中已经涉及到了
    1、通过Linux自带的命令iconv
    iconv不能覆盖原来的文件,只能生成新的文件之后,再通过mv命令去覆盖

    iconv -f gbk -t utf8 data_gbk.txt > data_new.txt

    2、通过enca
    enca可以直接覆盖原来的文件,这样如果不想改变来的文件名,就少一步mv操作了,enca不是子系统自带的,需要自己下载安装,可在http://dl.cihar.com/enca/下载最新版本。

    #下载&解压
    wget http://dl.cihar.com/enca/enca-1.19.tar.gz
    tar -zxvf enca-1.19.tar.gz
    cd enca-1.19
    #编译安装
    ./configure
    make
    make install

    安装好了之后通过下面的命令转换即可

    enca -L zh_CN -x UTF-8 data_gbk.txt 

    转换编码格式之后,在通过程序测试即可。

    参考:linux 下的文件编码格式转换

    3.4 rdd换df

    由于文件过大,不能直接打开看也没用垃圾数据,造成格式问题,如果有垃圾数据,在rdd转df的过程中会产生异常,这里记录一下我碰见的问题。

    1、首先可以先打印出前几行数据查看一下该文件的大体格式

    2、碰到的一个一个异常
    代码用的旧版spark(1.6版本) 将rdd动态转为dataframe里面的方法。

    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true])....

    原因是因为文件里有一行数据为垃圾数据,这行数据的列数和列名的个数不一样导致的,可以在代码中过滤掉这样数据即可。

    .filter(_.length == colName.length)
    展开全文
  • Spark读取本地文件和HDFS文件

    千次阅读 2020-01-09 11:44:05
    旁边的实习生又一脸懵逼了:Spark有bug,明明我本地/data目录下有test.txt文件,但运行就报错: Caused by: java.io.FileNotFoundException: File file:/data/test.txt does not exist 我一看,原来小伙子使用spark...
  • Spark读取本地文件问题

    万次阅读 热门讨论 2019-01-14 18:51:00
    Spark 读取本地文件问题 网上给出的多是下面两种方案 解决方式1:让每个Worker节点的相应位置都有要读取的数据文件。 解决方式2:直接将数据文件上传到hdfs,达到数据共享。(强烈推荐,比格更高更专业) ...
  • txt 文件读写示例 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class test
  • 1、spark读取csv数据文件 https://www.cnblogs.com/gaopeng527/p/4961464.html val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "/tmp/model/nlp/lda/data_result_0605_new.txt", ...
  • Spark读取文件

    2018-02-28 11:54:00
    spark默认读取的是hdfs上的文件。 如果读取本地文件,则需要加file:///...读取hdfs文件, 可以这样指定路径 hdfs://ns1/tmp/test.txt。 如果不指定任何前缀,则使用hdfs的默认路径/user/data/ 启动spark-shell...
  • Spark读取文本文件并转换为DataFrame

    万次阅读 2018-08-08 23:09:43
    Spark ML里的核心API已经换成了DataFrame,为了使读取到的值成为DataFrame类型,我们可以直接使用读取CSV的方式来读取文本文件,可问题来了,当文本文件...本文将介绍spark读取多列txt文件后动态转成DataFrame的方法。
  • SPARK读取HDFS文件

    2021-03-25 16:20:30
    sc.textFile("hdfs://polo:9000/sparktest/data.txt");//polo是你的主机名
  • spark读取sequenceFile文件

    千次阅读 2019-06-05 18:18:38
    先用hive创建一张存储为sequenceFile文件的表 create table test(id string,time string) row format delimited fields terminated by '\t'; load data local inpath '/home/hadoop/data/order_seq.txt' into ...
  • 在scala编写spark程序使用了sc.textFile("file:///home/hadoop/2.txt"), 竟然报错java.io.FileNotFoundException: File file:/home/hadoop/2.txt does ...之后我将文件放在hdfs上面,就能读取的到,这是怎么回事
  • 对于spark的典型应用场景为批处理,一般由基本数据源(文件系统如:hdfs)或者高级数据源(flume、kafka)作为spark的数据接入端。输出一样可以是文件系统或数据库等等。本文介绍一个用java写的demo程序,功能是从...
  • 在一些情景下,需要用spark读取hadoop的文件的具体内容,这里做一下简单介绍。 pom.xml       pom.xml如下: <dependencies> <dependency> <groupId>org.apache...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,509
精华内容 4,603
关键字:

spark读取txt文件