kv类型 spark_spark算子 转化为kv值 - CSDN
精华内容
参与话题
  • reduceByKey、foldByKey、aggregateByKey、combineByKey之前的区别和联系 源码主要逻辑函数比较 算子 源码 ...combineByKetWithClassTag[V] ((v:V)=>...combineByKetWithClassTag[V] ((v:...

    reduceByKey、foldByKey、aggregateByKey、combineByKey之间的区别和联系

    源码主要逻辑函数比较

    算子 源码
    reduceByKey( ) combineByKeyWithClassTag[V] ((v:V)=>v,func,func)
    foldByKey( )( ) combineByKeyWithClassTag[V] ((v:V)=>cleanedFunc(createZero(),v),cleanedFunc,cleanedFunc)
    aggregateByKey( )( , ) combineByKeyWithClassTag[V] ((v:V)=>cleanedSeqOp(createZero(),v),cleanedSeqOp,combOp)
    combineByKey( , , ) combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners)

    灵活性:
    reduceByKey < foldByKey < aggregateByKey < combineByKey

    都有一个默认参数:mapSideCombine=true,即都在map端进行的combine操作,进行了提前的预聚合。

    展开全文
  • kv 类型RDD 在java 显示:JavaPairRDD<String, Integer> 而不是JavaRDD<Tuple2<String, Integer>> 这个很重要不要搞错

    kv 类型RDD 在java 显示:JavaPairRDD<String, Integer>
    而不是JavaRDD<Tuple2<String, Integer>>
    这个很重要不要搞错

    展开全文
  • 大多数的 Spark 操作可以用在任意类型的 RDD 上, 但是有一些比较特殊的操作只能用在key-value类型的 RDD 上. 这些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分组(group), 聚集(aggregate)等. 在 Spark 中,...

    大多数的 Spark 操作可以用在任意类型的 RDD 上, 但是有一些比较特殊的操作只能用在key-value类型的 RDD 上.

    这些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分组(group), 聚集(aggregate)等.

    在 Spark 中, 这些操作在包含对偶类型(Tuple2)的 RDD 上自动可用(通过隐式转换).

    object RDD {
      implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
        new PairRDDFunctions(rdd)
      }

    键值对的操作是定义在PairRDDFunctions类上, 这个类是对RDD[(K, V)]的装饰.

    1、partitionBy

    作用: 对pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle过程

    def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      
      if (self.partitioner == Some(partitioner)) {
        self
      } else {
        new ShuffledRDD[K, V, V](self, partitioner)
      }
    }
    scala> val rdd1 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "d")))
    scala> rdd1.partitions.length
    res1: Int = 2
    
    scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(3)).partitions.length
    res3: Int = 3

    2、reduceByKey(func,[numTasks])

    作用: 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的reduce函数,将相同key的value聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

    scala> val rdd1 = sc.parallelize(List(("female",1),("male",5),("female",5)("male",2)))
    scala> rdd1.reduceByKey(_ + _)
    
    scala> res1.collect
    res2: Array[(String, Int)] = Array((female,6), (male,7))

    3、groupByKey()

    作用: 按照key进行分组.

    scala> val rdd1 = sc.parallelize(Array("hello", "world", "h", "hello", "are", "go"))
    
    scala> val rdd2 = rdd1.map((_, 1))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26
    
    scala> rdd2.groupByKey()
    res3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:29
    
    scala> res3.collect
    res4: Array[(String, Iterable[Int])] = Array((are,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (go,CompactBuffer(1)), (h,CompactBuffer(1)), (world,CompactBuffer(1)))
    
    scala> res3.map(t => (t._1, t._2.sum))
    res5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:31
                    
    scala> res5.collect
    res7: Array[(String, Int)] = Array((are,1), (hello,2), (go,1), (h,1), (world,1))

    注意:

    (1)基于当前的实现,groupByKey必须在内存中持有所有的键值对 . 如果一个key有太多的value, 则会导致内存溢出(OutOfMemoryError)

    (2)所以这操作非常耗资源, 如果分组的目的是为了在每个key上执行聚合操作(比如: sum 和 average), 则应该使用PairRDDFunctions.aggregateByKey或者PairRDDFunctions.reduceByKey, 因为他们有更好的性能(会先在分区进行预聚合)

    4、reduceByKey和groupByKey的区别

    (1)reduceByKey:按照Key进行聚合,在Shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]

      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      }
      def combineByKeyWithClassTag[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
        require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
        if (keyClass.isArray) {
          if (mapSideCombine) {
            throw new SparkException("Cannot use map-side combining with array keys.")
          }
          if (partitioner.isInstanceOf[HashPartitioner]) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
        }
        val aggregator = new Aggregator[K, V, C](
          self.context.clean(createCombiner),
          self.context.clean(mergeValue),
          self.context.clean(mergeCombiners))
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(iter => {
            val context = TaskContext.get()
            new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
          }, preservesPartitioning = true)
        } else {
          new ShuffledRDD[K, V, C](self, partitioner)
            .setSerializer(serializer)
            .setAggregator(aggregator)
            .setMapSideCombine(mapSideCombine)
        }
      }

    (2)groupByKey:按照key进行分组,直接进行shuffle

      def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
        // groupByKey shouldn't use map side combine because map side combine does not
        // reduce the amount of data shuffled and requires all map side data be inserted
        // into a hash table, leading to more objects in the old gen.
        val createCombiner = (v: V) => CompactBuffer(v)
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
      }

    5、aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

    函数声明:

        /**
       * Aggregate the values of each key, using given combine functions and a neutral "zero value".
       * This function can return a different result type, U, than the type of the values in this RDD,
       * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
       * as in scala.TraversableOnce. The former operation is used for merging values within a
       * partition, and the latter is used for merging values between partitions. To avoid memory
       * allocation, both of these functions are allowed to modify and return their first argument
       * instead of creating a new U.
       */
      def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
          combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
        aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
      }

    使用给定的 combine 函数和一个初始化的zero value, 对每个key的value进行聚合.

    这个函数返回的类型U不同于源 RDD 中的V类型. U的类型是由初始化的zero value来定的. 所以, 我们需要两个操作: -

    一个操作(seqOp)去把 1 个v变成 1 个U - 另外一个操作(combOp)来合并 2 个U

    一个操作用于在一个分区进行合并, 第二个操作用在两个分区间进行合并.

    为了避免内存分配, 这两个操作函数都允许返回第一个参数, 而不用创建一个新的U

    (1) eroValue:给每一个分区中的每一个key一个初始值;

    (2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

    (3)combOp:函数用于合并每个分区中的结果。

    创建一个 pairRDD,取出每个分区相同key对应值的最大值,然后相加

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author z
     * Date 2019-12-09 15:39:08
     */
    object AggregateByKey {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("MySqlRead").setMaster("local[2]")
            
            val sc = new SparkContext(conf)
            
            val rdd = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
            
            /*     val rdd2 = rdd.aggregateByKey((Int.MinValue, Int.MaxValue))(
                     (x, y) => (x._1.max(y), x._2.min(y)),
                     (x, y) => (x._1 + y._1, x._2 + x._2)
                 )*/
            val rdd2 = rdd.aggregateByKey((Int.MinValue, Int.MaxValue))(
                {   //分区内相同key的(最大值,最小值)
                    case (kv, e) => (kv._1.max(e), kv._2.min(e))
                },
                {   //两个分区间数据的合并
                    case (kv1, kv2) => (kv1._1 + kv2._1, kv1._2 + kv2._2)
                }
            )
            
            // 计算出来每个key对应的值的平均值!!
            /*  val rdd2=rdd.aggregateByKey((0, 0))(
                  {     //(sum,count)即为zero value,每个key
                      case ((sum, count), e) => (sum + e, count + 1)
                  },
                  {
                      case ((sum1,count1),(sum2,count2)) => (sum1 + sum2, count1 + count2)
                  }
              )*/
            
            //val rdd3 = rdd2.mapValues(kv => kv._1.toDouble / kv._2)
            rdd2.collect().foreach(println)
        }
    }

    6、foldByKey

    参数: (zeroValue:V)(func: (V, V) => V): RDD[(K, V)]

    作用:aggregateByKey的简化操作seqop和combop相同

    object FoldLeft {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("FoldLeft").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            val rdd= sc.parallelize(Array(("c","3"), ("c","2"), ("c","4"), ("c","3"), ("c","6"), ("c","8")), 3)
            
            // foldByKey来说, 0值, 每个分区内用一次. 重点: 分区间合并的时候, 零值不参与
            val res = rdd.foldByKey("-")(_ + _)
            res.collect.foreach(println)               
            sc.stop()
            
        }
    }

    7、combineByKey

    def combineByKey[C](
                           createCombiner: V => C,
                           mergeValue: (C, V) => C,
                           mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
            partitioner, mapSideCombine, serializer)(null)
    }

    作用: 针对每个K, 将V进行合并成C, 得到RDD[(K,C)]

    参数描述:

    (1)createCombiner: combineByKey会遍历分区中的每个key-value对. 如果第一次碰到这个key, 则调用createCombiner函数,传入value, 得到一个C类型的值.(如果不是第一次碰到这个 key, 则不会调用这个方法)

    (2)mergeValue: 如果不是第一个遇到这个key, 则调用这个函数进行合并操作. 分区内合并

    (3)mergeCombiners 跨分区合并相同的key的值(C). 跨分区合并

    创建一个 pairRDD,根据 key 计算每种 key 的value的平均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果

    object CombineByKey {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("MySqlRead").setMaster("local[2]")
            val sc = new SparkContext(conf)
            val rdd = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
            val rdd2:RDD[(String,(Int,Int))] = rdd.combineByKey(
                (_, 1),
                {
                    case ((sum: Int, count: Int), e:Int) => (sum + e, count + 1)
                },
                {
                    case ((sum1: Int, count1: Int), (sum2:Int, count2:Int)) => (sum1 + sum2, count1 + count2)
                }
            )
            val rdd3 = rdd2.mapValues {
                case (sum, count) => (sum, count, sum.toDouble / count)
            }               
            rdd3.collect.foreach(println)
        }
    }

    8、sortByKey

    作用: 在一个(K,V)的 RDD 上调用, K必须实现 Ordered[K] 接口(或者有一个隐式值: Ordering[K]), 返回一个按照key进行排序的(K,V)的 RDD

    object SorkByKey {
        //1. 冥界召唤,需要样例类
       /* implicit val ord = new Ordering[User]{
            override def compare(x: User, y: User): Int = x.age - y.age
        }
        */
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("SorkByKey").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
    //        val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
    //        val res: RDD[(Int, String)] = rdd.sortByKey(ascending = false, numPartitions = 10)
            val rdd = sc.parallelize(Array(User(10, "a"), User(8, "c"), User(12, "b"))).map((_, 1))
            val res: RDD[(User, Int)] = rdd.sortByKey()
            
            res.collect.foreach(println)
            sc.stop()
            
        }
    }
    //
    //case class User(id:Int,name:String)
    //2. 继承 Ordered
    case class User(age: Int, name:String) extends Ordered[User] {
        override def compare(that: User): Int = this.age - that.age
    }
    

    9、mapValues

    作用: 针对(K,V)形式的类型只对V进行操作

    scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
    
    scala> rdd.mapValues("<" + _ + ">").collect
    res29: Array[(Int, String)] = Array((1,<a>), (10,<b>), (11,<c>), (4,<d>), (20,<d>), (10,<e>))

    10、join(otherDataSet,[numTasks])

    内连接:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的RDD

    object Join {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("Join").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            var rdd1 = sc.parallelize(Array((1, "a"), (1, "b"), (2, "c"), (4, "d")))
            var rdd2 = sc.parallelize(Array((1, "aa"),(1, "bb"), (3, "bb"), (2, "cc")), 3)
            // 内连接
    //        val res: RDD[(Int, (String, String))] = rdd1.join(rdd2)
    //        var res = rdd1.leftOuterJoin(rdd2)
    //        val res: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
            val res = rdd1.rightOuterJoin(rdd2)
            println(res.partitions.length)
            res.collect.foreach(println)
            sc.stop()        
        }
    }
    

    (1) 如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key进行组合.

    (2)也支持外连接: leftOuterJoin,rightOuterJoin,fullOuterJoin.

    11、cogroup(otherDataSet,[numTasks])

    作用:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD

    object Cogroup {
        def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("Cogroup").setMaster("local[2]")
            val sc: SparkContext = new SparkContext(conf)
            val rdd1 = sc.parallelize(Array((1, 10), (2, 20), (1, 100), (3, 30)), 1)
            val rdd2 = sc.parallelize(Array((1, "a"), (2, "b"), (1, "aa"), (3, "c")), 1)
            val res: RDD[(Int, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)
            res.collect.foreach(println)
            sc.stop()
        }
    }
    (1,(CompactBuffer(10, 100),CompactBuffer(a, aa)))
    (3,(CompactBuffer(30),CompactBuffer(c)))
    (2,(CompactBuffer(20),CompactBuffer(b)))
    展开全文
  • 原文发在我的公众号微信公众号"大数据学习应用"中 ...本文系个人原创 请勿私自转载 ...spark的编程归根结底就是对spark算子的使用,因此非常有必要熟练掌握这些内置算子。 本文重点分析以下spark算子 groupByKey ...

    在这里插入图片描述

    原文发在我的公众号微信公众号"大数据学习应用"中
    公众号后台回复"spark源码"可查看spark源码分析系列
    本文系个人原创 请勿私自转载

    本文共约4400字

    前言

    spark内置了非常多有用的算子,通过对这些算子的组合就可以完成业务需要的功能。

    spark的编程归根结底就是对spark算子的使用,因此非常有必要熟练掌握这些内置算子。

    本文重点分析以下spark算子

    • groupByKey
    • reduceByKey
    • aggregateByKey
    • foldByKey
    • combineByKey

    这几个算子操作的对象都是(k,v)类型的RDD

    虽然都有迭代合并的意思 但不同点在于传入的参数以及分区内分区间的计算规则

    groupByKey()

    函数签名

    def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
        groupByKey(defaultPartitioner(self))
    }
    
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
        groupByKey(new HashPartitioner(numPartitions))
    }
    
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
        // groupByKey shouldn't use map side combine because map side combine does not
        // reduce the amount of data shuffled and requires all map side data be inserted
        // into a hash table, leading to more objects in the old gen.
        val createCombiner = (v: V) => CompactBuffer(v)
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
            createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
    

    函数说明

    • groupByKey()称为分组合并
    • 对相同的key进行分组 并对每个key返回一个Iterable[V]
    • Iterable[V]存放的是之前相同的key所对应的一个一个的value
    • 如果直接输出 则value默认为CompactBuffer数据结构
    • groupByKey()处理数据时需要等待,等待所有相同的key都到达时,才能继续往后执行
    • groupByKey()会将数据打乱重组,也就是说含有shuffle的过程,但是又不能在内存中等待数据,所以必须将shuffle的数据落盘等待

    关于CompactBuffer

    CompactBufferspark里的数据结构,它继承自一个迭代器和序列,所以它的返回值是一个能进行循环遍历的集合

    /**
    * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
    * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
    * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
    * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
    * entries than that. This makes it more efficient for operations like groupBy where we expect
    * some keys to have very few elements.
    */
    /**
    类似于ArrayBuffer的仅追加缓冲区,但是对于小型缓冲区而言,其内存效率更高。
    ArrayBuffer总是分配一个Object数组来存储数据,默认情况下有16个条目,
    因此它有大约80-100字节的开销。 
    相反,CompactBuffer最多可以在主对象的字段中保留两个元素,并且仅当有更多条目时才分配Array [AnyRef]。 
    这对于像groupBy这样的操作来说效率更高,因为我们希望某些键的元素很少。
    */
    private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
    

    代码举例

    var rdd = sc.makeRDD(
        List(
            ("hello", 1),
            ("hello", 2),
            ("hadoop", 2),
            ("hadoop", 2),
            ("hadoop", 4)
        )
    )
    
    // 使用key进行分组操作
    val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    rdd1.collect().foreach(println)
    // 可以直接输出 结果为
    //(hadoop,CompactBuffer(2, 2, 4))
    //(hello,CompactBuffer(1, 2))
    
    val rdd2 = rdd1.mapValues(
        datas => {
            datas.sum
        }
    )
    rdd2.collect().foreach(println)
    // 也可以将数据迭代取出进行后续操作之后输出 结果为
    // (hadoop,8)
    // (hello,3)
    

    作图示例

    reduceByKey()

    函数签名

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        reduceByKey(defaultPartitioner(self), func)
    }
    
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
        reduceByKey(new HashPartitioner(numPartitions), func)
    }
    
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
    

    函数说明

    • reduceByKey()groupByKey()基础上升级

    • 区别在于 groupByKey()的代码中 mapSideCombine = false

    • 也说就是groupByKey()没有map端的预聚合操作,直接进行shuffle

    • reduceByKey()会在分区内做预聚合,然后再进行shuffle聚合,返回的结果是RDD

      • 我们一般将分区内聚合称之为预聚合 combine
    • 推荐使用reduceByKey()shuffle的过程中,落盘的数据量会变少,所以读写磁盘的速度会变快,性能更高

    代码举例

    var rdd = sc.makeRDD(
        List(
            ("Hello", 1),
            ("Hadoop", 2),
            ("Hello", 3),
            ("Hadoop", 4),
            ("Hadoop", 5),
            ("Hello", 6),
            ("Hadoop", 7)
        )
    )
    
    // spark中所有的byKey算子都需要通过KV类型的RDD进行调用
    // reduceByKey = 分组 + 聚合
    // 分组操作已经由Spark自动完成,按照key进行分组。然后在数据的value进行两两聚合
    val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    
    rdd1.collect().foreach(println)
    // 结果为
    // (Hadoop,18)
    // (Hello,10)
    

    作图示例

    aggregateByKey

    函数签名

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
                                                  combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
        aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
    }
    
    def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
                                                                      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
        aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
    }
    
    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
                                                                            combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
        // Serialize the zero value to a byte array so that we can get a new clone of it on each key
        val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
        val zeroArray = new Array[Byte](zeroBuffer.limit)
        zeroBuffer.get(zeroArray)
    
        lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
        val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
    
        // We will clean the combiner closure later in `combineByKey`
        val cleanedSeqOp = self.context.clean(seqOp)
        combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
                                    cleanedSeqOp, combOp, partitioner)
    }
    

    函数说明

    • 将数据根据不同的规则进行分区内计算和分区间计算 并且可以给定初始值zeroValue

    • zeroValue: 给每一个分区中的每一个 key 一个初始值

      seqOp: 函数用于在每一个分区中用初始值逐步迭代 value

      combOp: 函数用于合并每个分区中的结果

    • keyvalue 进行分组合并,合并时,将每个 value 和初始值作为 seq 函数的参数,进行计算,返回的结果作为一个新的 kv 对,然后再将结果按照 key 进行合并

    • 最后将每个分组的 value 传递给 comb 函数进行计算(先将前两个 value 进行计算,将返回结果和下一个 value 传给 comb 函数,以此类推),将 key 与计算结果作为一个新的 kv 对输出

    代码举例

    // 取出每个分区内相同key的最大值然后分区间相加
    // aggregateByKey算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示每个key的初始值
    // 2. 第二个参数列表中含有两个参数
    //    2.1 第一个参数表示分区内的计算规则
    //    2.2 第二个参数表示分区间的计算规则
    val rdd =
    sc.makeRDD(List(
        ("a",1),("a",2),("c",3),
        ("b",4),("c",5),("c",6)
    		  ),2)
    // 0:("a",1),("a",2),("c",3) => (a,5)(c,5)
    //                                         => (a,5)(b,5)(c,11)
    // 1:("b",4),("c",5),("c",6) => (b,5)(c,6)
    
    val resultRDD =
    rdd.aggregateByKey(5)(
        (x, y) => math.max(x,y),
        (x, y) => x + y
    )
    
    resultRDD.collect().foreach(println)
    // 结果为
    // (b,5)
    // (a,5)
    // (c,11)
    

    作图示例

    foldByKey()

    函数签名

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        foldByKey(zeroValue, defaultPartitioner(self))(func)
    }
    
    def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
    }
    
    def foldByKey(
        zeroValue: V,
        partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
        // Serialize the zero value to a byte array so that we can get a new clone of it on each key
        val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
        val zeroArray = new Array[Byte](zeroBuffer.limit)
        zeroBuffer.get(zeroArray)
    
        // When deserializing, use a lazy val to create just one instance of the serializer per task
        lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
        val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
    
        val cleanedFunc = self.context.clean(func)
        combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
                                    cleanedFunc, cleanedFunc, partitioner)
    }
    

    函数说明

    • 当分区内计算规则和分区间计算规则相同时,aggregateByKey()就可以简化为foldByKey()
    • 如果给定的初始值对数据的合并和计算没有任何影响
      • 例如 计算规则为求和 而初始值为0 此时就相当于reduceByKey()

    代码举例

    // 如果aggregateByKey算子中分区内计算规则和分区间计算规则相同的话
    // 那么可以采用其他算子来代替
    
    val rdd =
    sc.makeRDD(List(
        ("a", 1), ("a", 2), ("c", 3),
        ("b", 4), ("c", 5), ("c", 6)
    		  ), 2)
    
    // 如果做加法 初始值为0时就相当于 reduceByKey(_+_)
    val resultRDD = rdd.foldByKey(5)(_ + _)
    
    resultRDD.collect().foreach(println)
    // 结果为
    // (b,9)
    // (a,8)
    // (c,24)
    

    作图示例

    combineByKey()

    函数签名

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
    }
    
    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        numPartitions: Int): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
    }
    
    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
        combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
                                 partitioner, mapSideCombine, serializer)(null)
    }
    

    函数说明

    • combineByKey()最通用的对key-valueRDD进行聚集操作的聚集函数
    • aggregateByKey()相比 combineByKey()是使用传入的第一个函数createCombiner对第一次出现的keyvalue进行操作(可允许转换类型),而aggregateByKey()是给各分区中每个key一个初始值,但是没有对原数据进行任何的操作。
    • 如果返回值类型和参数类型一致,使用aggregateByKey()则较为简单

    代码举例

    // TODO : 求每个key的平均值 => ( total, cnt )
    val rdd =
        sc.makeRDD(
            List(
                ("a", 88), ("b", 95), ("a", 91),
                ("b", 93), ("a", 95), ("b", 98))
            , 2)
    
    val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
        // 对分区内第一次出现的key对应的value值进行格式的转换
        // 这里我们将Int 转换为元组(Int,Int)
        (x: Int) => (x, 1),
        // 分区内计算规则 数据相加,数量加1
        (x: (Int, Int), y: Int) => {
            (x._1 + y, x._2 + 1)
        },
        // 分区间计算规则 数据相加,数量相加
        (x: (Int, Int), y: (Int, Int)) => {
            (x._1 + y._1, x._2 + y._2) // 数据相加,数量相加
        }
    )
    val resultRDD = rdd1.mapValues(
        t => t._1 / t._2
    )
    // 结果为
    // (b,95)
    // (a,91)
    

    作图示例

    对比

    关键代码对比

    // ---关键代码对比---
    // groupByKey
    combineByKeyWithClassTag[CompactBuffer[V]](
            createCombiner, 
            mergeValue, 
            mergeCombiners, 
            partitioner, 
        	// 这里map端的聚合操作为false
            mapSideCombine = false)
    
    // reduceByKey 
    combineByKeyWithClassTag[V]((v: V) => 
                   v, 
                   func, 
                   func, 
                   partitioner)
    // aggregateByKey
    combineByKeyWithClassTag[U]((v: V) => 
                   // 分区内计算规则 传进去初始值和v
                   cleanedSeqOp(createZero(), v),
                   // 接着在分区内连续使用分区内的计算规则
                   cleanedSeqOp,
                   // 分区间计算规则
                   combOp, 
                   partitioner)
    
    // foldByKey
    combineByKeyWithClassTag[V]((v: V) => 
                   cleanedFunc(createZero(), v),
                   // 分区内和分区间计算规则相同
                   cleanedFunc, 
                   cleanedFunc, 
                   partitioner)
    // combineByKey
    combineByKeyWithClassTag(
        			//第一个参数是对第一次出现的key的value进行处理 可转换类型
        			createCombiner, 
                    mergeValue, 
                    mergeCombiners,
                    partitioner, 
        			// map端的预聚合 上面的几个没有传该参数表示使用默认的 true
                    mapSideCombine, 
        			// 序列化 默认为null
                    serializer)(null)
    

    五大算子比较

    • 从底层来看 五个都是使用相同的底层逻辑
    • groupByKey未进行map端的预聚合操作
    • reduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同
    • aggregateByKey会把初始值和每个第一次出现的key对应的value使用分区内的计算规则进行计算 分区内和分区间计算规则不同
    • foldByKey的算子的分区内和分区间的计算规则相同,并且初始值和第一个value使用的规则相同 是aggregateByKey的简化版
    • combineByKey第一个参数就是对分区内每个第一次出现的keyvalue进行处理,且可以转换类型,所以无需初始值。
    • groupByKey之外的四个算子都支持预聚合功能。所以shuffle性能比较高
    • 上面的算子都可以实现WordCount

    四大聚合算子比较

    算子 初始值 分区内规则 分区间规则 是否相同
    reduceByKey func: (V, V) => V func: (V, V) => V
    aggregateByKey zeroValue: U seqOp: (U, V) => U combOp: (U, U) => U ×
    foldByKey zeroValue: V func: (V, V) => V func: (V, V) => V
    combineByKey createCombiner: V => C mergeValue: (C, V) => C mergeCombiners: (C, C) => C ×

    在这里插入图片描述

    展开全文
  • 从kafka获取到的数据类型: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]] 转载于:https://www.cnblogs.com/xu...
  • spark sortBy sortByKey实战详解

    千次阅读 2020-09-19 16:59:24
    那么在spark中如何排序呢?我们来看一些很有代表性的例子。 1.最简单的排序 假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那这个排序算最简单的: sc.parallelize(Array(1,3,2,4,6,5)).sortBy(x =&...
  • SparkR初体验

    万次阅读 2016-05-24 22:43:06
    突然有个想法,R只能处理百万级别的数据,如果R能运行在spark上多好!搜了下发现13年SparkR这个项目就启动了,感谢美帝! 1.你肯定得先装个spark吧。看这:Spark本地模式与Spark Standalone伪分布模式 2.你肯定得会R...
  • 文章目录第1章Spark整体概述1.1整体概念1.2RDD抽象1.3计算抽象1.4集群模式1.5RPC网络通信抽象1.6启动Standalone集群1.7核心组件1.8核心组件交互流程1.9Block管理1.10整体应用第2章脚本解析2.1start-daemon.sh2.2...
  • Spark学习笔记:Spark基础

    千次阅读 2018-09-03 23:40:20
    Spark基础以及WordCount实现
  • 上一篇中我们介绍了hive中的数据类型,其中一类比较重要的类型即集合类型,主要包括struct、map、array三种。那么我们在spark中处理这三种类型呢?本文就来介绍一下。1、数据...
  • 或者,如果想使用updateStateByKey操作大量的kv类型数据,则所需的存储空间会很大。相反,如果想执行一个简单的map-filter-store操作,则所需的内存将很少。 通常,由于通过接收器接收的数据存储在StorageLevel....
  • Spark Key-Value类型

    2020-09-10 15:04:58
    1) groupByKey案例 1.作用:groupByKey 对每一个进行操作,但只生成一个sequence 2.需求: 创建一个pairRDD,将相同key对应值聚合到一个sequence中,并计算相同对应... "Python", "Python", "Scala", "Spark", "Spark"), 2)
  • 文本数据: <BaseValue::CD type=全数> @ id name value unit // 序号 基准值名 基准值 ...电压.1000 1000 KV # 3 CD.电压.800 800 KV # 4 CD.电压.750 750 KV # 5 CD.电压.660 660 KV # 6 CD.电压.600 600 K...
  • 阅读建议:阅读本文前,最好先阅读《Spark2.1.0——SparkUI的实现》和《Spark2.1.0——WebUI框架体系》。  在SparkContext的初始化过程中,会创建SparkUI。有了对WebUI的总体认识,现在是时候了解SparkContext是...
  • Spark SQL 中数据类型为Map的注意事项

    千次阅读 2019-06-05 20:16:27
    在使用SparkSQL进行处理数据时,将数据保存为Map,并读取出Map的数据 数据列聚合操作后拼接为一个字符集合:BSV ANGLIA_1~BSV ANGLIA---_2 SELECT MMSI, IMO, concat_ws("~",collect_set(concat_ws("_",ShipName,...
  • KVstore 笔记【随时增】

    千次阅读 2017-11-06 11:29:31
    KVstore对比文章 http://blog.csdn.net/cadem/article/details/72516810?locationNum=8&fps=1
  • [Spark版本更新]--2.3.0发行说明

    千次阅读 2018-03-29 08:50:19
    自从2017年12月1日发布spark-2.2.1以来,已有3个月时间。2018年2月28日,spark官方发布了一个大版本Spark-2.3.0,解决了1399个大大小小的问题。一、DataBricks做了相关说明今天,我们很高兴地宣布Databricks上的...
  • 1、Spark编程模型 1.1 术语定义 l应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor; l驱动程序(Driver Program):运行Application的main()函数并且创建...
  • def calcProvinceClickTop(dateProvinceCityAdCountsDS:DStream[(String, Int)], sqlContext:SQLContext): Unit = { //当前批次的记录 val dateProvinceAdCounts:DStream[(String, Int)] = ...
  • 第四天:Spark Streaming

    2020-07-02 07:45:29
    Spark Streaming概述 1. Spark Streaming是什么 Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的...
1 2 3 4 5 ... 20
收藏数 3,500
精华内容 1,400
关键字:

kv类型 spark