  • Reading CSV files with SparkSQL

    1,000+ views · 2018-10-22 17:51:34

    1. Core code

       import org.apache.spark.sql.SparkSession

       val spark = SparkSession
                        .builder()
                        .master("local[2]")
                        .appName("app")
                        .getOrCreate()
        // read the CSV file
        val srcDF = spark
                        .read
                        .format("csv")
                        .option("header", "true")
                        .option("multiLine", true)
                        .load("file:///C:\\1.csv")

        srcDF.show()   // preview the loaded data

        spark.stop()

    2. Key parameters

           format specifies that the file should be read as CSV.

           header controls whether the first line is used as the header row, i.e. as the column names of the schema.

           multiLine: a cell can contain line breaks when its text is long; if this option is not set, processing the data may fail. Setting it to true merges a wrapped cell back into a single record. A small illustrative sketch follows.
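
    The sketch below is not from the original post; the file path and contents are assumptions made up for the example, and it assumes the spark session created above is still active (run it before spark.stop()). A quoted cell that contains a line break is split across records when multiLine is off, but is parsed back into a single record when multiLine is true.

        import java.nio.charset.StandardCharsets
        import java.nio.file.{Files, Paths}

        // write a tiny test file whose second column wraps onto two physical lines
        val csvText = "id,remark\n1,\"first line\nsecond line\"\n"
        Files.write(Paths.get("C:\\multiline_demo.csv"), csvText.getBytes(StandardCharsets.UTF_8))

        spark.read
             .format("csv")
             .option("header", "true")
             .option("multiLine", "true")   // merge the wrapped, quoted cell into one record
             .load("file:///C:\\multiline_demo.csv")
             .show(truncate = false)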

    3. Writing out a CSV file

          

    srcDF.write.csv("/data/csv")

       If a field contains line breaks, it is best to clean the field up first, for example with the translate function (a DataFrame version follows):

    translate(jsonData, '\r\n', '')
    // strip carriage returns and line feeds from the field
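
    A minimal DataFrame sketch of the same clean-up, assuming the srcDF read above has a string column named jsonData (the column name comes from the snippet above):

        import org.apache.spark.sql.functions.{col, translate}

        // remove \r and \n from the column so every record stays on one physical line
        val cleaned = srcDF.withColumn("jsonData", translate(col("jsonData"), "\r\n", ""))
        cleaned.write
               .option("header", "true")
               .csv("/data/csv")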

     

  • Reading CSV with SparkSQL and computing statistics

    1,000+ views · 2020-03-01 13:30:23

    An earlier post covered reading a JSON file into a DataSet with SparkSQL, but in real development CSV files, databases and other sources can also feed SparkSQL. Since CSV comes up a lot in day-to-day work, I'm sharing my study code here.

    1. About the CSV schema

    When SparkSQL reads a CSV file it can infer the column names and schema automatically, using the first line of the file as the header, or the schema can be specified by hand. Automatic inference requires setting some options on the read; the official documentation lists them (a short Scala sketch of a few of these options follows the list):

    You can set the following CSV-specific options to deal with CSV files:

    • sep (default ,): sets a single character as a separator for each field and value.
    • encoding (default UTF-8): decodes the CSV files by the given encoding type.
    • quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, you need to set not null but an empty string. This behaviour is different from com.databricks.spark.csv.
    • escape (default \): sets a single character used for escaping quotes inside an already quoted value.
    • charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise.
    • comment (default empty string): sets a single character used for skipping lines beginning with this character. By default, it is disabled.
    • header (default false): uses the first line as names of columns.
    • enforceSchema (default true): If it is set to true, the specified or inferred schema will be forcibly applied to datasource files, and headers in CSV files will be ignored. If the option is set to false, the schema will be validated against all headers in CSV files in the case when the header option is set to true. Field names in the schema and column names in CSV headers are checked by their positions taking into account spark.sql.caseSensitive. Though the default value is true, it is recommended to disable the enforceSchema option to avoid incorrect results.
    • inferSchema (default false): infers the input schema automatically from data. It requires one extra pass over the data.
    • samplingRatio (default is 1.0): defines fraction of rows used for schema inferring.
    • ignoreLeadingWhiteSpace (default false): a flag indicating whether or not leading whitespaces from values being read should be skipped.
    • ignoreTrailingWhiteSpace (default false): a flag indicating whether or not trailing whitespaces from values being read should be skipped.
    • nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type.
    • emptyValue (default empty string): sets the string representation of an empty value.
    • nanValue (default NaN): sets the string representation of a non-number value.
    • positiveInf (default Inf): sets the string representation of a positive infinity value.
    • negativeInf (default -Inf): sets the string representation of a negative infinity value.
    • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
    • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
    • maxColumns (default 20480): defines a hard limit of how many columns a record can have.
    • maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. By default, it is -1 meaning unlimited length
    • mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes.
      • PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. A record with less/more tokens than schema is not a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null to extra fields. When the record has more tokens than the length of the schema, it drops extra tokens.
      • DROPMALFORMED : ignores the whole corrupted records.
      • FAILFAST : throws an exception when it meets corrupted records.
    • columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
    • multiLine (default false): parse one record, which may span multiple lines.
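
    Here is a short Scala sketch exercising a few of the options above against the same userList.csv used below; the _corrupt_record column, the schema and the SparkSession named spark are assumptions added for illustration. With PERMISSIVE mode and a corrupt-record column declared in the schema, malformed rows are kept in that column instead of failing the read.

        import org.apache.spark.sql.types._

        val userSchema = StructType(Seq(
          StructField("id", IntegerType, nullable = true),
          StructField("name", StringType, nullable = true),
          StructField("age", IntegerType, nullable = true),
          StructField("_corrupt_record", StringType, nullable = true)
        ))

        val users = spark.read
          .schema(userSchema)
          .option("header", "true")
          .option("nullValue", "?")                            // treat "?" as null
          .option("mode", "PERMISSIVE")                        // the default: keep malformed rows
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .csv("/home/cry/myStudyData/userList.csv")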

    The list of options looks long, but most of them have sensible defaults; in practice you usually only need to set a handful, as shown below.

    Dataset<Row> ds=spark.read()
    	  // infer column types automatically
    	   .option("inferSchema", "true")
    	   // string that should be treated as a null value
    	   .option("nullValue", "?")
    	   // when true, the first line names the columns and is not included in the data
    	   .option("header", "true")
    	   .csv("/home/cry/myStudyData/userList.csv");

    If you would rather not rely on inference, you can also specify the schema manually:

    List<StructField> fs=new ArrayList<StructField>();
    StructField f1=DataTypes.createStructField("id", DataTypes.IntegerType, true);
    StructField f2=DataTypes.createStructField("name", DataTypes.StringType, true);
    StructField f3=DataTypes.createStructField("age", DataTypes.IntegerType, true);
            
    fs.add(f1);
    fs.add(f2);
    fs.add(f3);
    		
    StructType schema=DataTypes.createStructType(fs);
    		
         
    Dataset<Row> ds=spark.read().schema(schema).csv("/home/cry/myStudyData");

    2. Complete code

    package com.debug;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    public class ReadCsv {
    
    	public static void main(String[] args) {
    		SparkSession spark = SparkSession.builder().appName("读取csv做统计").master("local[*]").getOrCreate();
    	    List<StructField> fs=new ArrayList<StructField>();
            StructField f1=DataTypes.createStructField("id", DataTypes.IntegerType, true);
            StructField f2=DataTypes.createStructField("name", DataTypes.StringType, true);
            StructField f3=DataTypes.createStructField("age", DataTypes.IntegerType, true);
            
            fs.add(f1);
            fs.add(f2);
            fs.add(f3);
    		
            StructType schema=DataTypes.createStructType(fs);
    		
    		/*Dataset<Row> ds=spark.read()
    		  // infer column types automatically
    		  .option("inferSchema", "true")
    		  // string that should be treated as a null value
    		  .option("nullValue", "?")
    		  // when true, the first line names the columns and is not included in the data
    		  .option("header", "true")
    		  .csv("/home/cry/myStudyData/userList.csv");*/
         
            Dataset<Row> ds=spark.read().schema(schema).csv("/home/cry/myStudyData");
    		ds.createOrReplaceTempView("user");
    		Dataset<Row> res=spark.sql("select * from user where age>25");
    		res.show();
    		
    		spark.stop();
    	}
    
    }
    

    The contents of one of the CSV files are shown below (screenshot not reproduced here).

  • Reading a CSV-format file into a DataFrame (ReadCsvDataToDF)
    import org.apache.spark.sql.SparkSession
    /**
     * Read a CSV-format file.
     */
    object ReadCsvDataToDF {
      def main(args: Array[String]): Unit = {
        val session: SparkSession = SparkSession.builder()
          .master("local[1]")
          .appName("ReadCsvDataToDF")
          .getOrCreate()
        // use the first line as the header
        session.read.option("header", true)
          .csv("T:/code/spark_scala/data/spark/csvdata.csv")
          .show()

        session.stop()
      }
    }
    
    
  • Reading MySQL data with Spark and exporting CSV files for bulk import into Neo4j

    1. Introduction

     Prerequisites: Hadoop 2.7.2, Scala 2.11.11, MySQL and Spark 2.2.1 need to be installed in advance.

    2. Main logic

    1. The MySQL data looks like this

    MySQL table structure (screenshot not reproduced)

    Structure of the response field:

    {"bulletin":"","couponList":[],"createTime":"","deliveryFee":8,"deliveryMsg":"","deliveryTime":"0","deliveryType":0,"dpShopId":0,"itemList":[{"activityTag":"","activityType":0,"bigImageUrl":"","cActivityTag":"","cTag":"126421929","categoryName":"","categoryType":"0","createTime":"","currentPrice":1.0,"dpShopId":0,"iconUrl":"","littleImageUrl":"","mtWmPoiId":"","originPrice":1.0,"praiseNum":0,"sellStatus":0,"shopName":"**茶餐厅*","spuDesc":"","spuId":1997065665,"spuName":"","spuPromotionInfo":"","statusDesc":"","tag":"126421929","unit":""}],"minFee":20.0,"mtWmPoiId":"*","onlinePay":1,"shipping_time":"","shopName":"**茶餐厅*","shopPic":"","shopStatus":0}

    2. Spark reads the MySQL data and saves it as CSV

    
    import java.text.SimpleDateFormat
    import java.util.Date
    
    import com.alibaba.fastjson.{JSON, JSONArray}
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    import scala.collection.mutable.ListBuffer
    
    object SparkSqlShopItemToCsv {
    
      def main(args: Array[String]): Unit = {
        val session = SparkSession
          .builder()
          .appName("shop")
          .master("local[4]")
          .getOrCreate()
        import session.implicits._
        val recordRDD:DataFrame = session.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost/wm", "driver" -> "com.mysql.jdbc.Driver"
          , "dbtable" -> "record", "user" -> "root", "password" -> "123456")).load()
        recordRDD.createOrReplaceTempView("record")
    //query the record table
        val recordDf = session.sql("select a.response,a.request,a.create_time from record a where a.create_time >'2020-06-13 22:00:30'")
    
        val currentDate = getNowDate("yyyy-MM-dd HH:mm:ss")
        import scala.collection.JavaConversions._
    //process each row of the record data
        val ds = recordDf.map(one => {
      //get the value of the response field
          val response = one.getAs[String]("response")
          val recordJsonObject = JSON.parseObject(response)
          val mtWmPoiId = recordJsonObject.getString("mtWmPoiId")
          val shopName = recordJsonObject.getString("shopName")
      //the shop
          val shop = new Shop(mtWmPoiId, shopName, currentDate)
          val itemArray: JSONArray =recordJsonObject.getJSONArray("itemList")
          val itemObjectList: List[AnyRef] = itemArray.iterator().toList
    
          val itemListBuffer = ListBuffer[Item]()
          val relationListBuffer = ListBuffer[Relation]()
          for(x <-itemObjectList){
            val itemJsonObject = JSON.parseObject(x.toString)
            val spuId = itemJsonObject.getLong("spuId")
            val spuName = itemJsonObject.getString("spuName")
        //the item
            val item = new Item(spuId, spuName, currentDate)
            itemListBuffer.append(item)
        //the shop-item relationship
            val relation = new Relation(mtWmPoiId, shopName, spuId, spuName, currentDate)
            relationListBuffer.append(relation)
          }
          val itemList = itemListBuffer.toList
          val relationList = relationListBuffer.toList
          (shop, itemList, relationList)
        })
    
    //split out the shops, the items, and the shop-item relations
        val shopDs:Dataset[Shop] = ds.map(one => {
          one._1
        })
        val itemDs: Dataset[Item] = ds.flatMap(one => {
          one._2
        })
        val relationDs: Dataset[Relation] = ds.flatMap(one => {
          one._3
        })
    
    //register temp views; deduplication happens via DISTINCT in the SQL below
        shopDs.createOrReplaceTempView("shop")
        itemDs.createOrReplaceTempView("item")
        relationDs.createOrReplaceTempView("relation")
        
    //write to HDFS as CSV files
        val date = getNowDate("yyyyMMdd")
        session.sql("select DISTINCT a.wm_poi_id,a.shop_name,a.create_time from shop a ").coalesce(4).write.option("header", "true").mode("Append").csv("hdfs://ELK01:9000/tmp/csv/"+date+"/shop.csv")
        session.sql("select DISTINCT b.spu_id,b.spu_name,b.create_time from item b ").coalesce(4).write.option("header", "true").mode("Append").csv("hdfs://ELK01:9000/tmp/csv/"+date+"/item.csv")
        session.sql("select DISTINCT c.wm_poi_id,c.shop_name,c.spu_id,c.spu_name,c.create_time from relation c ").coalesce(4).write.option("header", "true").mode("Append").csv("hdfs://ELK01:9000/tmp/csv/"+date+"/relation.csv")
      }
    
      def getNowDate(dataFormat:String):String={
        var now:Date = new Date()
        var  dateFormat:SimpleDateFormat = new SimpleDateFormat(dataFormat)
        var time = dateFormat.format( now )
        time
      }
    
      case class Item(spu_id:Long, spu_name:String,create_time:String)
    
      case class Shop(wm_poi_id:String, shop_name:String,create_time:String)
    
      case class Relation(wm_poi_id:String, shop_name:String,spu_id:Long,spu_name:String,create_time:String)
    
    }
    

    3. Verification

    1. The data is present on HDFS (screenshot not reproduced)

    2. Data format

    shop data (screenshot)

    item data (screenshot)

    relation data (screenshot)

    3. What the CSV files are for

    The CSV files are bulk-imported into Neo4j to create Shop and Item labels and the relationships between them; a sketch of such an import follows.
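
    Note that .csv(path) above actually writes a directory of part files rather than a single file named shop.csv, so the part files have to be collected (or placed into Neo4j's import directory) before loading. The sketch below is not from the original post: it uses the neo4j-java-driver and Cypher LOAD CSV, and the bolt URI, credentials, file names and the SELLS relationship type are all assumptions.

        import org.neo4j.driver.{AuthTokens, GraphDatabase}

        object LoadCsvIntoNeo4j {
          def main(args: Array[String]): Unit = {
            val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
            val session = driver.session()
            try {
              // shop nodes
              session.run(
                """LOAD CSV WITH HEADERS FROM 'file:///shop.csv' AS row
                  |MERGE (:Shop {wm_poi_id: row.wm_poi_id, shop_name: row.shop_name})""".stripMargin)
              // item nodes
              session.run(
                """LOAD CSV WITH HEADERS FROM 'file:///item.csv' AS row
                  |MERGE (:Item {spu_id: row.spu_id, spu_name: row.spu_name})""".stripMargin)
              // shop -> item relationships
              session.run(
                """LOAD CSV WITH HEADERS FROM 'file:///relation.csv' AS row
                  |MATCH (s:Shop {wm_poi_id: row.wm_poi_id})
                  |MATCH (i:Item {spu_id: row.spu_id})
                  |MERGE (s)-[:SELLS]->(i)""".stripMargin)
            } finally {
              session.close()
              driver.close()
            }
          }
        }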

  • Reading a txt file @Test def count2(): Unit ={ val spark: SparkSession = SparkSession.builder().appName("test3").master("local[3]").getOrCreate() ... // a txt file can also be read as a csv file spark...
  • Option 1: before Spark 2.0, via an external package. Step 1: add the dependency <dependency> <groupId>.../groupId>...spark-csv_2.10</artifactId> <version>1.4.0</version> </depend...
  • NULL Blog post link: https://humingminghz.iteye.com/blog/2309413
  • Files are named ${current date}visit.txt, e.g. 20180707visit.txt. The task is to read these files with a spark-sql program and save them to Hive as an external table in parquet format, and ultimately to batch-load the csv files within a given date range via parameters...
  • Reading JSON-format data and files import spark.implicits._ // one of Spark's implicit conversions val spark = SparkSession .builder() .master("local") .appName("JsonFileTest") ... // read json file data ...
  • Writing multiple file formats with SparkSQL

    1,000+ views · 2018-10-24 21:11:27
    Read data out of a database and write it to local files or HDFS in four formats: text, json, csv and parquet. The csv format can be opened in Excel. Implementation: package cn.ysjh0014.SparkSql import java.util.Properties import ...
  • Reading and working with CSV files in Spark 1.6.x

    1,000+ views · 2018-02-11 16:03:19
    package demo.sparksql import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.{SparkConf, SparkContext} /** * Created by joy on 2017/8/25. */.....
  • The external data source interface SparkSQL exposes specifically for reading files on HDFS; spark-parquet, csv, json and so on all work this way. DefaultSource is the entry class used to establish the external data source connection; SparkSQL looks for this class name by default, so do not rename it. Almost all of the interfaces live in this...
  • "Generic" here means using the same API with different parameters to read and save data in different formats; SparkSQL reads and saves parquet by default. 1) Loading data: spark.read.load is the generic method for loading data scala> spark.read. csv format jdbc ...
  • Spark_SparkSQL

    2020-06-23 19:16:15
    Reading a CSV file // read the csv file data val spark = SparkSession.builder().appName("OnetonBaseMain").master("local[1]").getOrCreate() val path = "file:///F:\\oneton_test_ts_auto_audit_oneton_base.csv" val base ...
  • SparkSQL DataFrame Datasets

    2019-10-12 15:11:53
    Table of contents: Overview; Preparation; Examples and source analysis: 1 reading txt file data, 2 reading json, 3 reading CSV files, 4 reading via jdbc. Overview: 1) Spark SQL can load data from anywhere, e.g. mysql, hive, hdfs, hbase, and supports many formats such as json, ...
  • Reading HDFS data with SparkSQL

    2020-08-15 19:06:56
    Use the pyspark module to call sparksql, read data from an HDFS directory, and then run statistics. ... # format tells the program to read csv; load takes the hdfs address: hdfs followed by the hadoop name and then the file directory (this part is a bit confusing; if it errors, debug from the error message) data = sqlcon
  • sparksql_analyzing flight data_DataFrame... # set the file paths and read the csv files flightPerfFilePath = "/databricks-datasets/flights/departuredelays.csv" airportsFilePath = "/databricks-datasets/flights/airport-codes-n...
  • sparksql_exploring data distributions

    2020-01-12 16:06:27
    Read the csv file; create the dataframe schema; get the schema; group and aggregate with .groupby(...); run descriptive statistics on the numeric columns with .describe(): skewness & dispersion. References: ...
  • DataFrame and SparkSQL examples in Spark

    1,000+ views · 2015-07-12 18:55:21
    The project in this post does two things: first it reads a netcdf file and ... The .csv files used in the project are attached below: 1. NELE_POINT.csv 2. NODE.csv. Assuming you have already downloaded or configured scala, create a new scala project in IDEA and add spark
  • SparkSQL can read data from different sources such as jdbc, json, csv and parquet. For reads use sparkSession.read.<format>, for writes use SparkSession.write.<format>. First create a SparkSession: val spark = SparkSession....
  • Contents: requirement background and approach; Problem I: csv encoding issues... The requirement is simple: read the csv file with spark and join it against the site table in the data warehouse; pseudocode below: (the csv file is 2 MB, the site table in the warehouse has 10 billion rows) ... val frame = sparkSession.read.csv(localpath
  • Loading and saving data

    2021-05-26 10:12:06
    Loading and saving data ... SparkSQL reads and saves parquet by default. 1) Loading data: spark.read.load is the generic method for loading data scala> spark.read. csv table format text jdbc json textFile load option options
  • 83. Reading CSV or TSV data with SparkSQL in detail 84. Converting an RDD to a DataFrame with a custom schema, and running SQL directly on data files 85. Using SparkSQL's built-in functions and how to define, register and use UDFs 86. Basic SparkSQL usage in Spark 2.x (part 1) 87....
