key spark_spark根据key自定义分区 - CSDN
精华内容
参与话题
  • Spark算子reduceByKey深度解析

    万次阅读 热门讨论 2016-10-21 11:29:03
    最近经常使用到reduceByKey这个算子,懵逼的时间占据多数,所以沉...国外的大牛一上来给出这么一句话,个人感觉高度概括了reduceByKey的功能:Spark RDD reduceByKey function merges the values for each key using

    最近经常使用到reduceByKey这个算子,懵逼的时间占据多数,所以沉下心来翻墙上国外的帖子仔细过了一遍,发现一篇不错的,在此加上个人的理解整体过一遍这个算子,那么我们开始:

    国外的大牛一上来给出这么一句话,个人感觉高度概括了reduceByKey的功能:

    Spark RDD reduceByKey function merges the values for each key 
    using an associative reduce function.【Spark的RDD的reduceByKey
    是使用一个相关的函数来合并每个key的value的值的一个算子(那么主
    干就是reduceByKey是个算子/函数)】。

    那么这就基本奠定了reduceByKey的作用域是key-value类型的键值对,并且是只对每个key的value进行处理,如果含有多个key的话,那么就对多个values进行处理。这里的函数是我们自己传入的,也就是说是可人为控制的【其实这是废话,人为控制不了这算子一点用没有】。那么举个例子:

      

    scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),
         | ("a", 1), ("b", 1), ("b", 1),
         | ("b", 1), ("b", 1)), 3)

    我们创建了一个Array的字符串,并把其存入spark的集群上,设置了三个分区【这里我们不关注分区,只关注操作】。那么我们调用reduceByKey并且传入函数进行相应操作【本处我们对相同key的value进行相加操作,类似于统计单词出现次数】:

    scala> val y = x.reduceByKey((pre, after) => (pre + after))
    这里两个参数我们逻辑上让他分别代表同一个key的两个不同values,那么结果想必大家应该猜到了:

    scala> y.collect
    res0: Array[(String, Int)] = Array((a,3), (b,5))
    嗯,到这里大家对reduceByKey有了初步的认识和体会。论坛中有一段写的也很有帮助,由于英文不好怕翻译过来误导大家,所以每次附上原话:

    Basically reduceByKey function works only for RDDs which contains key and value pairs kind of
     elements(i.e RDDs having tuple or Map as a data element). It is a transformation operation 
    which means it is lazily evaluated.We need to pass one associative function as a parameter, 
    which will be applied to the source RDD and will create anew RDD as with resulting values(i.e.
    key value pair). This operation is a wide operation as data shuffling may happen across the 
    partitions.【本质上来讲,reduceByKey函数(说算子也可以)只作用于包含key-value的RDDS上,它是
    transformation类型的算子,这也就意味着它是懒加载的(就是说不调用Action的方法,是不会去计算的
    ),在使用时,我们需要传递一个相关的函数作为参数,这个函数将会被应用到源RDD上并且创建一个新的
    RDD作为返回结果,这个算子作为data Shuffling 在分区的时候被广泛使用】

    看到这大家对这个算子应该有了更加深入的认识,那么再附上我的scala的一个小例

    子,同样是统计字母出现次数:

    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
     * mhc
     * Created by Administrator on 2016/5/17.
     */
    object MyTest {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("MyTestApp").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val x = sc.parallelize(List("a", "b", "a", "a", "b", "b", "b", "b"))
        val s = x.map((_, 1))
        val result = s.reduceByKey((pre, after) => pre + after)
        println(result.collect().toBuffer)
    
      }
    }
    

    结果是:ArrayBuffer((a,3), (b,5)),很简单对吧。论坛给出了java和python的版本的,如下:

    Java:

    packagecom.backtobazics.sparkexamples;
    
    importjava.util.Arrays;
    
    importorg.apache.spark.api.java.JavaPairRDD;
    importorg.apache.spark.api.java.JavaRDD;
    importorg.apache.spark.api.java.JavaSparkContext;
    importorg.apache.spark.api.java.function.Function2;
    
    importscala.Tuple2;
    
    public classReduceByKeyExample {
        public static void main(String[] args) throws Exception {
            JavaSparkContext sc = new JavaSparkContext();
            
            //Reduce Function for sum Function2<Integer, Integer, Integer> reduceSumFunc = (accum, n) -> (accum + n);
            
            
            // Parallelized with 2 partitions JavaRDD<String> x = sc.parallelize(
                            Arrays.asList("a", "b", "a", "a", "b", "b", "b", "b"),
                            3);
            
            // PairRDD parallelized with 3 partitions// mapToPair function will map JavaRDD to JavaPairRDD JavaPairRDD<String, Integer> rddX = 
                            x.mapToPair(e -> newTuple2<String,Integer>(e, 1));
            
            // New JavaPairRDD JavaPairRDD<String, Integer> rddY = rddX.reduceByKey(reduceSumFunc);
            
            //Print tuples for(Tuple2<String, Integer> element : rddY.collect()){
                System.out.println("("+element._1+", "+element._2+")");
            }
        }
    }
    
    // Output:// (b, 5)// (a, 3) 
    python:

     Bazic reduceByKey example in python# creating PairRDD x with key value pairs>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
    ... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)
    
    # Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
    >>> y.collect()
    [('b', 5), ('a', 3)]
    
    # Define associative function separately >>>def sumFunc(accum, n):
    ...     return accum + n
    ...
    >>> y = x.reduceByKey(sumFunc)
    >>> y.collect()
    [('b', 5), ('a', 3)]
    感谢大家捧场,客官慢走。

    展开全文
  • 2 Spark入门reduce、reduceByKey的操作

    万次阅读 2018-04-13 11:22:32
    上一篇是讲map,map的主要作用就是替换。...import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import...

    上一篇是讲map,map的主要作用就是替换。reduce的主要作用就是计算。

    package reduce;
    
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @author wuweifeng wrote on 2018/4/13.
     */
    public class SimpleReduce {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
            //spark对普通List的reduce操作
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
    
            Integer sum = originRDD.reduce((a, b) -> a + b);
            System.out.println(sum);
    
            //reduceByKey,按照相同的key进行reduce操作
            List<String> list = Arrays.asList("key1", "key1", "key2", "key2", "key3");
            JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
            //转为key-value形式
            JavaPairRDD<String, Integer> pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
            List list1 = pairRDD.reduceByKey((x, y) -> x + y).collect();
            System.out.println(list1);
        }
    }

    代码很简单,第一个就是将各个数累加。reduce顺序是1+2,得到3,然后3+3,得到6,然后6+4,依次进行。

    第二个是reduceByKey,就是将key相同的键值对,按照Function进行计算。代码中就是将key相同的各value进行累加。结果就是[(key2,2), (key3,1), (key1,2)]

    展开全文
  • 1.热点key的数据倾斜在大数据相关的统计与处理中,热点key造成的数据倾斜非常常见也非常讨厌,经常会造成job运行时间变长或者造成job的OOM最后导致任务失败。例如在wordcount任务中,如果有一个word是热点词,出现的...

    项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
    欢迎大家star,留言,一起学习进步

    1.热点key的数据倾斜

    在大数据相关的统计与处理中,热点key造成的数据倾斜非常常见也非常讨厌,经常会造成job运行时间变长或者造成job的OOM最后导致任务失败。例如在wordcount任务中,如果有一个word是热点词,出现的次数很多,那么最后这个job的运行时间就是由这个热点词所在的task运行时间决定的。因此遇到这种热点问题,我们需要想办法改进代码,优化任务,提高最终的运行效率。

    2.实际case

    现在有这么一个简单的实际例子:
    hdfs上有一个名为"xxx"的路径,此路径下的数据量比较大,有几百G之多。现在我们想统计一下这个路径下所有文件的行数。
    如果数据量不大,在spark-shell中,可以用一行简单的代码解决问题:

    scala> sc.textFile("xxx").count()
    

    但是数据量大了以后,运行的速度很慢很慢,慢到不可接受;而且最后程序会报OOM退出,得不到最终的结果。那怎么办呢?

    3.通过将热点key打算做计算

    我们将上述需求稍微做一下转型:
    统计所有数据的行数,假设每一行对应的一个key就是"all",每一行的输出是"all, 1",最后需要做的就是简单的wordcount,针对all这个热点key,然后求和!
    这种我们明确知道热点key是啥的case,一般的做法是将热点key先打散,然后再聚回来!
    直接上代码:

        def linestats(sc: SparkContext) = {
            val inputpath = "xxx"
            sc.textFile(inputpath)
                .map(x => {
                    val randomNum = (new java.util.Random).nextInt(2000)
                    val allkey = randomNum + "_all"
                    (allkey, 1)
                })
                .reduceByKey((x, y) => x + y)
                .map(x => {
                    val (keywithrandom, num) = (x._1, x._2)
                    val key = StringUtils.split(keywithrandom, "_")(1)
                    (key, num.toLong)
                })
                .reduceByKey((x, y) => x + y)
                .map(x => "%s\t%s".format(x._1, x._2))
                .repartition(1)
        }
    

    上面代码的思路如下:
    1.第一步先将key打算,给所有"all"加上一个随机前缀。
    2.然后对带有随机前缀的key做第一次聚合,即reduceByKey操作,得出第一次聚合的结果。
    3.再将随机前缀去掉,做第二次聚合,即reduceByKey操作,得到最终的结果!

    展开全文
  • Spark中reduce和reducebykey

    千次阅读 2018-05-22 19:51:39
    首先我们先讲讲两个函数在功能上的作用与区别是什么,然后我们再深入讨论两个函数在内部机理有什么不同。reduce(binary_function) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与...

    首先我们先讲讲两个函数在功能上的作用与区别是什么,然后我们再深入讨论两个函数在内部机理有什么不同。

    reduce(binary_function) 

    reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。

    具体过程,RDD有1 2 3 4 5 6 7 8 9 10个元素, 
    1+2=3 
    3+3=6 
    6+4=10 
    10+5=15 
    15+6=21 
    21+7=28 
    28+8=36 
    36+9=45 
    45+10=55

    reduceByKey(binary_function)

    reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

    那么讲到这里,差不多函数功能已经明了了,而reduceByKey的是如何运行的呢?下面这张图就清楚了揭示了其原理:

    亦即,它会在数据搬移以前,提前进行一步reduce操作。

    可以实现同样功能的还有GroupByKey函数,但是,groupbykey函数并不能提前进行reduce,也就是说,上面的处理过程会翻译成这样:

    所以在处理大规模应用的时候,应该使用reduceByKey函数。

    展开全文
  • 两者都会根据 key 来分组 不同点: reduceByKey:Transormation 类算子,根据用户传入的聚合逻辑对数组内的数据进行聚合, 懒策略, 延迟计算 countByKey:Action 类算子,不需要用户传入聚合逻辑,直接对数组内的数据...
  • spark sortBy sortByKey实战详解

    千次阅读 2020-09-19 16:59:24
    那么在spark中如何排序呢?我们来看一些很有代表性的例子。 1.最简单的排序 假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那这个排序算最简单的: sc.parallelize(Array(1,3,2,4,6,5)).sortBy(x =&...
  • CDHwebUi-> YARN (MR2 Included) -> Configuration -> NodeManager Default Group -> Advanced 在 “NodeManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml” 弹出框中粘贴如下: ...
  • Spark中rdd按key进行join

    2019-08-06 00:04:16
    今天在Spark中使rdd按key进行join,最开始使用的key是元组(tuple),如((a,b),c) 结果,数据量较小时可正常运行,数据量较大时会报shuffle出错。 原因可见...
  • 由于有时候数据的列数很多,不只是按一项作为key来排序,有时候需要对其中两项进行排序,spark的RDD提供了keyBy的方法。使用场景例子为: init: (ab,2,3) (ac,4,100) (bb,1,200) (ac,1,1) (bb,2,5)sort: (ab,...
  • 今天记录一下spark里面的一些key-value对的相关算子。 key-value对可以简单理解为是一种认为构造的数据结构方式,比如一个字符串"hello",单看"hello"的话,它是一个字符串类型,现在假设我想把...
  • 上一讲主要降到了spark executor资源在Master的分配原理。今天来讲Spark Executor的创建和启动过程。创建的过程可以功过如下时序图表示: 在Standalone模式下,Backend.start()方法最终调用了...
  • Spark之reduceByKey详解

    2019-09-26 18:30:29
    Spark算子reduceByKey详解reduceByKey与groupByKey不同之处相同之处 reduceByKey与groupByKey 不同之处 reduceByKey,多了一个rdd,MapPartitionsRDD,存在于stage0的,主要是代表了进行本地数据规约之后的rdd,网络...
  • Spark系列2】reduceByKey和groupByKey区别与用法

    万次阅读 多人点赞 2015-11-21 16:49:27
    spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。 ...
  • spark dataframe dataset reducebykey用法

    千次阅读 2017-09-27 17:17:42
    case class Record(ts: Long, id: Int, value: Int)如果是rdd,我们经常会用reducebykey获取到最新时间戳的一条记录,用下面的方法def findLatest(records: RDD[Record])(implicit spark: SparkSession) = { ...
  • 一.org.apache.spark.shuffle.FetchFailedException 1.问题描述 这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时。 2.报错提示 (1) missing ...
  • spark 中reduceByKey()算子,记录下 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object KeyValue02_reduceByKey { def main(args: Array[String]): Unit = { //1.创建...
  • 如题, spark:在reduceByKey中,怎么获取到key的值
  • 一、提高并行度 实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。...对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度...
  • Spark源码之reduceByKey与GroupByKey

    万次阅读 2017-07-26 09:53:15
    Spark中针对键值对类型的RDD做各种操作比较常用的两个方法就是ReduceByKey与GroupByKey方法,下面从源码里面看看ReduceByKey与GroupByKey方法的使用以及内部逻辑。官方源码解释:三种形式的reduceByKey总体来说下面...
1 2 3 4 5 ... 20
收藏数 51,311
精华内容 20,524
关键字:

key spark