kv结构数据 spark 操作_spark算子 转化为kv值 - CSDN
  • Spark SQL操作数据

    2018-12-15 15:57:55
    Spark SQL支持通过DataFrame接口操作的多种不同的数据源。DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构数据,Parquet文件,JSON文件,Hive表 ,以及通过JDBC连接外部数据源。 与Hive类似...

    Spark SQL支持通过DataFrame接口操作的多种不同的数据源。DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 ,以及通过JDBC连接外部数据源。

    转载请标明原文地址:原文链接

    与Hive类似的,Spark SQL也可以创建临时表和持久表(即管理表),使用registerTempTable命令创建临时表,使用saveAsTable命令将数据保存到值就表,该命令将创建一个“管理表”,这也就意味着 数据的位置由Metastore控制,当表删除的时候,管理表将表数据自动删除。

    也可以通过配置SaveMode指定如何处理现有数据,实现保存模式不使用任何锁定,而且不是原子操作,因此,在并发环境下操作不能保证数据的安全性,保存模式参数选项如下:

    |Scala/Java |python|含义
    |-
    |SaveMode.ErrorIfExists(default)|“error”|如果保存数据已经存在,抛出异常
    |SaveMode.Append|“append”|如果保存数据已经存在,追写DataFrame数据
    |SaveMode.Overwrite|“overwrite”|如果保存数据已经存在,重写DataFrame数据
    |SaveMode.Ignore|“ignore”|如果保存数据已经存在,忽略DataFrame数据

    文本数据

    Spark SQL可以加载普通的文本文件来创建DataFrame来进行操作。
    以下面数据为例:

    shinelon,19
    mike,20
    wangwu,25
    

    操作代码如下:

     val sqlContext=new sql.SQLContext(sc)
    
        import sqlContext.implicits._              //隐式转换,将一个RDD转换为DataFrame
    
        //使用前缀hdfs://来标识HDFS存储系统的文件
        val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt")
              .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
    
        //DataFrame注册临时表
        people.registerTempTable("person")
        //使用sql运行SQL表达式
        val result=sqlContext.sql("SELECT name,age from person WHERE age>=19")
    
        println(result.map(t=>"Name:"+t(0)).collect())
    

    其实在上面将RDD转换为DataFrame,有两种方法,通过反射推断RDD模式和以编程方式定义模式,上面使用了反射方式,另一种方式参考上一篇文章末尾Spark SQL与DataFrame详解以及使用

    Parquet格式文件

    使用parquet格式文件,高效,因为其列式存储避免读入不需要的数据,有极好的性能和GC。而且方便压缩和解压缩,有更好的缓存效果。

    操作代码如下:

    /**
        * 将普通文本文件转换为parquet数据源来创建临时表
        * @param sc
        */
      def parquetTable(sc:SparkContext): Unit ={
        val sqlContext=new SQLContext(sc)
        //隐式转换为一个DataFrame
        import sqlContext.implicits._
    
        val peopleDF=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt")
          .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
    
        peopleDF.saveAsParquetFile("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/SparkSql/people.parquet")
        //加载Parquet为DataFrame
        val parquetFile=sqlContext.parquetFile("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/SparkSql/people.parquet")
        //将DataFrame注册为临时表,提供SQL查询使用
        parquetFile.registerTempTable("parquetTable")
    
        val result=sqlContext.sql("select name from parquetTable")
    
        result.map(t=>"Name:"+t(0)).collect().foreach(println)
    
        /**
          * 运行结果如下:
          * Name:shinelon
            Name:mike
            Name:wangwu
          */
      }
    

    上面程序需要注意的是,parquet文件不能在本地读写,需要在集群上操作,在windows本地操作有可能会报错。

    分区表
    与Hive类似的,Spark SQL也可以进行分区,下面是一个简单的分区表的创建:

      /**
        *
      scala> df1.printSchema()
    root
     |-- single: integer (nullable = false)
     |-- double: integer (nullable = false)
        * @param sc
        */
      def testPartition(sc:SparkContext): Unit ={
        val sqlContext=new SQLContext(sc)
        //隐式转换
        import sqlContext.implicits._
    
        val df1=sc.makeRDD(1 to 5).map(i=>(i,i*2)).toDF("single","double")
        df1.saveAsParquetFile("data/test/key=1")
    
        df1.printSchema()
    }
    

    Json文件

    Spark SQL可以自动推断出一个JSON数据集的Schema并作为一个DataFrame加载。下面是一个简单的实例。

    json数据如下:

    {"name":"Mirckel"}
    {"name":"Andy","age":30}
    {"name":"Jsutin","age":13}
    
    def test01(sc:SparkContext): Unit ={
        val sqlContext=new sql.SQLContext(sc)
        val df=sqlContext.jsonFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql01.json")
    
        df.registerTempTable("people")
    
        println(df.show())      //打印表数据
    
        println(df.printSchema())  //以树的形式打印DataFrame的Schema
    
        println(df.select(df("name"),df("age")+1).show())
    
        println("===============================")
    
        val result=sqlContext.sql("select name from people")
    
        result.foreach(println)
      }
    

    Hive表

    Spark SQL也可以从Hive表中读写数据,通过创建HiveContext进行一系列的操作。操作Hive表就需要将HIve的相关依赖或者jar包导入项目中,如果创建的是maven工程或者scala工程还必须将hive-site.xml和core-site.xml以及hdfs-site.xml配置文件拷贝到资源文件目录下。

    测试的相关数据集格式如下:

    238val_238
    86val_86
    311val_311
    27val_27
    165val_165
    409val_409
    255val_255
    278val_278
    98val_98
    

    即Spark源码中examples\src\main\resources目录下的数据集kv1.txt。读者可以去github中的Spark源码中下载。

    操作代码如下:

    /**
        * Spark SQL集成Hive表
        * 在Spark-shell中可以运行
        * @param sc
        */
      def testHive(sc:SparkContext): Unit ={
        val hiveContext=new HiveContext(sc)
    
    
        //创建表
        hiveContext.sql("create table if not exists src (key int,value string)")
        //加载数据
        hiveContext.sql("load data local inpath 'file:///opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/data/kv1.txt' into table src")
        //查询
        hiveContext.sql("from src select key,value").collect().foreach(println)
    
        /**
          * 运行部分结果如下:
          *
        [238,val_238]
        [86,val_86]
        [311,val_311]
        [27,val_27]
        [165,val_165]
        [409,val_409]
        [255,val_255]
          */
    
        sc.stop()
      }
    

    JDBC操作数据库

    除了上面介绍的一系列数据源之外,Spark SQL还支持使用JDBC操作关系型数据库,从关系型数据库中读写数据。这里连接mysql数据库。

    在mysql中,表中的数据如下所示:
    这里写图片描述

    操作代码如下:

      def testJDBC(sc:SparkContext): Unit ={
        val sqlContext=new SQLContext(sc)
    
        val jdbcDF=sqlContext.load("jdbc",Map(
          "url"->"jdbc:mysql://127.0.0.1:3306/library?user=root&password=123456",
          "dbtable"->"book",
          "driver"->"com.mysql.jdbc.Driver"
    //      "user"->"root",
    //      "password"->"123456"
        ))
    
        jdbcDF.show()
      }
    

    代码运行结果如下:
    这里写图片描述

    至此,本篇文章介绍完了Spark SQL如何对多数据源进行操作,完整的代码下载地址为:Spark SQL操作多数据源完整代码下载地址


    如果你想和我们一起学习交流,共同进步,欢迎加群:
    在这里插入图片描述

    展开全文
  • 本优化是生产环境下用Spark处理百亿规模数据的一些优化实战,并成功...本节主要从内存调优、高性能算子、数据结构优化、广播大变量和小表调优、动态并行度调优、Spark文件切分策略调优来介绍Spark处理大规模数据的一...

    本优化是生产环境下用Spark处理百亿规模数据的一些优化实战,并成功将程序的速度提升一倍(涉及到敏感信息本文在2018-07-04号将其删除,阅读上可能显得不完整)下面介绍一些基本的优化手段

    本文于2017-07-16号书写

    Spark任务优化

    本节主要从内存调优、高性能算子、数据结构优化、广播大变量和小表调优、动态并行度调优、Spark文件切分策略调优来介绍Spark处理大规模数据的一些优化实践。

    1 内存调优

    由于任务数据量大且会发生数据膨胀,如果内存参数设置不合理,任务容易出现OOM,分析Spark1.6.2内存管理模型如下图所示,知道Spark如何管理自己的内存我们才能进行更好的调优。

    内存调优详见:Spark统一内存管理:UnifiedMemoryManager

    任务任务内存参数配置:

    spark.driver.memory=6g(存在广播所以Driver设置的较大)
    spark.executor.memory=13G
    spark.memory.fraction=0.4

    内存参数配置计算公式:
    Execution Memory 2.5G =(Heap size(13G)- Reserve Memory(450M))* spark.memory.fraction 0.4 * spark.memory.storageFraction 0.5
    用户主导的空间:User Memory 7.5G =(Heap size(13G)- Reserve Memory(450M))* (1 - spark.memory.fraction 0.4)
    安全因子:0.9,考虑到内存空间使用和预估的准确度,实际应用过程中会考虑加入一个安全因子。
    可用用户主导空间:User Memory * 0.9 = 6.8G(根据实际情况,数据条数 * 每条数据任务后占用内存最大值,基于此评估一个最大值,如果超过这个值就会出现OOM)
    效果:解决程序OOM问题,因为任务过程维护了大的数据结构,其主要使用了User Memory的空间,用Spark默认内存配置会导致用户空间OOM。

    2 高性能算子

    任务是同一个用户的行为数据,分布式处理需要把一个用户的数据抓取到一个节点上处理,有Shuffle操作,如下图所示同源数据采用groupByKey时Shuffle Write数据量3.5T,aggregateByKey时Shuffle Write数据量3T,相比节省时间2~3min。
    分析数据分布的特征,同一个设备的数据一般在一个文件出现的概率较大,将groupByKey算子改成 aggregateByKey,首先进行了一个Map端的聚合,减少了网络传输的数据量。

    模拟代码:
    val initialSeq = mutable.Seq.empty[Row]
    val addToSeq = (s: mutable.Seq[Row], v: Row) => s :+ v // Map端本地聚合
    val mergePartitionSeqs = (p1: mutable.Seq[Row], p2: mutable.Seq[Row]) => p1 ++ p2 // Reduce端聚合
    kv.aggregateByKey(initialSeq)(addToSeq, mergePartitionSeqs)

    效果:减少网络传输数据量,时效性提升了2~5min,降低网络异常导致任务失败的风险。

    3 数据结构优化

    任务代码中,采用更加节省内存的数据结构,例如聚合的key、最短路径中的索引(如下模拟代码所示)等多处采用字符串拼接实现,避免自定义对象封装数据,尽可能使用轻量的Array而不是HashMap等
    效果:节省内存。

    4 广播大变量和小表调优

    任务任务为什么需要广播?我们先看一下广播的原理,如下图Executor端用到了Driver的List,如果广播List则每个Executor中只有一份Driver端的变量副本。如果不广播List,Executor有多少task就有多少Driver端的变量副本。如果对小表广播能实现本地Join,避免sortMergeJoin(如果使用SparkSQL发现不广播可以加上这个参数:spark.sql.statistics.fallBackToHdfs=true)。

    任务过程需要关联一些小的维表或定义一些大的变量,并存在大量task,所以需要广播。
    效果(1)降低网络传输的数据量;(2)降低内存的使用;(3)加快程序的运行速度。
    通过如上四种Spark任务优化,使任务运行更加稳定,同时也节省了内存。

    5 动态并行度调优

    数据量节假日数据量明显增大,是正常值的1~2倍,为了保障数据稳定生产,任务链条包括“数据清洗任务”和“任务任务”,“数据清洗任务”主要是从百亿条数据清洗出任务需要的50亿~100亿数据,并且把上游上万个大小不同的文件合并成固定个数和大小,这个任务产出的文件个数和大小对“任务任务”是有影响的,如何确定这个文件大小和个数?
    在数据量相同且资源配置相同条件下,要保证任务在1h内完成,测试单Task处理不同文件大小或不同数据条数的执行情况:
    (1)处理文件大小200M~256M左右,每个Task处理条数大概60万左右,Task平均执行时长8~10min;如图:每个Task处理文件大小为247M,其中75%的Task执行时长为10min,而且max值为17min,就算max失败,Task失败重新执行也不会影响到任务整体结束时间。


    (2)处理文件大小256~285M左右,每个Task处理条数大概70万左右,Task平均执行时长12min左右;如图:每个Task处理文件大小为271M,其中75%的Task执行时长为12min,而且max值20min,Task失败重新执行对任务整体时效性有影响。

    (3)处理文件大小300M~350M时,每个Task处理数据条数80万左右,Task平均执行时长17min左右。如图:每个Task处理文件大小313M,其中75%的Task执行时长为17min,而且max值太大,这样就拖慢了整个运行过程。

    通过大量测试发现,Task平均执行时长8~10min,每个Task处理条数大概60万左右时任务任务遇到失败Task、慢节点、某台机器故障等在开启慢任务推测情况下表现较优。

    慢任务推测参数配置:
    spark.speculation=true
    spark.speculation.interval=60s
    spark.speculation.multiplier=1.3
    spark.speculation.quantile=0.99

    基于如上测试,对“数据清洗任务”产出文件数量进行动态调整,让文件大小尽量在200M~256M左右,文件数量和大小,可以根据历史数据条数、文件个数、每个文件大小,考虑节假日等情况去预估,当然也可以采用机器学习等算法去预测。比如简单计算:本周日文件个数 = 上周日数据总条数/60万 ,因为数据清洗任务后数据量大概是50亿~100亿条,所以文件个数阈值为[7000,16000]
    依据每个Core处理2~3个Task,每个Task处理60万条数据文件大小为200~256M表现较好,开启了Executor动态资源分配功能如下:

    spark.dynamicAllocation.minExecutors=1000
    spark.dynamicAllocation.maxExecutors=1600

    参数:spark.default.parallelism是控制Shuffle并行度的,从而会影响Spark Task个数,间接影响文件产出个数。
    “数据清洗任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值保证产出文件个数和大小。
    “核心任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值,进一步保证任务过程每个Task处理的文件维持在256M左右,数据条数维持在60万左右。
    效果:避免了节假日任务执行超时/任务失败,保证生产耗时的相对平稳。
    通过如上参数调整,提高并缩短了生产耗时稳定性,主要是扩大Executor个数(资源才是王道),生产耗时缩短到了31min左右,如下图所示

    6 Spark文件切分策略调优

    ORC文件切分详见:spark 读取ORC文件时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案

    任务任务上游的“数据清洗任务”,会清洗出任务需要的有效数据,并且对上游上万个小文件进行合并压缩成ORC文件,其文件大小在256M左右。

    1)任务任务遇到问题:作业提交后ApplicationMaster(Driver)启动了,Spark任务长时间占用资源,SparkUI看不到DAG图、Stage、Partition和Task相关的信息。
    2)问题分析:Driver启动,但是Executor没干活,说明问题出在了Driver,Driver干什么呢?定位到Driver在计算Partition,发生了Full GC,于是问题定位到了Spark读取文件的方法OrcInputFormat.java。
    3)通俗描述:老大(Driver)管理小弟(worker)干活,本来是老大把活分给小弟就可以了,但是老大一直在了解小弟的情况,自己很忙小弟很闲。
    4)问题跟踪:查看OrcInputFormat.java发现Spark读取ORC文件有三种策略,默认采用HYBRID策略(HiveConf.java有相关配置信息):Spark Driver启动的时候,会去nameNode读取元数据,根据文件总大小和文件个数计算一个文件的平均大小,如果这个平均值大于默认256M的时候就会触发ETL策略。ETL策略就会去DataNode上读取orc文件的head等信息,如果stripe个数多或元数据信息太大就会导致Driver 产生FUll GC,这个时候就会表现为Driver启动到Task执行间隔时间太久的现象。
    5)解决方案:控制文件大小为256M左右,改变文件切分策略为BI,控制stripe大小。

    // 创建一个支持Hive的SparkSession
    val sparkSession = SparkSession
    .builder()
    .appName("PvMvToBase")
    // 默认64M,即代表在压缩前数据量累计到64M就会产生一个stripe。与之对应的hive.exec.orc.default.row.index.stride=10000可以控制有多少行是产生一个stripe。
    // 调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
    .config("hive.exec.orc.default.stripe.size", 268435456L)
    // 总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."),
    // 如果不配置,当orc文件大小大于spark框架估算的平均值256M时,会触发ETL策略,导致Driver读取DataNode数据切分split花费大量的时间。
    .config("hive.exec.orc.split.strategy", "BI")
    .enableHiveSupport()
    .getOrCreate()

    调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
    问题根源Driver压力太大,Worker启动了也只能闲等Driver忙完了,进行分配调度。
    效果:提升了任务稳定性。

    7 使用Kryo序列器(真正上线未用,发现有时候表现好有时候表现不好,存在不稳定性)

    使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    算法优化

    优化方案
    核心理念:归因过程数据膨胀,如何以节省内存的方式去归因,是算法优化的关键。
    核心思想:数学分而治之思想+索引技术灵活运用。
    算法描述:不变的字段单独维护且只维护一份,供查询使用(如Array1),任务新增的字段单独封装维护在(Array2),轻量级的Array2参与任务的计算过程,任务完成,通过Array1和Array2之间的索引把数据打通落盘。

    展开全文
  • 本节主要从内存调优、高性能算子、数据结构优化、广播大变量和小表调优、动态并行度调优、Spark文件切分策略调优来介绍Spark处理大规模数据的一些优化实践。 1 内存调优 由于任务数据量大且会发生数据膨胀,如果...

    Spark任务优化

    本节主要从内存调优、高性能算子、数据结构优化、广播大变量和小表调优、动态并行度调优、Spark文件切分策略调优来介绍Spark处理大规模数据的一些优化实践。

    1 内存调优

    由于任务数据量大且会发生数据膨胀,如果内存参数设置不合理,任务容易出现OOM,分析Spark1.6.2内存管理模型如下图所示,知道Spark如何管理自己的内存我们才能进行更好的调优。

    内存调优详见:Spark统一内存管理:UnifiedMemoryManager

    任务任务内存参数配置:

    
     
    1. spark.driver.memory=6g(存在广播所以Driver设置的较大)

    2. spark.executor.memory=13G

    3. spark.memory.fraction=0.4

    内存参数配置计算公式:
    Execution Memory 2.5G =(Heap size(13G)- Reserve Memory(450M))* spark.memory.fraction 0.4 * spark.memory.storageFraction 0.5
    用户主导的空间:User Memory 7.5G =(Heap size(13G)- Reserve Memory(450M))* (1 - spark.memory.fraction 0.4)
    安全因子:0.9,考虑到内存空间使用和预估的准确度,实际应用过程中会考虑加入一个安全因子。
    可用用户主导空间:User Memory * 0.9 = 6.8G(根据实际情况,数据条数 * 每条数据任务后占用内存最大值,基于此评估一个最大值,如果超过这个值就会出现OOM)
    效果:解决程序OOM问题,因为任务过程维护了大的数据结构,其主要使用了User Memory的空间,用Spark默认内存配置会导致用户空间OOM。

    2 高性能算子

    任务是同一个用户的行为数据,分布式处理需要把一个用户的数据抓取到一个节点上处理,有Shuffle操作,如下图所示同源数据采用groupByKey时Shuffle Write数据量3.5T,aggregateByKey时Shuffle Write数据量3T,相比节省时间2~3min。
    分析数据分布的特征,同一个设备的数据一般在一个文件出现的概率较大,将groupByKey算子改成 aggregateByKey,首先进行了一个Map端的聚合,减少了网络传输的数据量。

    
     
    1. 模拟代码:

    2. val initialSeq = mutable.Seq.empty[Row]

    3. val addToSeq = (s: mutable.Seq[Row], v: Row) => s :+ v // Map端本地聚合

    4. val mergePartitionSeqs = (p1: mutable.Seq[Row], p2: mutable.Seq[Row]) => p1 ++ p2 // Reduce端聚合

    5. kv.aggregateByKey(initialSeq)(addToSeq, mergePartitionSeqs)

    效果:减少网络传输数据量,时效性提升了2~5min,降低网络异常导致任务失败的风险。

    3 数据结构优化

    任务代码中,采用更加节省内存的数据结构,例如聚合的key、最短路径中的索引(如下模拟代码所示)等多处采用字符串拼接实现,避免自定义对象封装数据,尽可能使用轻量的Array而不是HashMap等
    效果:节省内存。

    4 广播大变量和小表调优

    任务任务为什么需要广播?我们先看一下广播的原理,如下图Executor端用到了Driver的List,如果广播List则每个Executor中只有一份Driver端的变量副本。如果不广播List,Executor有多少task就有多少Driver端的变量副本。如果对小表广播能实现本地Join,避免sortMergeJoin(如果使用SparkSQL发现不广播可以加上这个参数:spark.sql.statistics.fallBackToHdfs=true)。

    任务过程需要关联一些小的维表或定义一些大的变量,并存在大量task,所以需要广播。
    效果(1)降低网络传输的数据量;(2)降低内存的使用;(3)加快程序的运行速度。
    通过如上四种Spark任务优化,使任务运行更加稳定,同时也节省了内存。

    5 动态并行度调优

    数据量节假日数据量明显增大,是正常值的1~2倍,为了保障数据稳定生产,任务链条包括“数据清洗任务”和“任务任务”,“数据清洗任务”主要是从百亿条数据清洗出任务需要的50亿~100亿数据,并且把上游上万个大小不同的文件合并成固定个数和大小,这个任务产出的文件个数和大小对“任务任务”是有影响的,如何确定这个文件大小和个数?
    在数据量相同且资源配置相同条件下,要保证任务在1h内完成,测试单Task处理不同文件大小或不同数据条数的执行情况:
    (1)处理文件大小200M~256M左右,每个Task处理条数大概60万左右,Task平均执行时长8~10min;如图:每个Task处理文件大小为247M,其中75%的Task执行时长为10min,而且max值为17min,就算max失败,Task失败重新执行也不会影响到任务整体结束时间。


    (2)处理文件大小256~285M左右,每个Task处理条数大概70万左右,Task平均执行时长12min左右;如图:每个Task处理文件大小为271M,其中75%的Task执行时长为12min,而且max值20min,Task失败重新执行对任务整体时效性有影响。

    (3)处理文件大小300M~350M时,每个Task处理数据条数80万左右,Task平均执行时长17min左右。如图:每个Task处理文件大小313M,其中75%的Task执行时长为17min,而且max值太大,这样就拖慢了整个运行过程。

    通过大量测试发现,Task平均执行时长8~10min,每个Task处理条数大概60万左右时任务任务遇到失败Task、慢节点、某台机器故障等在开启慢任务推测情况下表现较优。

    
     
    1. 慢任务推测参数配置:

    2. spark.speculation=true

    3. spark.speculation.interval=60s

    4. spark.speculation.multiplier=1.3

    5. spark.speculation.quantile=0.99

    基于如上测试,对“数据清洗任务”产出文件数量进行动态调整,让文件大小尽量在200M~256M左右,文件数量和大小,可以根据历史数据条数、文件个数、每个文件大小,考虑节假日等情况去预估,当然也可以采用机器学习等算法去预测。比如简单计算:本周日文件个数 = 上周日数据总条数/60万 ,因为数据清洗任务后数据量大概是50亿~100亿条,所以文件个数阈值为[7000,16000]
    依据每个Core处理2~3个Task,每个Task处理60万条数据文件大小为200~256M表现较好,开启了Executor动态资源分配功能如下:

    
     
    1. spark.dynamicAllocation.minExecutors=1000

    2. spark.dynamicAllocation.maxExecutors=1600

    参数:spark.default.parallelism是控制Shuffle并行度的,从而会影响Spark Task个数,间接影响文件产出个数。
    “数据清洗任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值保证产出文件个数和大小。
    “核心任务”是一个离线按天执行的任务,通过动态调整spark.default.parallelism的值,进一步保证任务过程每个Task处理的文件维持在256M左右,数据条数维持在60万左右。
    效果:避免了节假日任务执行超时/任务失败,保证生产耗时的相对平稳。
    通过如上参数调整,提高并缩短了生产耗时稳定性,主要是扩大Executor个数(资源才是王道),生产耗时缩短到了31min左右,如下图所示

    6 Spark文件切分策略调优

    ORC文件切分详见:spark 读取ORC文件时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案

    任务任务上游的“数据清洗任务”,会清洗出任务需要的有效数据,并且对上游上万个小文件进行合并压缩成ORC文件,其文件大小在256M左右。

    1)任务任务遇到问题:作业提交后ApplicationMaster(Driver)启动了,Spark任务长时间占用资源,SparkUI看不到DAG图、Stage、Partition和Task相关的信息。
    2)问题分析:Driver启动,但是Executor没干活,说明问题出在了Driver,Driver干什么呢?定位到Driver在计算Partition,发生了Full GC,于是问题定位到了Spark读取文件的方法OrcInputFormat.java。
    3)通俗描述:老大(Driver)管理小弟(worker)干活,本来是老大把活分给小弟就可以了,但是老大一直在了解小弟的情况,自己很忙小弟很闲。
    4)问题跟踪:查看OrcInputFormat.java发现Spark读取ORC文件有三种策略,默认采用HYBRID策略(HiveConf.java有相关配置信息):Spark Driver启动的时候,会去nameNode读取元数据,根据文件总大小和文件个数计算一个文件的平均大小,如果这个平均值大于默认256M的时候就会触发ETL策略。ETL策略就会去DataNode上读取orc文件的head等信息,如果stripe个数多或元数据信息太大就会导致Driver 产生FUll GC,这个时候就会表现为Driver启动到Task执行间隔时间太久的现象。
    5)解决方案:控制文件大小为256M左右,改变文件切分策略为BI,控制stripe大小。

    
     
    1. // 创建一个支持Hive的SparkSession

    2. val sparkSession = SparkSession

    3. .builder()

    4. .appName("PvMvToBase")

    5. // 默认64M,即代表在压缩前数据量累计到64M就会产生一个stripe。与之对应的hive.exec.orc.default.row.index.stride=10000可以控制有多少行是产生一个stripe。

    6. // 调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。

    7. .config("hive.exec.orc.default.stripe.size", 268435456L)

    8. // 总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."),

    9. // 如果不配置,当orc文件大小大于spark框架估算的平均值256M时,会触发ETL策略,导致Driver读取DataNode数据切分split花费大量的时间。

    10. .config("hive.exec.orc.split.strategy", "BI")

    11. .enableHiveSupport()

    12. .getOrCreate()

    调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
    问题根源Driver压力太大,Worker启动了也只能闲等Driver忙完了,进行分配调度。
    效果:提升了任务稳定性。

    7 使用Kryo序列器(真正上线未用,发现有时候表现好有时候表现不好,存在不稳定性)

    使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    
     
    1. // 创建SparkConf对象。

    2. val conf = new SparkConf().setMaster(...).setAppName(...)

    3. // 设置序列化器为KryoSerializer。

    4. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    5. // 注册要序列化的自定义类型。

    6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    算法优化

    优化方案
    核心理念:归因过程数据膨胀,如何以节省内存的方式去归因,是算法优化的关键。
    核心思想:数学分而治之思想+索引技术灵活运用。
    算法描述:不变的字段单独维护且只维护一份,供查询使用(如Array1),任务新增的字段单独封装维护在(Array2),轻量级的Array2参与任务的计算过程,任务完成,通过Array1和Array2之间的索引把数据打通落盘。

    展开全文
  • spark数据清洗案例

    2020-06-29 15:00:36
    spark数据清洗案例 本文的目的是模拟公司的实际数据清洗 数据来源是我自己用java代码模拟的,模拟代码及数据文件会上传至scdn,可以自由下载 本文的数据格式如下(en事件名称有四种类型) 将本地文件上传至...

    spark数据清洗案例

    本文的目的是模拟公司的实际数据清洗
    数据来源是我自己用java代码模拟的,模拟代码及数据文件会上传至csdn,可以自由下载
    本文的数据格式如下(en事件名称有七种类型)

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    将本地文件上传至sparkRDD

    scala> val optionRDD=sc.textFile("/opt/tmp/logs/op.log")
    optionRDD: org.apache.spark.rdd.RDD[String] = /opt/tmp/logs/op.log MapPartitionsRDD[1] at textFile at <console>:24
    

    计数

    scala> optionRDD.count
    res0: Long = 1000
    

    创建头

    scala> val schemaString = "time content"
    schemaString: String = time content
    
    //切割成两个字段
    scala> val fields = schemaString.split(" ")
    fields: Array[String] = Array(time, content)
    
    scala> fields.foreach(println)
    time
    content
    
    

    导包

    scala> import org.apache.spark.sql.types._
    

    将time content 通过map改变结构变成数组

    scala>  val fields = schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))
    fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(time,StringType,true), StructField(content,StringType,true))
    
    
    scala> fields.foreach(println)
    StructField(time,StringType,true)
    StructField(content,StringType,true)`
    

    将time content结构化

    scala> val schema = StructType(fields)
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(time,StringType,true), StructField(content,StringType,true))
    

    分析从本地上传的文件

    scala> val rowRDD = optionRDD.map(_.split("\\|"))
    rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:31
    
    scala> rowRDD.foreach(x=>println(x(0)))
    

    导包

    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._
    

    将文件内容分成两部分,以数组的形式

    scala> val rowRDD = optionRDD.map(_.split("\\|")).map(attributes=>Row(attributes(0).trim(),attributes(1).trim()))
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[5] at map at <console>:34
    

    创建DF

    scala> val opDF = spark.createDataFrame(rowRDD,schema)
    opDF: org.apache.spark.sql.DataFrame = [time: string, content: string]
    
    scala> opDF.printSchema
    root
     |-- time: string (nullable = true)
     |-- content: string (nullable = true)
    
    

    scala> opDF.show(1,false)

    在这里插入图片描述

    获取第一层content

    scala> val opDF1 = opDF.select($"time",get_json_object($"content","$.cm").alias("cm"),get_json_object($"content","$.ap").alias("ap"),get_json_object($"content","$.et").alias("et"))
    opDF1: org.apache.spark.sql.DataFrame = [time: string, cm: string ... 2 more fields]
    
    scala> opDF1.printSchema
    root
     |-- time: string (nullable = true)
     |-- cm: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- et: string (nullable = true)
    
    scala> opDF1.show(1,false)
    |time|cm|ap |et 
    |1593136280643|{"ln":"-87.7","sv":"V2.1.8","os":"8.0.3","g":"W51WU5I5@gmail.com","mid":"0","nw":"3G","l":"pt","vc":"16","hw":"640*1136","ar":"MX","uid":"0","t":"1593072938707","la":"29.6","md":"Huawei-8","vn":"1.2.3","ba":"Huawei","sr":"I"}|app|[{"ett":"1593051532834","en":"display","kv":{"goodsid":"0","action":"1","extend1":"1","place":"1","category":"71"}}]
    

    获取第二层

    scala> val opDF2 = opDF1.select($"time",$"ap",get_json_object($"cm","$.ln").alias("ln"),get_json_object($"cm","$.sv").alias("sv"),get_json_object($"cm","$.os").alias("os"),get_json_object($"cm","$.g").alias("g"),get_json_object($"cm","$.mid").alias("mid"),get_json_object($"cm","$.nw").alias("nw"),get_json_object($"cm","$.l").alias("l"),get_json_object($"cm","$.vc").alias("vc"),get_json_object($"cm","$.hw").alias("hw"),get_json_object($"cm","$.ar").alias("ar"),get_json_object($"cm","$.uid").alias("uid"),get_json_object($"cm","$.t").alias("t"),get_json_object($"cm","$.la").alias("la"),get_json_object($"cm","$.md").alias("md"),get_json_object($"cm","$.vn").alias("vn"),get_json_object($"cm","$.ba").alias("ba"),get_json_object($"cm","$.sr").alias("sr"),from_json($"et",ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))).as("events"))
    opDF2: org.apache.spark.sql.DataFrame = [time: string, ap: string ... 18 more fields]
    
    scala> opDF2.printSchema
    root
     |-- time: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- ln: string (nullable = true)
     |-- sv: string (nullable = true)
     |-- os: string (nullable = true)
     |-- g: string (nullable = true)
     |-- mid: string (nullable = true)
     |-- nw: string (nullable = true)
     |-- l: string (nullable = true)
     |-- vc: string (nullable = true)
     |-- hw: string (nullable = true)
     |-- ar: string (nullable = true)
     |-- uid: string (nullable = true)
     |-- t: string (nullable = true)
     |-- la: string (nullable = true)
     |-- md: string (nullable = true)
     |-- vn: string (nullable = true)
     |-- ba: string (nullable = true)
     |-- sr: string (nullable = true)
     |-- events: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- ett: string (nullable = true)
     |    |    |-- en: string (nullable = true)
     |    |    |-- kv: string (nullable = true)
    

    将event进行explode

    scala> val opDF3 = opDF2.select($"time",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",explode($"events").as("eventcontent"))
    opDF3: org.apache.spark.sql.DataFrame = [time: string, ap: string ... 18 more fields]
    
    
    scala> opDF3.printSchema
    root
     |-- time: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- ln: string (nullable = true)
     |-- sv: string (nullable = true)
     |-- os: string (nullable = true)
     |-- g: string (nullable = true)
     |-- mid: string (nullable = true)
     |-- nw: string (nullable = true)
     |-- l: string (nullable = true)
     |-- vc: string (nullable = true)
     |-- hw: string (nullable = true)
     |-- ar: string (nullable = true)
     |-- uid: string (nullable = true)
     |-- t: string (nullable = true)
     |-- la: string (nullable = true)
     |-- md: string (nullable = true)
     |-- vn: string (nullable = true)
     |-- ba: string (nullable = true)
     |-- sr: string (nullable = true)
     |-- eventcontent: struct (nullable = true)
     |    |-- ett: string (nullable = true)
     |    |-- en: string (nullable = true)
     |    |-- kv: string (nullable = true)
    
    scala> opDF3.select($"time",$"sr",$"eventcontent").where("time=1593137251934").show(10,false)
    +-------------+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |time         |sr |eventcontent                                                                                                                                                                                                                                                                                                                                       |
    +-------------+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |1593137251934|R  |[1593041825601, display, {"goodsid":"0","action":"1","extend1":"2","place":"4","category":"28"}]                                                                                                                                                                                                                                                   |
    |1593137251934|R  |[1593074010012, notification, {"ap_time":"1593039359797","action":"4","type":"1","content":""}]                                                                                                                                                                                                                                                    |
    |1593137251934|R  |[1593110124137, error, {"errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","errorBrief":"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"}]|
    |1593137251934|R  |[1593042153320, favorites, {"course_id":2,"id":0,"add_time":"1593047184972","userid":8}]                                                                                                                                                                                                                                                           |
    |1593137251934|R  |[1593068864495, praise, {"target_id":7,"id":3,"type":3,"add_time":"1593101957239","userid":3}] 
    

    获取第三层,即event

    scala> val opDF4 = opDF3.select($"time",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"eventcontent.ett",$"eventcontent.en",$"eventcontent.kv")
    opDF4: org.apache.spark.sql.DataFrame = [time: string, ap: string ... 20 more fields]
    
    
    scala> opDF4.printSchema
    root
     |-- time: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- ln: string (nullable = true)
     |-- sv: string (nullable = true)
     |-- os: string (nullable = true)
     |-- g: string (nullable = true)
     |-- mid: string (nullable = true)
     |-- nw: string (nullable = true)
     |-- l: string (nullable = true)
     |-- vc: string (nullable = true)
     |-- hw: string (nullable = true)
     |-- ar: string (nullable = true)
     |-- uid: string (nullable = true)
     |-- t: string (nullable = true)
     |-- la: string (nullable = true)
     |-- md: string (nullable = true)
     |-- vn: string (nullable = true)
     |-- ba: string (nullable = true)
     |-- sr: string (nullable = true)
     |-- ett: string (nullable = true)
     |-- en: string (nullable = true)
     |-- kv: string (nullable = true)
    
    
    scala> opDF4.select($"time",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"ett",$"en").where("en='display'").show(10,false)
    +-------------+---+------+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+----------+-----+-------+---+-------------+-------+
    |time         |ap |ln    |sv    |os   |g                 |mid|nw  |l  |vc |hw       |ar |uid|t            |la   |md        |vn   |ba     |sr |ett          |en     |
    +-------------+---+------+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+----------+-----+-------+---+-------------+-------+
    |1593137251934|app|-48.8 |V2.1.2|8.1.7|9T5WG2IF@gmail.com|0  |WIFI|en |4  |750*1134 |MX |0  |1593054455815|-13.8|Huawei-0  |1.0.5|Huawei |R  |1593041825601|display|
    |1593137251952|app|-94.9 |V2.7.0|8.2.7|2S3H1L7K@gmail.com|4  |4G  |en |6  |750*1134 |MX |4  |1593119037940|-46.8|sumsung-1 |1.0.6|Sumsung|I  |1593058474741|display|
    |1593137251954|app|-69.0 |V2.4.8|8.0.3|85B866BI@gmail.com|8  |3G  |es |2  |640*960  |MX |8  |1593083277982|22.4 |sumsung-13|1.0.3|Sumsung|S  |1593053303645|display|
    |1593137251958|app|-97.9 |V2.3.0|8.0.6|59V555IC@gmail.com|14 |3G  |es |5  |640*1136 |MX |14 |1593105762133|-45.6|Huawei-16 |1.1.5|Huawei |L  |1593098492648|display|
    |1593137251962|app|-50.7 |V2.6.6|8.1.9|7023Q381@gmail.com|15 |WIFI|es |16 |1080*1920|MX |15 |1593096659201|-47.8|sumsung-11|1.2.7|Sumsung|G  |1593041348870|display|
    |1593137251963|app|-77.1 |V2.9.8|8.2.1|W45Y7IZ9@gmail.com|17 |4G  |pt |8  |1080*1920|MX |17 |1593127628700|2.8  |sumsung-10|1.3.0|Sumsung|K  |1593070583966|display|
    |1593137251964|app|-53.1 |V2.1.3|8.2.3|44768OU6@gmail.com|18 |3G  |en |9  |640*1136 |MX |18 |1593114312736|-11.1|HTC-1     |1.3.9|HTC    |Z  |1593110468000|display|
    |1593137251966|app|-88.1 |V2.2.6|8.1.5|A4QF5V3G@gmail.com|27 |WIFI|pt |1  |750*1134 |MX |27 |1593049555889|18.8 |sumsung-8 |1.0.6|Sumsung|N  |1593083023112|display|
    |1593137251967|app|-104.9|V2.7.1|8.2.2|1L1PF21L@gmail.com|28 |WIFI|pt |13 |750*1134 |MX |28 |1593123010449|9.8  |Huawei-13 |1.3.4|Huawei |F  |1593072640137|display|
    |1593137251968|app|-105.1|V2.7.1|8.0.4|P5OB863W@gmail.com|29 |4G  |es |13 |640*960  |MX |29 |1593132148433|-20.0|Huawei-9  |1.0.9|Huawei |C  |1593052346863|display|
    +-------------+---+------+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+----------+-----+-------+---+-------------+-------+
    
    

    获取最后一层et.en不同值时et.kv
    由于每个kv的值不一样,所以根据kv.en的值不同进行分类
    由最开始的数据结构图可知,有七种类型,在此给出两种案列

    当en=display时,DF如下:

    scala> val opDisplayDF =  opDF4.select($"time",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"ett",$"en",get_json_object($"kv","$.goodsid").alias("goodsid"),get_json_object($"kv","$.action").alias("action"),get_json_object($"kv","$.extend1").alias("extend1"),get_json_object($"kv","$.place").alias("place"),get_json_object($"kv","$.category").alias("category")).where("en='display'")
    opDisplayDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: string, ap: string ... 24 more fields]
    
    
    scala> opDisplayDF.printSchema
    root
     |-- time: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- ln: string (nullable = true)
     |-- sv: string (nullable = true)
     |-- os: string (nullable = true)
     |-- g: string (nullable = true)
     |-- mid: string (nullable = true)
     |-- nw: string (nullable = true)
     |-- l: string (nullable = true)
     |-- vc: string (nullable = true)
     |-- hw: string (nullable = true)
     |-- ar: string (nullable = true)
     |-- uid: string (nullable = true)
     |-- t: string (nullable = true)
     |-- la: string (nullable = true)
     |-- md: string (nullable = true)
     |-- vn: string (nullable = true)
     |-- ba: string (nullable = true)
     |-- sr: string (nullable = true)
     |-- ett: string (nullable = true)
     |-- en: string (nullable = true)
     |-- goodsid: string (nullable = true)
     |-- action: string (nullable = true)
     |-- extend1: string (nullable = true)
     |-- place: string (nullable = true)
     |-- category: string (nullable = true)
    
    
    scala> opDisplayDF.show(2,false)
    2020-06-29 22:55:00 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
    +-------------+---+-----+------+-----+------------------+---+----+---+---+--------+---+---+-------------+-----+---------+-----+-------+---+-------------+-------+-------+------+-------+-----+--------+
    |time         |ap |ln   |sv    |os   |g                 |mid|nw  |l  |vc |hw      |ar |uid|t            |la   |md       |vn   |ba     |sr |ett          |en     |goodsid|action|extend1|place|category|
    +-------------+---+-----+------+-----+------------------+---+----+---+---+--------+---+---+-------------+-----+---------+-----+-------+---+-------------+-------+-------+------+-------+-----+--------+
    |1593137251934|app|-48.8|V2.1.2|8.1.7|9T5WG2IF@gmail.com|0  |WIFI|en |4  |750*1134|MX |0  |1593054455815|-13.8|Huawei-0 |1.0.5|Huawei |R  |1593041825601|display|0      |1     |2      |4    |28      |
    |1593137251952|app|-94.9|V2.7.0|8.2.7|2S3H1L7K@gmail.com|4  |4G  |en |6  |750*1134|MX |4  |1593119037940|-46.8|sumsung-1|1.0.6|Sumsung|I  |1593058474741|display|1      |2     |1      |1    |44      |
    +-------------+---+-----+------+-----+------------------+---+----+---+---+--------+---+---+-------------+-----+---------+-----+-------+---+-------------+-------+-------+------+-------+-----+--------+
    
    

    当en=active_background时,DF如下:

    scala> val opActionBackgroundDF =  opDF4.select($"time",$"ap",$"ln",$"sv",$"os",$"g",$"mid",$"nw",$"l",$"vc",$"hw",$"ar",$"uid",$"t",$"la",$"md",$"vn",$"ba",$"sr",$"ett",$"en", get_json_object($"kv","$.active_source").alias("activeSource")).where("en='active_background'")
    opActionBackgroundDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: string, ap: string ... 20 more fields]
    
    scala> opActionBackgroundDF.printSchema
    root
     |-- time: string (nullable = true)
     |-- ap: string (nullable = true)
     |-- ln: string (nullable = true)
     |-- sv: string (nullable = true)
     |-- os: string (nullable = true)
     |-- g: string (nullable = true)
     |-- mid: string (nullable = true)
     |-- nw: string (nullable = true)
     |-- l: string (nullable = true)
     |-- vc: string (nullable = true)
     |-- hw: string (nullable = true)
     |-- ar: string (nullable = true)
     |-- uid: string (nullable = true)
     |-- t: string (nullable = true)
     |-- la: string (nullable = true)
     |-- md: string (nullable = true)
     |-- vn: string (nullable = true)
     |-- ba: string (nullable = true)
     |-- sr: string (nullable = true)
     |-- ett: string (nullable = true)
     |-- en: string (nullable = true)
     |-- activeSource: string (nullable = true)
    
    
    scala> opActionBackgroundDF.show(1,false)
    +-------------+---+-----+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+--------+-----+------+---+-------------+-----------------+------------+
    |time         |ap |ln   |sv    |os   |g                 |mid|nw  |l  |vc |hw       |ar |uid|t            |la   |md      |vn   |ba    |sr |ett          |en               |activeSource|
    +-------------+---+-----+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+--------+-----+------+---+-------------+-----------------+------------+
    |1593137251948|app|-37.9|V2.7.1|8.0.0|4QALOI53@gmail.com|3  |WIFI|en |18 |1080*1920|MX |3  |1593084695545|-36.1|Huawei-2|1.1.0|Huawei|P  |1593084261117|active_background|2           |
    +-------------+---+-----+------+-----+------------------+---+----+---+---+---------+---+---+-------------+-----+--------+-----+------+---+-------------+-----------------+------------+
    
    
    展开全文
  • Spark优雅的操作Redis

    2020-03-18 15:14:45
    Spark优雅的操作Redis 贪恋清晨de阳光关注 0.442017.03.29 11:33:49字数 1,066阅读 24,687 Spark的优势在于内存计算,然而在计算中难免会用到一些元数据或中间数据,有的存在关系型数据库中,有的存在HDFS上,有...
  • Bloom Filter是一种空间效率很高的随机数据结构,它的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个位阵列(Bit array)中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就...
  • Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL...
  • spark的分区设计

    2020-02-02 11:43:05
    spark 的RDD中对应数据是分区的: 对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。 一个task 处理一个分区 用户可以在创建RDD时指定RDD的分片个数, 如果没有指定,那么就会采用默认值。...
  • Spark SQL的DataFrame接口支持多种数据源的操作。可以使用关系转换进行操作,也可以被注册为临时视图。将DataFrame注册为临时视图,即可以通过SQL进行数据查询。 Spark SQL的默认数据源格式为Parquet文件格式,修改...
  • Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
  • 数据的存取中输出格式是很重要的,这利于下游处理程序 有时数据量达到本地无法存放时,就需要考虑别的存取方法 spark支持很多种输入源,因为spark是基于hadoop生态构建的,支持InputFormat和OutputFormat接口...
  • Spark 第一章 是什么一 介绍简介特点二 Spark与MapReduce的区别三 Spark运行模式四 Spark CoreSpark RDDRDD LineageSpark任务执行原理Spark代码流程Spark 中的算子Transformations转换算子Action行动算子控制算子 ...
  • 一、RDD.fold和Scala.fold使用之间的差别1.Scala中fold的使用val t1=Array(("C++", (1,"1")), ("Java", (2,"2")),("Java", (2,"...val rs
  • RDD是Spark中的抽象数据结构类型,任何数据Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,...
  • 一、Spark介绍 组成 Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。 它的主要组件有: SparkCore 将分布式...
  • Spark基础以及WordCount实现
  • Spark知识点总结

    2020-02-14 09:16:52
    1.什么是sparkspark是基于内存计算的通用大数据并行计算框架,是一个快速、通用可扩展的大数据分析引擎。...SparkSQL:提供了类sql方式操作结构化半结构数据。对历史数据进行交互式查询。(即...
  • 1. 前序关于Executor如何运行算子,请参考前面博文:大数据:Spark Core(四)用LogQuery的例子来说明Executor是如何运算RDD的算子,当Executor进行reduce运算的时候,生成运算结果的临时Shuffle,并保存在磁盘中,...
  • spark读写hbase package com.huawei.bigdata.spark.examples import java.io.{File, IOException} import java.util import com.esotericsoftware.kryo.Kryo import org.apache.hadoop.conf.Configuration ...
  • 通过给map函数传入匿名函数操作RDD filter数字运算:过滤 数值运算 字符运算 distinct运算:除去重复元素 randdomSplit运算:将整个集合元素以随机数的方式按照比列分为多个RDD groupBy运算:可以...
1 2 3 4 5 ... 20
收藏数 2,677
精华内容 1,070
关键字:

kv结构数据 spark 操作