key出现倾斜 spark_spark数据倾斜 拆分key - CSDN
  • 一:概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题...**方案适用场景:**对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。 方案实现思...

    一:概述

    有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

    二、产生原因

    **方案适用场景:**对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

    方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

    方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

    方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

    方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
    在这里插入图片描述

    三、代码

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random
    
    object Demo {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("Demo").setMaster("local[2]")
        val sc=new SparkContext(conf)
    
        //准备数据
        val array=new Array[Int](10000)
        for (i <-0 to 9999){
          array(i)=new Random().nextInt(10)
        }
        //array.foreach(x=>print(x+","))
        //生成一个rdd
        val rdd=sc.parallelize(array)
        //数据量很大就先取样
        //rdd.sample(false,0.1)
        //所有key加一操作
        val maprdd=rdd.map((_,1))
        //没有加随机前缀的结果
          maprdd.countByKey.foreach(print)
        //(0,976)(5,997)(1,966)(6,959)(9,1004)(2,1051)(7,973)(3,1036)(8,1022)(4,1016)
    
        //val wc=rdd.map(x=>(x,1)).reduceByKey(_+_)
        //wc.foreach(print)
        //(4,1016)(0,976)(6,959)(8,1022)(2,1051)(1,966)(3,1036)(7,973)(9,1004)(5,997)
    
        //两阶段聚合(局部聚合+全局聚合)处理数据倾斜
    
        //加随机前缀,文章评论有正确代码
        val prifix=new Random().nextInt(10)
        val prifixrdd=maprdd.map(x=>(prifix+"_"+x._1,x._2))
    
        //加上随机前缀的key进行局部聚合
        val tmprdd=prifixrdd.reduceByKey(_+_)
        //去除随机前缀
        val newrdd=tmprdd.map(x=> (x._1.split("_")(1),x._2))
        //最终聚合
        newrdd.reduceByKey(_+_).foreach(print)
        //(4,1016)(7,973)(5,997)(9,1004)(8,1022)(6,959)(0,976)(3,1036)(2,1051)(1,966)
      }
    }
    
    展开全文
  • 1.热点key的数据倾斜在大数据相关的统计与处理中,热点key造成的数据倾斜非常常见也非常讨厌,经常会造成job运行时间变长或者造成job的OOM最后导致任务失败。例如在wordcount任务中,如果有一个word是热点词,出现的...

    项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
    欢迎大家star,留言,一起学习进步

    1.热点key的数据倾斜

    在大数据相关的统计与处理中,热点key造成的数据倾斜非常常见也非常讨厌,经常会造成job运行时间变长或者造成job的OOM最后导致任务失败。例如在wordcount任务中,如果有一个word是热点词,出现的次数很多,那么最后这个job的运行时间就是由这个热点词所在的task运行时间决定的。因此遇到这种热点问题,我们需要想办法改进代码,优化任务,提高最终的运行效率。

    2.实际case

    现在有这么一个简单的实际例子:
    hdfs上有一个名为"xxx"的路径,此路径下的数据量比较大,有几百G之多。现在我们想统计一下这个路径下所有文件的行数。
    如果数据量不大,在spark-shell中,可以用一行简单的代码解决问题:

    scala> sc.textFile("xxx").count()
    

    但是数据量大了以后,运行的速度很慢很慢,慢到不可接受;而且最后程序会报OOM退出,得不到最终的结果。那怎么办呢?

    3.通过将热点key打算做计算

    我们将上述需求稍微做一下转型:
    统计所有数据的行数,假设每一行对应的一个key就是"all",每一行的输出是"all, 1",最后需要做的就是简单的wordcount,针对all这个热点key,然后求和!
    这种我们明确知道热点key是啥的case,一般的做法是将热点key先打散,然后再聚回来!
    直接上代码:

        def linestats(sc: SparkContext) = {
            val inputpath = "xxx"
            sc.textFile(inputpath)
                .map(x => {
                    val randomNum = (new java.util.Random).nextInt(2000)
                    val allkey = randomNum + "_all"
                    (allkey, 1)
                })
                .reduceByKey((x, y) => x + y)
                .map(x => {
                    val (keywithrandom, num) = (x._1, x._2)
                    val key = StringUtils.split(keywithrandom, "_")(1)
                    (key, num.toLong)
                })
                .reduceByKey((x, y) => x + y)
                .map(x => "%s\t%s".format(x._1, x._2))
                .repartition(1)
        }
    

    上面代码的思路如下:
    1.第一步先将key打算,给所有"all"加上一个随机前缀。
    2.然后对带有随机前缀的key做第一次聚合,即reduceByKey操作,得出第一次聚合的结果。
    3.再将随机前缀去掉,做第二次聚合,即reduceByKey操作,得到最终的结果!

    展开全文
  • 一. 数据倾斜的现象多数task执行速度较快,少数task执行时间非常长...数据问题key本身分布不均匀(包括大量的key为空)key的设置不合理spark使用问题shuffle时的并发度不够计算方式有误三. 数据倾斜的后果spark中一个s...

    一. 数据倾斜的现象

    多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。

    二. 数据倾斜的原因

    常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。

    数据问题

    1. key本身分布不均匀(包括大量的key为空)
    2. key的设置不合理

    spark使用问题

    1. shuffle时的并发度不够
    2. 计算方式有误

    三. 数据倾斜的后果

    1. spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
    2. 过多的数据在同一个task中执行,将会把executor撑爆,造成OOM,程序终止运行。

    一个理想的分布式程序: 
    理想的分布式程序

    发生数据倾斜时,任务的执行速度由最大的那个任务决定: 
    发生数据倾斜

    四. 数据问题造成的数据倾斜

    发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

    找出异常的key

    如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。

    选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个

    df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)
    
    • 1
    • 2

    如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。

    经过分析,倾斜的数据主要有以下三种情况:

    1. null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
    2. 无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
    3. 有效数据,业务导致的正常数据分布。

    解决办法

    第1,2种情况,直接对数据进行过滤即可。

    第3种情况则需要进行一些特殊操作,常见的有以下几种做法。

    1. 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
    2. 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
    3. 使用reduceByKey 代替 groupByKey
    4. 使用map join。

    举例:

    如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作如下:

    1. 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
    2. 对数据进行 reduceByKey(func)
    3. 将 key + 随机值 转成 key
    4. 再对数据进行 reduceByKey(func)

    tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。

    tips2: 单独处理异常数据时,可以配合使用Map Join解决。

    五. spark使用不当造成的数据倾斜

    1. 提高shuffle并行度

    dataFramesparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。 
    rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。

    局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。

    2. 使用map join 代替reduce join

    在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了。

    局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。

    具体使用方法和处理流程参照:

    Spark map-side-join 关联优化

    spark join broadcast优化

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lsshlsw/article/details/52025949
    文章标签: spark数据倾斜
    个人分类: Spark
    展开全文
  • SparkStreaming解决数据倾斜方法 两阶段聚合的方式解决数据倾斜 解释: 对DStream 进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作...

    SparkStreaming解决数据倾斜方法
    两阶段聚合的方式解决数据倾斜

    在这里插入图片描述

    解释:
    对DStream 进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加。

    代码实现

        val dsStream=stream.filter(item => item.value().split("\t").length == 3)//过滤合格的数据
        .mapPartitions(partitions => //对所有分区的数据进行转换
            partitions.map(item => {
              val rand = new Random() //创建随机数对象
              val line = item.value() //获取value
              val arr = line.split("\t")  //切割
              val id = arr(1)       //取第二个元素
              (rand.nextInt(3) + "_" + id, 1)    //随机数与app_id进行拼接,并且返回二元组
            }))
        val result = dsStream.reduceByKey(_ + _)     //重组之后的第一次reduceByKey
        result.map(item => {                                     //进行map转换把key切割还原
          val id = item._1.split("_")(1)
          (id, item._2)
        }).reduceByKey(_ + _).print()                //还原之后进行第二次聚合
    
    展开全文
  • Spark数据倾斜的完美解决

    万次阅读 2018-05-18 16:54:08
    数据倾斜解决方案数据倾斜的解决,跟之前讲解的性能调优,有一点...6.1、原理以及现象分析6.1.1、数据倾斜怎么出现的在执行shuffle操作的时候,是按照key,来进行values的数据的输出、拉取和聚合的。同一个key的valu...
  • sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long) sample算子时用来抽样用的,其有3个参数 withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着...
  • spark做大数据处理,不怕数据大,就怕发生数据倾斜,一发生数据倾斜,轻则spark job要跑很久才能结束,重则OOM,把一个executor的存储空间撑爆,导致程序终止。 一个spark job 是由多个stage组成的 ,stage之间...
  • 在大数据处理过程中常常出现数据倾斜(Data Skew)。那么,数据倾斜会造成什么问题呢?为什么要处理数据倾斜? 什么是数据倾斜? 对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。何谓...
  • 解决Spark数据倾斜 1、先用sample(false,0,x)采用key,找出倾斜key 2、将数据集拆分成倾斜部分和不倾斜部分 3、不倾斜部分走正常路线 4、倾斜部分前面加上前缀 5、重分区 => 聚合 => 去掉前缀 => 聚合 6...
  • Spark:shuffle数据倾斜

    2019-01-21 15:01:58
    数据倾斜 Shuffle的时候,将各个节点上相同的key拉取到...因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。 数据倾斜的解决方案 方案一:使用Hive...
  • Spark:对数据倾斜的八种处理方法

    千次阅读 2018-10-17 23:16:47
    3. 导致Spark数据倾斜的本质 4. 定位最慢的Task所处的源码位置 5. 解决方案 方案一:使用Hive ETL预处理 方案二:过滤导致倾斜key 方案三:提高Shuffle操作并行度 方案四:两阶段聚合(局部聚合+全局聚合) ...
  • 项目中经常会使用到Spark进行批处理,数据量大的时候总是会遇到数据倾斜的情况,参考了项目中遇到的情况以及网上的一些案例,写篇文章来总结下如何处理Spark中常见的数据倾斜问题。当然由于业务场景不一样,本文说到...
  • spark数据倾斜处理

    2018-12-13 11:18:53
    本篇文章属于转载 原文出处 ...调优概述 有的时候,我们可能会遇到...数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。 数据倾斜发生时的现象 1、绝大多数task执行得都非常快...
  • 一. 数据倾斜的现象 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。...key本身分布不均匀(包括大量的key为空)key的设置不合理 spark使用问题 shuffle时的并发
  • Spark数据倾斜及解决方案

    千次阅读 2018-10-09 16:02:27
    一,数据倾斜现象及原理 发生数据倾斜以后的现象: 1、你的大部分的task,都执行的特别特别快,刷刷刷,就执行完了(你要用client模式,standalone client,yarn client,本地机器只要一执行spark-submit脚本,就会...
  • 第139课: Spark面试经典系列之数据倾斜解决之对于两个RDD数据量都很大且倾斜Key特别多如何解决?如果两个RDD的数据量都特别大而且倾斜Key特别多如何解决:数据量特别大就无法把其中的一个RDD广播出去;如果...
  • Sparkkey加随机尾串解决数据倾斜问题(Java/Scala版) 通过给key加随机尾串,使得相同key加上随机尾串后的hash值不相等,在聚合操作的时候实现进入不同的分区,实现数据倾斜问题解决方式之一 详情请看代码注释 ...
1 2 3 4 5 ... 20
收藏数 4,367
精华内容 1,746
关键字:

key出现倾斜 spark