精华内容
下载资源
问答
  • 一、转换算子转换算子: Transformation,懒执行,需要Action触发执行filter过滤 RDD[T]==>RDD[T],窄依赖 mapRDD[T] ->RDD[O], 窄依赖 flatMapRDD[T]–>RDD[[O]],一对多 ,窄依赖, mapToPairsample抽样...

    一、转换算子转换算子:

    Transformation,懒执行,需要Action触发执行
    filter
    过滤 RDD[T]==>RDD[T],窄依赖

    map
    RDD[T] ->RDD[O], 窄依赖

    flatMap
    RDD[T]–>RDD[[O]],一对多 ,窄依赖,

    mapToPair
    sample
    抽样算子 RDD[T]–>RDD[O],窄依赖

    sortBy
    RDD[T]–>RDD[T], 根据你指定的内容排序 宽依赖

    sortByKey
    根据你的K排序,要求RDD中必须是KV的,宽依赖

    reduceByKey
    根据RDD的K分组之后聚合(累加,字符串连接) , 宽依赖

    join
    把两个RDD根据K相同合并,结果RDD[K,(V1,V2)] ,宽依赖

    leftOuterJoin
    左连接 和下面的一致 都是宽依赖

    rightOuterJoin
    fullOuterJoin
    union
    把两个RDD直接聚合成一个RDD,数据不合并 ,窄依赖

    intersection
    取两个RDD的交集,窄依赖

    subtract
    mapPartitions
    把整个分区的数据作为一个迭代器一次计算 数据量不是特别大 会有性能提升,窄依赖

    distinct
    去重算子 本质是map + reduceByKey+map 宽依赖

    cogroup
    (K,V) (K,W)=>(K,([V],[W])) RDD1相同的key value 放在[V]中 另一个RDD相同的key 的value 放在[W]中 宽依赖

    mapPartitionsWithIndex
    把整个分区的数据作为一个迭代器一次计算 多了一个分区的index 数据量不是特别大 会有性能提升,窄依赖

    repartition
    可以增多分区,可以减少分区,有shuffle 宽依赖
    repartition = coalesce(num,shuffle=true)

    coalesce
    可以增多分区,也可以减少分区,默认没有shuffle 有shuffle就 宽依赖 没shuffle 就是窄依赖
    若RDD由少的分区分到多的分区时,不让产生shuffle, 不起作用
    少 - > 多 false RDD1 a、b分区 RDD2 0:a 1:b 2: 窄依赖 。 true RDD1 a、b 分区 RDD2 0:a1 b1 / 1: a2 b2 / 3: a3 b3 宽依赖
    多 - > 少 false RDD1 a、b、c 分区 RDD2: 0: a 、 1: b c 窄依赖 。 true RDD1 a、b、c 分区 RDD2 0: a1、b1、c1 1:a2、b2、c2 宽依赖

    zip
    两个RDD压成一个RDD 窄依赖

    zipWithIndex
    groupByKey
    (K,V)–>(K,iter),根据K相同分组,分组之后把一组的V封装成一个迭代器, 宽依赖

    二、行动算子Action,

    触发transformation类算子执行,一个application中有一个action算子就有一个job
    Action算子
    foreach
    循环出值

    count
    结果会拿到Driver端

    collect
    将结果拿回Driver端 返回一个列表

    first
    取出第一个值

    take(num)
    取出num个值 返回driver端

    foreachPartition
    reduce
    聚合数据 不过 这个是在driver端 不合适聚合大量的数据 适合聚合结果数据

    countByKey
    countByValue

    三、持久化算子

    cache
    默认将数据存储在内存中
    cache() = persist() = persist(StorageLevel.MEMORY_ONLY)

    persist
    可以手动指定持久化级别
    MEMORY_ONLY
    MEMORY_ONLY_SER
    MEMORY_AND_DISK
    MEMORY_AND_DISK_SER
    “_2” 是由副本
    尽量少使用DISK_ONLY级别
    checkpoint
    将数据直接持久化到指定的目录,当lineage 计算非常复杂,可以尝试使用checkpoint ,checkpoint还可以切断RDD的依赖关系
    特殊场景使用checkpoint,对RDD使用checkpoint要慎用
    checkpoint要指定目录,可以将数据持久化到指定的目录中,当application执行完成之后,这个目录中的数据不会被清除

    checkpoint的执行流程
    1.当sparkjob执行完成之后,Spark 会从后往前回溯,找到checkpointRDD做标记
    2.回溯完成之后,Spark框架会重新启动一个job,计算标记的RDD的数据,放入指定的checkpoint目录中
    3.数据计算完成,放入目录之后,会切断RDD的依赖关系,当SparkApplication执行完成之后,数据目录中的数据不会被清除
    优化:对哪个RDD进行checkpoint,最好先cache下,这样回溯完成后再计算这个CheckpointRDD数据的时候可以直接在内存中拿到放指定的目录中
    cache和persist的注意
    1.cache,persist,checkpoint 都是懒执行,最小持久化单位是partition
    2.cache和persist之后可以直接赋值给一个值,下次直接使用这个值,就是使用的持久化的数据
    3.如果采用第二种方式,后面不能紧跟action算子
    4.cache和persist的数据,当application执行完成之后会自动清除
    ————————————————
    版权声明:本文为CSDN博主「狂躁的辣条」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_42712704/article/details/89534037

    转载于:https://www.cnblogs.com/carsonwuu/p/11596815.html

    展开全文
  • 内容简介一、窄依赖与宽依赖剖析二、窄依赖与宽依赖的区别三、窄依赖算子与宽依赖算子四、总结 一、窄依赖与宽依赖剖析 在之前的文章中曾对RDD进行剖析,详情看Spark学习之路(三):剖析RDD的概念及用三种方式创建...

    一、窄依赖与宽依赖剖析

    • 在之前的文章中曾对RDD进行剖析,详情看Spark学习之路(三):剖析RDD的概念及用三种方式创建RDD,知道RDD与RDD之间是存在依赖关系(也叫血缘关系)的,每当RDD调用transform算子生成另一个RDD时,这两个RDD之间就存在依赖关系,事实上,还可以对两个RDD之间的依赖关系进行进一步划分,分为窄依赖关系和宽依赖关系。
    • 在剖析窄依赖与宽依之前必须了解一个事实,在RDD中存在许多分区,而RDD也有一份分区列表来记录这些分区,比如从HDFS中读取数据时,默认的分区划分就是根据HDFS中的数据块的个数划分,一个数据块即为一个分区。因此可以给出窄依赖的定义:一个RDD中的每一个分区的数据仅仅依赖于父RDD对应分区的数据,即仅仅由父RDD对应分区的数据变换而来,即分区与分区之间是一对一的关系,如下图所示:
      在这里插入图片描述
      宽依赖的定义为:一个RDD中的每一个分区的数据依赖于父RDD多个分区的数据,即父RDD多个分区的数据变换而来,即分区与分区之间是一对多的关系,如下图所示:
      在这里插入图片描述
      以上便是窄依赖和宽依赖的基本概念及原理,事实上,因为宽依赖而导致从被依赖RDD的多个分区拉取数据的过程称之为Shuffle,Shuffle是Spark中非常重要的概念,这也是DAG调度器将job划分Stage的唯一依据,且因为Shuffle往往伴随着大量跨分区的数据传输,其耗费的时间也是最长的,毫不夸张地说Spark程序最花费时间的环节就是Shuffle。现在通过对WordCount程序的窄依赖与宽依赖的划分,可以更清楚地知道WordCount程序的偏底层的数据流向:
      在这里插入图片描述

    二、窄依赖与宽依赖的区别

    • 宽依赖往往对应着Shuffle操作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中,中间可能涉及到多个节点之间数据的传输,而窄依赖的每个父RDD分区通常只会传入到另一个子RDD分区,通常在一个节点内完成。
    • 当窄依赖的子RDD数据丢失时,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重新计算与子RDD分区对应的父RDD分区即可;当宽依赖的子RDD数据丢失时,一个分区的数据通常由自多个父RDD分区数据变换而来,极端情况下,所有父RDD的分区都有可能重新计算,因此计算量很大,需要注意容错,一般而言会在此处设置Checkpoint。

    三、窄依赖算子与宽依赖算子

    • Spark学习之路(四):深度图解Spark算子运作原理中曾剖析过Spark算子的概念及运算原理,其中对Spark算子分成了两类:transform算子和action算子,事实上还可以对transform算子进行更进一步的划分:窄依赖算子和宽依赖算子(Shuffle算子),常用的窄依赖算子如下:
      算子名称 算子功能
      map 对RDD中的每个元素都执行一个指定函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
      flatMap 与map类似,将原RDD中的每个元素通过函数f转换为新的元素,并将这些元素放入一个集合,构成新的RDD。
      filter 对RDD元素进行过滤,返回一个新的数据集,由经过给定的过滤条件函数后返回值为true的原元素组成。
      union 合并两个数据类型相同的 RDD ,并不进行去重操作,保存所有元素。
      distinct 将RDD进行去重操作,返回去重后的RDD。
      常用的宽依赖算子如下:
      算子名称 算子功能
      cartesian 返回两个RDD的笛卡尔积。
      groupBy 将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
      groupByKey 对RDD中的数据进行分组操作,在一个由(K, V)键值对组成的数据集上调用,返回一个(K, Seq[V])对的数据集。
      sortBy 对RDD中的每个元素按照给定的条件进行排序。
      sortByKey 对RDD中的每个元素按照Key进行排序。
      ReduceByKey 对RDD中的每个元素对按照Key进行聚合操作。
      join 对两个RDD中的每个元素对进行join操作,将两个RDD中具有相同的key的元素的value进行笛卡尔积,返回(key,(value1,value2))类型的值。

    四、总结

    • 由宽依赖而产生的Shuffle是很耗费时间的,从而如何对Shuffle进行调优是一个难点和重点,这也是一个优化程序的点,也可以通过由宽依赖算子生成的RDD进行Checkpoint操作从而避免数据丢失时产生大量的计算也是Spark调优点之一。感谢你的阅读,如有错误请不吝赐教!
    • 更多内容请看 萧邦主的技术博客导航
    展开全文
  • 1.案例概要 将product表和order表进行join,并找出订单金额最大的top3的产品. 结果 productName amount 小米7 550106959 小米6 275024061 小米5 54945718 2.数据结构与构造数据 2.1 数据结构 ...

    1.案例概要

       将product表和order表进行join,并找出订单金额最大的top3的产品.
       
       结果
       productName   amount 
        小米7       550106959
        小米6       275024061
        小米5       54945718
    

    2.数据结构与构造数据

    2.1 数据结构

     case class Order(id:Int,date:String,pid:Int,amount:Int)
    
    case class Product(id:Int,pName:String,categoryId:String,price:Int)
    

    2.2 Mock data

    1000w条order数据
        1,2020-08-29 17:03:37,1,5597
        2,2020-08-29 17:03:37,2,5472
        3,2020-08-29 17:03:37,3,5883
        4,2020-08-29 17:03:37,4,5757
        5,2020-08-29 17:03:37,5,5766
        6,2020-08-29 17:03:37,8,5895
        7,2020-08-29 17:03:37,8,5232
        ......
    
    10条product数据
        1,小米1,c1,10001
        2,小米2,c2,10002
        3,小米3,c3,10003
        4,小米4,c4,10004
        5,小米5,c5,10005
        6,小米6,c6,10006
        7,小米7,c7,10007
        8,小米8,c8,10008
        9,小米9,c9,10009
        10,小米10,c10,100010
    

    3.代码

      object JoinTest {
    
      def main(args: Array[String]): Unit = {
    
        val sparkConf = new SparkConf().setMaster("local[3]").setAppName(JoinTest.getClass.getSimpleName)
        val sc = new SparkContext(sparkConf)
    
        val orderPairRDD = sc.textFile("hdfsOperation/data/table/order.txt")
          .map(
            x => {
              val splits = x.split(",")
              (splits(2), Order(splits(0).toInt, splits(1), splits(2).toInt, splits(3).toInt))
            }
          )
    
        val productPairRDD = sc.textFile("hdfsOperation/data/table/product.txt")
          .map(
            x => {
              val splits = x.split(",")
              (splits(0), Product(splits(0).toInt, splits(1), splits(2), splits(3).toInt))
            }
          )
    
        //1.普通join   耗时:25s
        //产品的下单金额TOP3  PName,amount
    
        //commonJoin(orderPairRDD,productPairRDD)
    
        /*
        (小米7,550106959)
        (小米6,275024061)
        (小米5,54945718)
         普通join耗时:25s
    
         */
    
        //2. broadcastJoin   耗时:4s
        //观察 localhost:4040 页面
    
        //broadcastJoin(sc,orderPairRDD,productPairRDD)
    
        /*
          (小米7,550106959)
          (小米6,275024061)
          (小米5,54945718)
          mapJoin耗时:4s
         */
    
    
        //3.co-partitioned join   耗时:10s   
        /*
        利用key相同必然分区相同的这个原理,Spark将较大表的join分而治之,
        先将表划分成n个分区,再对两个表中相对应分区的数据分别进行Hash Join。
         */
    
        coPartitionedJoin(orderPairRDD,productPairRDD)
    
         /*
        
          (小米7,550106959)
          (小米6,275024061)
          (小米5,54945718)
          coPartitionedJoin耗时:10s
          */
    
    
        Thread.sleep(Int.MaxValue)
        sc.stop()
      }
    

    3.1 commonjoin 耗时:25s

          /**
        * 根据pid进行普通join
        * @param orderPairRDD
        * @param productPairRDD
        */
      def commonJoin( orderPairRDD: RDD[(String, Order)],productPairRDD: RDD[(String, Product)]): Unit ={
            val startTime=System.currentTimeMillis()/1000
            orderPairRDD
              .join(productPairRDD)
              .map(x=>{
                (x._2._2.pName,x._2._1.amount)
              })
              .reduceByKey(_+_)
              .sortBy(-_._2)
              .take(3)
              .foreach(println(_))
            println("普通join耗时:"+(System.currentTimeMillis()/1000-startTime)+"s")
    
      }
    

    3.1.1 commonjoin DAG 图

    在这里插入图片描述

    3.2 brocastJoin 耗时:4s

         /**
        * 小表广播Join
        * @param sc
        * @param orderPairRDD
        * @param productPairRDD
        */
      def broadcastJoin(sc:SparkContext,orderPairRDD: RDD[(String, Order)],productPairRDD: RDD[(String, Product)]): Unit ={
        val startTime=System.currentTimeMillis()/1000
        // Can not directly broadcast RDDs; instead, call collect() and broadcast the result.
        val product = productPairRDD.collectAsMap()
    
        val productBroadcast = sc.broadcast(product)
        orderPairRDD
          .mapPartitions(partition=>{
            val productMap = productBroadcast.value
            for( (k,v) <-partition if productMap.contains(k))
              yield (productMap.getOrElse(k,Product).asInstanceOf[Product].pName,v.amount)
          })
          .reduceByKey(_+_)
          .sortBy(-_._2)
          .take(3)
          .foreach(println)
          println("mapJoin耗时:"+(System.currentTimeMillis()/1000-startTime)+"s")
      }
    

    3.2.1 brocastJoin DAG图

    在这里插入图片描述

    3.3 co-partitioned Join 耗时:10s

        /**
        * co-partitioned Join
        * @param orderPairRDD
        * @param productPairRDD
        */
      def coPartitionedJoin(orderPairRDD: RDD[(String, Order)],productPairRDD: RDD[(String, Product)]): Unit ={
        val startTime=System.currentTimeMillis()/1000
        val orderDataPartitioner = orderPairRDD.partitioner match {
          case (Some(p)) => p
          case (None) => new HashPartitioner(orderPairRDD.partitions.length)
        }
        //在join之前尽可能减少数据
         val  bestOrderData = orderPairRDD.map(x=>(x._1,x._2.amount)).reduceByKey(orderDataPartitioner,_+_)
         bestOrderData.cache
         bestOrderData
           .join(productPairRDD)
           .map(x=>{
             (x._2._2.pName,x._2._1)
           })
           .reduceByKey(_+_)
           .sortBy(-_._2)
           .take(3)
           .foreach(println(_))
    
        bestOrderData.unpersist(true)
        println("coPartitionedJoin耗时:"+(System.currentTimeMillis()/1000-startTime)+"s")
      }
    

    3.3.1 co-partitioned JoinDAG图

    在这里插入图片描述

    展开全文
  • 接上篇文章第2章2.3.4:SparkCore之转换算子:RDD概述属性介绍,RDD特点,RDD编程模型,RDD的创建与转换, transformation转换算子Value类型与双value类型交互,key-value类型算子 文章目录 2.4 Action 2.4.1 reduce...

    接上篇文章第2章2.3.4:SparkCore之转换算子:RDD概述属性介绍,RDD特点,RDD编程模型,RDD的创建与转换, transformation转换算子Value类型与双value类型交互,key-value类型算子

    文章目录

    2.4 Action

    2.4.1 reduce(func)案例

    2.4.2 collect()案例

    2.4.3 count()案例

    2.4.4 first()案例

    2.4.5 take(n)案例

    2.4.6 takeOrdered(n)案例

    2.4.7 aggregate案例

    2.4.8 fold(num)(func)案例

    2.4.9 saveAsTextFile(path)

    2.4.10 saveAsSequenceFile(path) 

    2.4.11 saveAsObjectFile(path) 

    2.4.12 countByKey()案例

    2.4.13 foreach(func)案例

    2.5 RDD中的函数传递

    2.5.1 传递一个方法

    2.5.2 传递一个属性

    2.6 RDD依赖关系

    2.6.1 Lineage

    2.6.2 窄依赖

    2.6.3 宽依赖

    2.6.4 DAG

    2.6.5 任务划分(面试重点)

    2.7 RDD缓存

    2.8 RDD CheckPoint

    2.4 Action

    2.4.1 reduce(func)案例

    1. 作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

    2. 需求:创建一个RDD,将所有元素聚合得到结果。

    (1)创建一个RDD[Int]

    scala> val rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

    (2)聚合RDD[Int]所有元素

    scala> rdd1.reduce(_+_)

    res50: Int = 55

    (3)创建一个RDD[String]

    scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))

    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

    (4)聚合RDD[String]所有数据

    scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))

    res51: (String, Int) = (adca,12)

    2.4.2 collect()案例

    1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。

    2. 需求:创建一个RDD,并将RDD内容收集到Driver端打印

    (1)创建一个RDD

    scala> val rdd = sc.parallelize(1 to 10)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

    (2)将结果收集到Driver端

    scala> rdd.collect

    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)   

    2.4.3 count()案例

    1. 作用:返回RDD中元素的个数

    2. 需求:创建一个RDD,统计该RDD的条数

    (1)创建一个RDD

    scala> val rdd = sc.parallelize(1 to 10)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

    (2)统计该RDD的条数

    scala> rdd.count

    res1: Long = 10

    2.4.4 first()案例

    1. 作用:返回RDD中的第一个元素

    2. 需求:创建一个RDD,返回该RDD中的第一个元素

    (1)创建一个RDD

    scala> val rdd = sc.parallelize(1 to 10)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

    (2)统计该RDD的条数

    scala> rdd.first

    res2: Int = 1

    2.4.5 take(n)案例

    1. 作用:返回一个由RDD的前n个元素组成的数组

    2. 需求:创建一个RDD,统计该RDD的条数

    (1)创建一个RDD

    scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

    (2)统计该RDD的条数

    scala> rdd.take(3)

    res10: Array[Int] = Array(2, 5, 4)

    2.4.6 takeOrdered(n)案例

    1. 作用:返回该RDD排序后的前n个元素组成的数组

    2. 需求:创建一个RDD,统计该RDD的条数

    (1)创建一个RDD

    scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

    (2)统计该RDD的条数

    scala> rdd.takeOrdered(3)

    res18: Array[Int] = Array(2, 3, 4)

    2.4.7 aggregate案例

    1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

    2. 作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

    3. 需求:创建一个RDD,将所有元素相加得到结果

    (1)创建一个RDD

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

    (2)将该RDD所有元素相加得到结果

    scala> rdd.aggregate(0)(_+_,_+_)

    res22: Int = 55

    2.4.8 fold(num)(func)案例

    1. 作用:折叠操作,aggregate的简化操作,seqop和combop一样。

    2. 需求:创建一个RDD,将所有元素相加得到结果

    (1)创建一个RDD

    scala> var rdd1 = sc.makeRDD(1 to 10,2)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

    (2)将该RDD所有元素相加得到结果

    scala> rdd1.fold(0)(_+_)

    res24: Int = 55

     

    解释:如果num值为1,fold结果为58,是因为先对组内折叠时两组+1,然后组间折叠+1

    2.4.9 saveAsTextFile(path)

    作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

    2.4.10 saveAsSequenceFile(path) 

    作用:将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

    2.4.11 saveAsObjectFile(path) 

    作用:用于将RDD中的元素序列化成对象,存储到文件中。

    2.4.12 countByKey()案例

    1. 作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

    2. 需求:创建一个PairRDD,统计每种key的个数

    (1)创建一个PairRDD

    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)

    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

    (2)统计每种key的个数

    scala> rdd.countByKey

    res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

    2.4.13 foreach(func)案例

    1. 作用:在数据集的每一个元素上,运行函数func进行更新。

    2. 需求:创建一个RDD,对每个元素进行打印

    (1)创建一个RDD

    scala> var rdd = sc.makeRDD(1 to 5,2)

    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

    (2)对该RDD每个元素进行打印

    scala> rdd.foreach(println(_))

    3

    4

    5

    1

    2

    2.5 RDD中的函数传递

    在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:

    2.5.1 传递一个方法

    1.创建一个类

    class Search(query:String){

      //过滤出包含字符串的数据
      def isMatch(s: String): Boolean = {
        s.contains(query)
      }

      //过滤出包含字符串的RDD
      def getMatch1 (rdd: RDD[String]): RDD[String] = {
        rdd.filter(isMatch)
      }

      //过滤出包含字符串的RDD
      def getMatche2(rdd: RDD[String]): RDD[String] = {
        rdd.filter(x => x.contains(query))
      }

    }

    2.创建Spark主程序

    object SeriTest {

     

      def main(args: Array[String]): Unit = {

     

        //1.初始化配置信息及SparkContext

        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

        val sc = new SparkContext(sparkConf)

     

        //2.创建一个RDD

        val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

     

        //3.创建一个Search对象

        val search = new Search("h")

     

        //4.运用第一个过滤函数并打印结果

        val match1: RDD[String] = search.getMatch1(rdd)

        match1.collect().foreach(println)

      }

    }

    3.运行程序

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)

    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

    at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)

    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

    at org.apache.spark.rdd.RDD.filter(RDD.scala:386)

    at RDD.action.Search.getMatch1(SeriTest.scala:38)

    at RDD.action.SeriTest$.main(SeriTest.scala:25)

    at RDD.action.SeriTest.main(SeriTest.scala)

    Caused by: java.io.NotSerializableException: RDD.action.Search

    Serialization stack:

    - object not serializable (class: RDD.action.Search, value: RDD.action.Search@238ad8c)

    - field (class: RDD.action.Search$$anonfun$getMatch1$1, name: $outer, type: class RDD.action.Search)

    - object (class RDD.action.Search$$anonfun$getMatch1$1, <function1>)

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)

    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)

    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

    ... 12 more

     

    4.问题说明

    //过滤出包含字符串的RDD

      def getMatch1 (rdd: RDD[String]): RDD[String] = {

        rdd.filter(isMatch)

      }

    在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

    5.解决方案

    使类继承scala.Serializable即可。

    class Search() extends Serializable{...}

     

    2.5.2 传递一个属性

    1.创建Spark主程序

    object TransmitTest {

     

      def main(args: Array[String]): Unit = {

     

        //1.初始化配置信息及SparkContext

        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

        val sc = new SparkContext(sparkConf)

     

    //2.创建一个RDD

        val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

     

    //3.创建一个Search对象

        val search = new Search("h"

    )

     

    //4.运用第一个过滤函数并打印结果

        val match1: RDD[String] = search.getMatche2(rdd)

        match1.collect().foreach(println)

        }

    }

    2.运行程序

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable

        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)

        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

        at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)

        at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)

        at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

        at org.apache.spark.rdd.RDD.filter(RDD.scala:386)

        at com.atguigu.Search.getMatche1(SeriTest.scala:39)

        at com.atguigu.SeriTest$.main(SeriTest.scala:18)

        at com.atguigu.SeriTest.main(SeriTest.scala)

    Caused by: java.io.NotSerializableException: com.atguigu.Search

    3.问题说明

      //过滤出包含字符串的RDD

      def getMatche2(rdd: RDD[String]): RDD[String] = {

        rdd.filter(x => x.contains(query))

      }

    在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

    4.解决方案

    1)使类继承scala.Serializable即可。

    class Search() extends Serializable{...}

     

    2)将类变量query赋值给局部变量

    修改getMatche2为

      //过滤出包含字符串的RDD

      def getMatche2(rdd: RDD[String]): RDD[String] = {

        val query_ : String = this.query//将类变量赋值给局部变量

        rdd.filter(x => x.contains(query_))

      }

     

    2.6 RDD依赖关系

    2.6.1 Lineage

    RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

     

    (1)读取一个HDFS文件并将其中内容映射成一个个元组

    scala> val wordAndOne = sc.textFile("/fruit.tsv").flatMap(_.split("\t")).map((_,1))

    wordAndOne: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:24

    (2)统计每一种key对应的个数

    scala> val wordAndCount = wordAndOne.reduceByKey(_+_)

    wordAndCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:26

    (3)查看“wordAndOne”的Lineage

    scala> wordAndOne.toDebugString

    res5: String =

    (2) MapPartitionsRDD[22] at map at <console>:24 []

     |  MapPartitionsRDD[21] at flatMap at <console>:24 []

     |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

     |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []

    (4)查看“wordAndCount”的Lineage

    scala> wordAndCount.toDebugString

    res6: String =

    (2) ShuffledRDD[23] at reduceByKey at <console>:26 []

     +-(2) MapPartitionsRDD[22] at map at <console>:24 []

        |  MapPartitionsRDD[21] at flatMap at <console>:24 []

        |  /fruit.tsv MapPartitionsRDD[20] at textFile at <console>:24 []

        |  /fruit.tsv HadoopRDD[19] at textFile at <console>:24 []

    (5)查看“wordAndOne”的依赖类型

    scala> wordAndOne.dependencies

    res7: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@5d5db92b)

    (6)查看“wordAndCount”的依赖类型

    scala> wordAndCount.dependencies

    res8: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@63f3e6a8)

    注意:RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

    2.6.2 窄依赖

    窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女

     

    2.6.3 宽依赖

    宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生

     

    2.6.4 DAG

    DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

     

    2.6.5 任务划分(面试重点)

    RDD任务切分中间分为:Application、Job、Stage和Task

    1)Application:初始化一个SparkContext即生成一个Application

    2)Job:一个Action算子就会生成一个Job

    3)Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

     

    4)Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。

    注意:Application->Job->Stage-> Task每一层都是1对n的关系。

    2.7 RDD缓存

    RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。

    但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

     

    通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

     

    在存储级别的末尾加上“_2”来把持久化数据存为两份

    缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

    (1)创建一个RDD

    scala> val rdd = sc.makeRDD(Array("atguigu"))

    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at makeRDD at <console>:25

    (2)将RDD转换为携带当前时间戳做缓存

    scala> val nocache = rdd.map(_.toString+System.currentTimeMillis)

    nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27

    (3)多次打印结果

    scala> nocache.collect

    res0: Array[String] = Array(atguigu1538978275359)

     

    scala> nocache.collect

    res1: Array[String] = Array(atguigu1538978282416)

     

    scala> nocache.collect

    res2: Array[String] = Array(atguigu1538978283199)

    (4)将RDD转换为携带当前时间戳做缓存

    scala> val cache =  rdd.map(_.toString+System.currentTimeMillis).cache

    cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27

    (5)多次打印做了缓存的结果

    scala> cache.collect

    res3: Array[String] = Array(atguigu1538978435705)                                   

     

    scala> cache.collect

    res4: Array[String] = Array(atguigu1538978435705)

     

    scala> cache.collect

    res5: Array[String] = Array(atguigu1538978435705)

    2.8 RDD CheckPoint

    Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

    为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

    案例实操:

    (1)设置检查点

    scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")

    (2)创建一个RDD

    scala> val rdd = sc.parallelize(Array("atguigu"))

    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24

    (3)将RDD转换为携带当前时间戳并做checkpoint

    scala> val ch = rdd.map(_+System.currentTimeMillis)

    ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:26

     

    scala> ch.checkpoint

    (4)多次打印结果

    scala> ch.collect

    res55: Array[String] = Array(atguigu1538981860336)

     

    scala> ch.collect

    res56: Array[String] = Array(atguigu1538981860504)

     

    scala> ch.collect

    res57: Array[String] = Array(atguigu1538981860504)

     

    scala> ch.collect

    res58: Array[String] = Array(atguigu1538981860504)

    展开全文
  • 宽依赖: 发生shuffle时,一定会产生宽依赖宽依赖是一个RDD中的一个Partition被多个子Partition所依赖,也就是说每一个父RDD的Partition中的数据,都可能传输一部分到下一个RDD的多个partition中,此时一定会发生...
  • RDD算子操作会使得RDD分区之间产生不同依赖,主要有两种依赖:宽依赖和窄依赖。 宽依赖:是指一个父RDD的各个分区被一个子RDD的各个分区多次依赖, 窄依赖:是指一个父RDD的各个分区被一个子RDD的各个分区一次依赖,...
  • RDD依赖与DAG

    2020-11-19 14:44:11
    宽依赖 & 窄依赖 在spark中,rdd间的依赖关系分为两种,宽依赖和窄依赖 ...该转换过程的算子叫做宽依赖算子,比如groupByKey. reduceByKey aggravateByKey 源码 abstract class Dependency[T] extends Serializable {
  • Spark RDD 宽依赖&窄依赖1.窄依赖2.宽依赖:3.阶段的划分4.宽依赖和窄依赖的作用: 1.窄依赖 每一个父RDD的Partition最多被子RDD的一个Partition使用 (一对一的关系) 常见算子:map flatmap filter union sample...
  • 一、宽窄依赖 RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄...ps:Spark中产生宽窄依赖的依据是shuffle,当发生shuffle时,会产生宽依赖,基本上shuffle算子都会产生宽依赖,但是join除外,在执行join算子之前
  • 宽依赖与窄依赖区别

    2020-04-16 11:42:26
    依赖 窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用。换句话说,一个父RDD的分区对应于一个子RDD的分区,或者多个父RDD的分区对应于一个子RDD的分区。所以窄依赖又可以分为两种情况: ...宽依赖是指父RDD的每...
  • 1.RDD 之间的依赖关系 ... 从算子视角上来看,splitRDD通过map算子得到了tupleRDD, 所以splitRDD和tupleRDD之间的关系是map 但是仅仅这样说, 会不够全面, 从细节上来看,RDD只是数据和关于数据...
  • 一般是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)(简单理解为一对多的关系),称为shuffle Depencency,shuffle的算子一般都会是宽依赖,比如...
  • 1.宽依赖和窄依赖操作算子的区别 2.宽依赖和窄依赖类型区别 二、概念 1.窄依赖 (1)概念 子RDD的每个分区的数据来自常数个父RDD分区;父RDD的每个分区的数据到子RDD的时候在一个分区中进行处理。即,父...
  • 相邻两个RDD处理之前是父RDD,处理之后就是子rdd,具体你用的什么算子产生没有产生shuffle就是会出现宽依赖和窄依赖的现象. 宽依赖 多对多,一对多(父RDD的一个或者多个分区,可能被子RDD多个分区所使用,) 窄依赖 一对一...
  • 开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下...
  • 聊聊Spark中的宽依赖和窄依赖

    千次阅读 2018-07-08 22:35:46
    开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算...而后面的却是宽依赖:我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来消费该数据。同样...
  • Spark中产生宽窄依赖的依据是shuffle,当发生shuffle时,会产生宽依赖,基本上shuffle算子都会产生宽依赖,但是join除外,在执行join算子之前如果先执行groupByKey,执行groupByKey之后,会把相同的key分到同一个...
  • 宽依赖: 发生shuffle时,一定会产生宽依赖宽依赖是一个RDD中的一个Partition被多个子Partition所依赖(一个父亲多有儿子),也就是说每一个父RDD的Partition中的数据,都可能传输一部分到下一个RDD的多个...
  • 窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区。...宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区。这其中又分两种...
  • 一、窄依赖(narrow dependencies) 1、子RDD的每个分区依赖于常数个父分区(既与数据规模无关) 2、输入输出一对一的算子,且结果RDD的每个分区结构不变,主要是map、flatMap 3、输入输出一对一,但...二、...
  • 文章目录算子总结map和mapPartitions的区别map和foreach的区别:foreach和foreachPartition的区别:RDD类型RDD依赖关系窄依赖宽依赖join有时宽依赖有时窄依赖 算子总结 map和mapPartitions的区别 map是处理RDD里的每个...
  • [1]宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面...
  • Spark算子API解析

    2020-08-23 19:06:14
    如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。 缓存 如果在应用程序中多次使用同一个RDD,...
  • 文章目录核心术语spark运行机理Transformationsmap和mapPartitionsforeach和foreachpartitioncoalesce和repartitionShuffle窄依赖和宽依赖Stagegroupbykey和reducebykey代码map和mapPartitionforeach和...
  • 宽依赖父RDD的每个分区都回最多被子类的一个RDD所使用 窄依赖父RDD的每个分区会被多个子类的RDD分区所使用 dataset 和datastream 区别 1.keyBy = groupBy() 根据key的hash值进行分组聚合 datastream spark ...

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 121
精华内容 48
关键字:

宽依赖算子