精华内容
下载资源
问答
  • Spark数据倾斜

    2019-06-25 20:38:23
    1.数据倾斜的概念 (这个不用说了都懂)略 2.发生数据倾斜的现象 个别的task运行时间明显长于其他的task 发生OOM异常 3.发生数据倾斜的原因 spark只要是发生数据倾斜必然经历了shuffle,也就是shuffle是数据倾斜的...

    Spark的数据倾斜

    1.数据倾斜的概念

    (这个不用说了都懂)略

    2.发生数据倾斜的现象

    • 个别的task运行时间明显长于其他的task
    • 发生OOM异常

    3.发生数据倾斜的原因

    spark只要是发生数据倾斜必然经历了shuffle,也就是shuffle是数据倾斜的必要条件

    4.发生数据倾斜之后的解决方案

    1.提高并行度
    程序运行缓慢,第一反应大多是资源分配不足,并行度不够。提高并行度是我们做数据倾斜调优的第一
    步尝试,提高并行度会在一定程度上减轻数据倾斜的压力,但是并不能从彻底上根除数据倾斜。因为一
    旦发生数据倾斜,倾斜的key无论如何提高并行度,经过shuffle操作都会直到一个分区中去。
    如何提高并行度?两个地方进行设置。

    1. spark.default.parallelism 设置spark程序全局并行度
    2. shuffle操作的第二个参数进行设置(局部)并行度
      在这里插入图片描述

    2.过滤掉发生数据倾斜的key
    找到导致数据倾斜的key的值,通过和业务人员进行沟通,确定该key值是否有价值,如果没有价值的话将这个key直接过滤掉即可,但是如果这个key是有效的话千万万不能搞掉,否则后果自负

    3.Hive ETL中做预处理
    这个处理的方法,主要在于Spark作业加载hive表中的数据,进行业务处理。加入hive的数据有倾斜现
    象,在spark中的处理,自然会出现dataskew。而如果spark作业一般只是想web 端提供查询服务,针
    对这种情况就比较适合这个解决方法。
    Hive ETL预处理,数据倾斜的现象在hive中提前被处理,这样加载到spark中的数据有倾斜吗?没有!
    此时spark给web服务端只提供一个查询服务,所以没有的数据倾斜,效率非常高!只不过此时将数据
    倾斜解决掉了吗?是把spark端的dataskew转移到hive中,这是甩锅的行为,个人不推荐,毕竟不人道

    4.进行两阶段的聚合
    两阶段聚合操作,指的是局部聚合+全局聚合。该方法适合于哪些XxxxByKey的操作,比如
    groupByKey、reduceByKey的聚合操作.
    核心思路就是:某个key的量特别的大,如果直接进行聚合操作会灰常的困难,因为这个key会进入到XXXbykey之后的某个特定的分区内,这个ResultTask的任务量是相当大的,那么可以先将者这些key分成两部分(加上前缀,类似hbase的加盐),两部分分别进行一次聚合的操作,这样何有可能这此次聚合的实时两部分可以由两个ResultTask分别去完成,这样的话执行的速度就会上升.之后再将加的前缀去掉,再进行一次xxxbykey的操作,这样就完成了.一般情况是可以解决数据倾斜的

    好吧搞上一个例子

    import java.util.Random
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    /**
      * 当出现数据倾斜的时候,如果是XXXBykey的shuffle的过程出现的数据倾斜的话可以使用双重聚合的方法进行小狐狸数据倾斜
      * 处理的过程如下:
      * 1.通过sample算子找出数据量最大的key
      * 2.对这个key进行加上一个指定范围内的随机数的前缀
      * 3.做一次xxxByKey的聚合操作
      * 4.将前缀去掉
      * 5.再搞一次xxxByKey的聚合
      */
    object _05DoubleTogether {
      def main(args: Array[String]): Unit = {
        var conf = new SparkConf()
          .setAppName("_05DoubleTogether".getClass.getSimpleName)
          .setMaster("local[2]")
        var sc = new SparkContext(conf)
        doubleTogether(sc)
        Thread.sleep(100000)
        sc.stop()
      }
    
      def doubleTogether(sc: SparkContext): Unit = {
        val list = List(
          "hello hello hello hello you you hello",
          "hello hello hello you you hei hei hello hello hello"
        )
        val listRDD = sc.parallelize(list)
        val pairsRDD: RDD[(String, Int)] = listRDD.flatMap(line => {
          line.split("\\s+")
    
        }).map((_, 1))
        //第一步找出数据倾斜的key
        val abnormalData: collection.Map[String, Long] = pairsRDD.sample(true, 0.8).countByKey().take(1)
        val abnoramlDataKey: String = abnormalData.head._1
        //第二步,找出所有的异常的key进行加随机的前缀的方式
        val firstTogetherRDD: RDD[(String, Int)] = pairsRDD.map { case (word, num) => {
          var random = new Random()
          if (word == abnoramlDataKey) {
            val pre: Int = random.nextInt(2)
            (s"${pre}_${word}", num)
    
          } else {
            (word, num)
          }
        }
        }
        //第三步进行局部的聚合
        val firstTogetherResultRDD: RDD[(String, Int)] = firstTogetherRDD.reduceByKey(_ + _)
        //第四去掉前缀word
        val deletePreRDD: RDD[(String, Int)] = firstTogetherResultRDD.map { case ((word, num)) => {
          //注意这里不能这样写,因为当出现没有加上前缀的word的时候word.indexOf("_")的返回值是-1,
          //word.substring(0, word.indexOf("_"))会报错java.lang.StringIndexOutOfBoundsException: String index out of range: -1
          //if (word.substring(0, word.indexOf("_")) == "1" | word.substring(0, word.indexOf("_")) == "0") {
          if (word.contains("_")) {
            // println("**********************************")
            (word.substring(word.indexOf("_") + 1), num)
          } else {
            (word, num)
          }
        }
        }
        //第五  第二次进行聚合
        val ResultRDD: RDD[(String, Int)] = deletePreRDD.reduceByKey(_ + _)
        ResultRDD.foreach { case (word, count) => {
          println(s"${word}---------------->${count}")
        }
        }
      }
    }
    
    

    结果

    you---------------->4
    hello---------------->11
    hei---------------->2
    

    5.使用mapjoin代替reducejoin
    这个操作主要是针对join类的聚合操作,多表关联,前提条件是大小表关联。
    所谓reduce-join操作就是很直白的调用join算子,执行操作,这个过程是有shuffle的。
    所谓map-join操作呢,将小表广播到各个executor,在map类算子中完成关联操作。如果出现一张中表和一张大表的情况,也可以将中表瘦身成小表之后进行mapjoin
    这个操作,请问,从根本上解决了数据倾斜了没有?从根本上解决了数据倾斜,因为有map-join代替
    reduce-join没有shuffle操作,肯定就没有数据倾斜了。
    代码如下:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    
    /**
      * 测试广播变量
      * 使用一个案例就是小表关联大表的操作,小表就可以使用广播变量,大表在节点上就可以直接使用广播变量中的小表进行操作了
      */
    object _07boradcast {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf()
          .setAppName("_07boradcast".getClass.getSimpleName)
          .setMaster("local[2]")
        val sc=new SparkContext(conf)
        val resultRDD: RDD[(String, String)] = joinops(sc,"file:///D:/aaa/class.txt","file:///D:/aaa/score.txt")
        resultRDD.foreach(println)
    
        /**
          * (赵六,100,100,2班)
          * (花花,90,90,2班)
          * (菲菲,40,40,2班)
          * (张三,100,100,1班)
          * (李四,90,90,1班)
          * (王五,50,50,1班)
          */
    
      }
    
      /**
        *
        * @param sc           上下文对象
        * @param minTablePath 小表的关联的路径
        * @param bigTablePath 大表的关联的路径
        * class.txt
        *                     张三,1班
        *                     李四,1班
        *                     王五,1班
        *                     赵六,2班
        *                     花花,2班
        *                     菲菲,2班
        *
        *  score.txt
        *                     张三,100,100
        *                     李四,90,90
        *                     王五,50,50
        *                     赵六,100,100
        *                     花花,90,90
        *                     菲菲,40,40
        * 需求:将量表进行关联输出--->姓名,分数,分数,班级
        */
      def joinops(sc:SparkContext,minTablePath:String,bigTablePath:String): RDD[(String, String)] = {
        val fileRDD: RDD[String] = sc.textFile(minTablePath)
        //将读取到的RDD的小表的内容collect到dirver端,之后进行加入广播变量
        val class_map: Map[String, String] = fileRDD.map(line => {
          val lines: Array[String] = line.split("\\,")
          (lines(0), lines(1))
        }).collect.toMap
    
    
        //将class的表的数据加入广播变量
        val broadcast_class: Broadcast[Map[String, String]] = sc.broadcast(class_map)
        //加载score.txt的数据
        val scoreRDD: RDD[String] = sc.textFile(bigTablePath)
        val resultRDD: RDD[(String, String)] = scoreRDD.map(line => {
          val lines: Array[String] = line.split("\\,")
          var name = lines(0)
          var math = lines(1)
          var chinese = lines(2)
          //从广播变量中将内容取出来
          val class_stu: Map[String, String] = broadcast_class.value
          var class_info = class_stu.getOrElse(name, null)
          (name, math +","+ chinese +","+ class_info)
        })
        resultRDD
    
      }
    
    }
    
    

    6.key进行分拆
    如果有两张大表join,一张正常,但是另外一张有key非常的多怎么办呢
    思路如图
    在这里插入图片描述
    具体描述结合代码呈现

    import java.util.Random
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ArrayBuffer
    
    /**join 操作的是两张大表,分别是A表和B表
      * 但是A表中有一个key特别的多,join的过程中会出现数据的倾斜
      * 怎么办呢?\
      * 思路如下:
      * 1,找到A表中的那个key
      * 2.将这个Key的数据从A表中抽取出来,这样A表变成了两张表A-1(那个key)和A-2,对B表也将这个key的值进行抽取出来B表也变成了B-1(那个key)和B-2两张表
      * 3.将A-1表的key进行随机添加指定范围的前缀,
      * 4.B-1表进行相应倍数的扩容,也就是按照第三步的添加范围进行相应倍数扩容,比如3步随机添加0和1的前缀,那么这一步就扩容2倍
      * 5.A_1和B-1进行join操作  A-2和B-2进行join操作
      * 6.将A_1和B-1进行join操作之后的结果去掉前缀
      * 7.将第6步的结果和A-2和B-2进行join操作结果进行union操作得到最终的结果
      *
      *
      */
    object _06TwoBigTabletogether {
      def main(args: Array[String]): Unit = {
        var conf=new SparkConf().setAppName("_06TwoBigTabletogether".getClass.getSimpleName).setMaster("local[2]")
        var sc =new SparkContext(conf)
        toBigTableTogether9(sc)
        sc.stop()
      }
      def toBigTableTogether9(sc:SparkContext): Unit ={
        //数据如下:
        val left = List(
          ("hello", 1),
          ("hello", 2),
          ("hello", 3),
          ("you", 1),
          ("me", 1),
          ("you", 2),
          ("hello", 4),
          ("hello", 5)
        )
        val right = List(
          ("hello", 11),
          ("hello", 12),
          ("you", 11),
          ("me", 12)
        )
        val leftRDD: RDD[(String, Int)] = sc.parallelize(left)
        val rightRDD: RDD[(String, Int)] = sc.parallelize(right)
        //1.找出left表中的异常的key
        val abnormalkey: String = leftRDD.sample(true,0.8).countByKey().take(1).head._1
        //2.将left表中和right表中的key的数据抽取出来形成不同的表
        val leftAbnormalRDD: RDD[(String, Int)] = leftRDD.filter(_._1==abnormalkey)
        val leftNormalRDD: RDD[(String, Int)] = leftRDD.filter{case (word,count)=>{word!=abnormalkey}}
        val rightAbnormalRDD: RDD[(String, Int)] = rightRDD.filter(_._1==abnormalkey)
        val rightNormalRDD: RDD[(String, Int)] = rightRDD.filter{case (word,count)=>{word!=abnormalkey}}
        //3.对A-1进行家加上随机的前缀  0  1
        val leftPreAbnormalRDD: RDD[(String, Int)] = leftAbnormalRDD.map { case (word, count) => {
          var random = new Random()
          (s"${random.nextInt(2)}_${word}", count)
        }
        }
        //4.B-1进行相应的扩容
        val expandRightAbnomalRDD: RDD[(String, Int)] = rightAbnormalRDD.flatMap { case (word, count) => {
          var ab = new ArrayBuffer[(String, Int)]()
          for (i <- 0 to 1) {
            ab.append((s"${i}_${word}", count))
          }
          ab
        }
        }
        //5.A_1和B-1进行join操作  A-2和B-2进行join操作
        val preAbnormalJoinRDD: RDD[(String, (Int, Int))] = leftPreAbnormalRDD.join(expandRightAbnomalRDD)
        val NormalJoinRDD: RDD[(String, (Int, Int))] = leftNormalRDD.join(rightNormalRDD)
        //6.将preAbnormalJoinRDD中的数据的前缀干掉
        val NopreAbnormalRDD: RDD[(String, (Int, Int))] = preAbnormalJoinRDD.map { case (word, count) => {
          (word.substring(2), count)
        }
        }
        //7.进行union
        val ResultRDD: RDD[(String, (Int, Int))] = NopreAbnormalRDD.union(NormalJoinRDD)
        ResultRDD.foreach{case(word,count)=>{
          println(s"${word}------------------>${count}")
        }}
      }
    }
    
    

    运行结果如下:

    hello------------------>(2,11)
    hello------------------>(1,11)
    hello------------------>(2,12)
    hello------------------>(1,12)
    hello------------------>(3,11)
    hello------------------>(5,11)
    hello------------------>(3,12)
    hello------------------>(5,12)
    hello------------------>(4,11)
    hello------------------>(4,12)
    you------------------>(1,11)
    you------------------>(2,11)
    me------------------>(1,12)
    
    展开全文
  • Hive数据倾斜及处理① 数据倾斜的概念及原因1.1 数据倾斜数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务...

    Hive数据倾斜及处理

    ① 数据倾斜的概念及原因

    1.1 数据倾斜

    数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据倾斜。

    针对mapreduce的过程来说就是,有多个reduce,其中有一个或者若干个reduce要处理的数据量特别大,而其他的reduce处理的数据量则比较小,那么这些数据量小的reduce很快就可以完成,而数据量大的则需要很多时间,导致整个任务一直在等它而迟迟无法完成。

    跑mr任务时常见的reduce的进度总是卡在99%,这种现象很大可能就是数据倾斜造成的。

    1.2 产生数据倾斜的原因

    key分布不均匀

    上面就说过,数据倾斜是因为reduce的数据量大小差异过大,而reduce的数据是分区的结果,分区是对key求hash值,根据hash值决定该key被分到某个分区,进而进入到某个reduce,而如果key很集中或者相同,那么计算得到它们的hash值可能一样,那么就会被分配到同一个reduce,就会造成这个reduce所要处理的数据量过大。

    业务数据本身的特性

    比如某些业务数据作为key的字段本就很集中,那么结果肯定会导致数据倾斜啊。

    还有其他的一些原因,但是,根本原因还是key的分布不均匀,而其他的原因就是会造成key不均匀,进而导致数据倾斜的后果,所以说根本原因是key的分布不均匀。

    1.3 数据倾斜的表现

    任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。

    单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。

    ② 数据倾斜的解决办法

    907733e4dd5c1cefaeee2a3315b7d772.png

    2.1 设置参数

    hive.map.aggr = true    // Map 端部分聚合,相当于Combiner;

    hive.groupby.skewindata=true    //

    有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

    2.2 SQL语句优化

    大小表join

    使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce。

    大表Join大表

    把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

    count distinct大量相同特殊值

    count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

    特殊情况特殊处理

    在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

    参考链接:https://www.jianshu.com/p/42be5ca8b11d

    参考链接:https://blog.csdn.net/xinzhi8/article/details/71455883

    参考链接:https://blog.csdn.net/qq_35036995/article/details/80298403

    展开全文
  • spark优化之数据倾斜

    2017-08-30 14:47:11
    数据倾斜的概念  有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时spark作业的性能会比期望的差的多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题以保证spark作业的性能...
    数据倾斜的概念
       有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时spark作业的性能会比期望的差的多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题以保证spark作业的性能
       绝大多数task执行的都非常快,但个别task执行极慢,比如,总共有1000个task,997个task都在一分钟内执行完了,但是剩余两三个taks需要一两个小时,这种情况很常见。
       原本能够正常执行的spark作业,某天突然报出OOM(内存溢出)异常,观察异常 栈,是我们写的业务代码造成的,这种情况比较少见。
    数据倾斜发生的原理
       数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。
    如何定位导致数据倾斜的代码
       数据倾斜只会发生在shuffle过程中,这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:groupByKey,reduceByKey,join,cogroup,repartition等。出现数据倾斜时,可能是你的使用了这些算子中某一个所导致的。可以使用rdd.sample(false,0.1).countByKey的方式随机抽查出倾斜的数据。
    数据倾斜的解决方案
    方案一:使用Hive Etl预处理数据
    方案使用场景:导致数据倾向性的是Hive表,如果hive表中的数据本身很不均匀。
    方案实现思路:此时可以评估一下,是否可以通过hive来进行数据预处理(即通过hive etl 预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在spark作业中针对的数据源就不是原来的hive表了,而是处理后的hive表,此时由于预先进行过聚合或join操作了,那么在spark作业中就不需要使用原先的shuffle类算子执行这类操作了。
    方案实现原理:这种方案从根源上解决了数据倾斜,因此彻底避免了在spark中执行shuffle类算子,那么肯定不会有数据倾斜的问题了。但是这里也要提醒下大家,这种方式治标不治本,因为毕竟数据本身就存在不均匀问题,所以hive etl 中进行group by 或 join等shuffle操作时,还是会出现数据倾斜,导致hive etl的速度很慢,我们只是把数据倾斜的发生提前到了hive etl中,避免spark程序发生数据倾斜而已。
    方案优点:实现起来简单便捷,效果还非常好,完全避掉了数据倾斜,spark作业性能会大幅度提升。
    解决方案二:过滤少量导致数据倾斜的key
    方案使用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种解决方案,比如99%的key就对应10条数据,但是只有一个key对应了100w万数据,从而导致了数据倾斜。
    方案解决思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么可以使用sample算子对rdd进行采样,然后计算出key的数量,取数据量多的key过滤掉即可。
    方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不对参与计算了,自然不能产生数据倾斜。
    方案优缺点:实现简单,而且效果也很好,可能完全避掉数据倾斜,使用场景不多,大多数情况下,导致倾斜的key还是很多,并不算是只有少数几个。
    方案三:提高shuffle操作的并行度
    方案使用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这些方案,因此这些是处理数据最简单的一种方案。
    方案实现思路:在rdd进行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(10000)该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于spark sql中的shuffle类语句,比如group by ,join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task 的并行度,默认值为200,对于很多场景来说都是有点过小。
    方案实现原理:增加shuffle read task 的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据,二增加了shuffle read task 以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。
    方案优点:实现起来比较简单,可以有效的缓解和减轻数据倾斜的影响。
    方案缺点:只是缓解了数据倾斜而已,并没有彻底根除了问题,根据实战经验来看,其效果有限。
    解决方案四:两阶段聚合(局部聚合+全局聚合)
    方案试用场景:对rdd执行reduceByKey等聚合类shuffle算子或在spark sql中使用group by 语句进行了分组聚合是,比较适用这种方案。
    方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了。
    方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,在次进行全局聚合,就可以得到最终的结果。
    方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的,通常都可以解决掉数据倾斜,或者至少是大幅度的解除数据倾斜,将spark作业的性能提升数倍以上
    方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对比较窄,如果join类的shuffle操作,还得用其他的解决方案。



    展开全文
  • 现在我们来定义一下数据倾斜的概念:一批数据中相同key的数据过多而导致其他reducetask跑完,而一个reducetask迟迟跑不完,甚至触发OOM的现象,称为数据倾斜。 在面试的时候,就算没有遇到过这个问题,我们也要回答...

    1、在项目中有遇到过数据倾斜吗?

    首先要知道什么是数据倾斜。我们知道,在执行shuffle操作的时候,相同的key对应的value一定会被分配到同一个reducetask中去处理,所以当一批数据中相同key的数据过多,就会导致一个reducetask迟迟跑不完。现在我们来定义一下数据倾斜的概念:一批数据中相同key的数据过多而导致其他reducetask跑完,而一个reducetask迟迟跑不完,甚至触发OOM的现象,称为数据倾斜。

    在面试的时候,就算没有遇到过这个问题,我们也要回答遇到过,因为数据倾斜是大数据中非常常见的问题,如果你回答没有遇到过,那面试官可能会觉得你经验不够丰富,另一方面,很明显面试官想考考你这方面的知识,你回答没有,那不就把话题聊死了!那还这么面?所以我们必须要回答遇到过,然后用下面的知识在面试官面前装逼。

    2、你在项目中的如何解决这个问题的?

    2.1 定位发生数据倾斜的位置

    如果发生数据倾斜,99.9%的可能性是因为shuffle操作将大量相同key的数据分配到同一个reducetask中去处理了(剩下0.1%看天)。所以我们首先要定位发生数据倾斜的位置。刚刚说了,发生数据倾斜是因为产生了shuffle操作,所以第一步就是找产生shuffle的RDD,要定位数据倾斜的位置,当然是去查log了,或者打开UI界面,看看是哪一个stage产生很慢,这个stage前面的ShuffledRDD,就是产生数据倾斜的罪魁祸首了。

    2.2 对症下药

    虽然产生数据倾斜的原因大致是一样的,但是也有些差别,同样处理方式也有些差别,下面我将逐个介绍在哪些地方可以处理数据倾斜。

    2.2.1 源数据发生数据倾斜

    Spark作业的来源通常是经过Hive ETL后放在HDFS中的数据,这里就可能会造成数据倾斜的问题,比如一个key对应10w条数据,而另一个key对应了100w条数据,那你跑spark任务的时候,肯定就会产生数据倾斜了。

    解决办法

    (1) ETL端做聚合
    首先,针对这种情况,我们一般可以通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join,然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

    但是,Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中。

    (2) 对key做粒度细化
    其次,我们可以选择对key做粒度细化,比如key原先是城市-人口,每一个key都对应着1000w人,现在我们将key进行粒度细化,使它变成县城-人口,这样每个key就对应着100w人,这也是一个解决办法。

    (3)过滤导致数据倾斜的key
    直接过滤掉那些引起倾斜的Key。这种方法很简单,既然你倾斜,那我不用你就完事。比如说,总共有100万个key。只有2个key,是数据量达到10 万的。其他所有的key,对应的数量都是几十,这样join后会引起倾斜。这个时候,自己可以去取舍,如果业务和需求可以理解和接受的话,在从hive 表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key 。那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在的spark作业中,自然就不会发生数据倾斜了。

    2.2.2 shuffle过程发生数据倾斜

    如果是因为使用了shuffledRDD(如groupBykey、reduceBykey)导致了数据倾斜则可以使用下面的解决办法。

    解决办法

    (1)简单粗暴但有效—提高shuffle的操作并行度
    在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等。

    需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

    其原理很简单,增加shuffle read task的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task , 从而让每个 task 处理比原来更少的数据 。举例来说, 如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后, 每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

    但是,这种方法治标不治本,如果可以解决数据倾斜的问题,我们可以考虑使用这种方法。

    (2) 对key进行加盐操作
    所谓加盐,就是对key的前缀加随机值的操作,第一轮聚合的时候,对key进行打散,将原先一样的key,变成不一样的key,相当于是将每个key分为多组。先针对多个组,进行key的局部聚合。接着,再去除掉每个key的前缀,然后对所有的key进行全局的聚合。

    2.2.3 大小表join 发生数据倾斜

    当两个RDD要执行join操作的时候,它们肯定是要进行shuffle的,但是如果一个RDD数据量很大,而另一个RDD数据量很小,那么这样也会产生数据倾斜的问题。

    解决办法

    通常针对这种情况,我们的解决办法就是让join发生在map端(和mapreduce一样),直接让小的RDD变成一个广播变量,让每个excutor都保存一份进行join。这样就避免了shuffle操作。

    值得注意的是,对于join这种操作,不光是考虑数据倾斜的问题。即使是没有数据倾斜问题,也完全可以优先考虑,用我们讲的这种高级的reduce join转map join的技术,不要用普通的join,去通过shuffle,进行数据的join。完全可以通过简单的map,使用map join的方式,牺牲一点内存资源。在可行的情况下,优先这么使用。

    但是针对两个RDD都很大的情况下,这种情况就不适用了,很可能会造成OOM。

    2.2.4 某一个key对应大量数据

    之前说过,某一个key对应大量数据时,我们可以根据需要,直接在ETL时就使用where子句将导致数据倾斜的key过滤掉,但有时候我们不得不处理它,这就产生了一个问题:该怎么处理这个key呢?

    解决办法

    单拉出来那个最多的key,单独进行join,尽可能地将key分散到各个task上去进行join操作。

    3、总结

    这里要说明一下,上述解决方案是按适用性来进行列举的,也就是说,第一个,在数据源端进行聚合是最常用的解决方案,如果是一个方案不适用,才会使用下一个方案,其次,我们在面试过程中,要有条理地阐述各个解决方案,你可以说其中一种场景,使用了什么样的办法,解决了这个问题,不能一上来就把底全给交了,那面试官会觉得你在被答案,还是要假装思索一波的。

    展开全文
  • 1、数据倾斜的概念 数据倾斜是在map/reduce执行程序时,reduce大部分节点执行完毕,但有一个或者少数几个节点执行很慢,导致其他程序一直处于等待的状态,使得整个程序执行时间较长。 2、为什么出现数据倾斜? 主要...
  • 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的...
  • 这种4n倾斜的分裂系统在子代细胞中具有重复发生的遗传,如上-右上生长模式所述,类似于癌症进化过程中的化生-EMT / MET胚胎发生事件的概念。 对该文献的审查,证实了发生在MET事件中的四倍体8C细胞使该胚胎学概念...
  • 何为数据倾斜?在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 8...
  • 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的...
  • 数据倾斜

    2018-10-14 10:42:32
    在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同...
  • 数据处理中数据倾斜:个人理解,在数据处理MapReduce程序中,由于数据特殊性,数据中存在大量相同key数据,根据业务需求需要对这个key进行分区操作(group by/join)时,在mappartition阶段将大数据量...
  • 1.1 概念与用途 1.2 创建范围——线图 2、倾斜图 2.1 劳动生产率变化倾斜图 1、范围——线图 1.1 概念与用途 1.2 创建范围——线图 2、倾斜图 2.1 劳动生产率变化倾斜图 ...
  • 菜鸟都应该知道的倾斜摄影测量知识

    万次阅读 多人点赞 2015-07-29 15:54:12
    所谓“站在巨人肩膀上”在这篇文章中就很好体现出来了,本文一些图和一些概念就是借鉴“巨人”发表文章中图和概念。这还得感谢这些“巨人”把我们领向一条光明大道。我不是摄影测量专业人士,GIS圈也...
  • 从传统数据库迁移到GP中一个重要的且经常被开发人员忽略的概念是数据分布,没有良好的设计表的分布键会导致严重的性能问题,以下函数将给开发人员及DBA检测一个表的数据倾斜情况。 -- Function: gpmg.data_skew...
  • 什么是数据倾斜?如何解决数据倾斜?

    千次阅读 2019-03-28 14:31:33
    相信很多接触MapReduce的...正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一...
  • 正常的数据分布理论上都是倾斜的,就是我们所说的2-8原理:80%的财富集中在20%的人手中,80%的用户只使用20%的功能,20%的用户贡献了80%的访问量,不同的数据字段的数据倾斜一般有两种情况: 一种是唯一值非常少,...
  • 实际搞过离线数据处理同学都知道,Hive SQL 各种优化方法都是和数据倾斜密切相关,所以我会先来聊一聊 “「数据倾斜」” 基本概念,然后再在此基础上为大家介绍各种场景下 Hive 优化方案。Hive 优化分为...
  • 数据倾斜的直白概念:数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据...
  • 导读 相信很多接触MapReduce的朋友对'数据倾斜'这四个... 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不...
  • 导读 相信很多接触MapReduce的朋友...正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据...
  • 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , ...
  • Hive数据倾斜

    2020-10-14 10:29:58
    1.概念 map/reduce程序执行时,reduce节点大部分执行完毕,但是有...2.hive产生数据倾斜的原因 (1)key 分布不均匀 (2)业务数据本身的特性 (3)建表时考虑不周 (4)某些 SQL 语句本身就有数据倾斜 3.解决方案 (1)参数调节
  • Hadoop处理数据倾斜

    2019-01-23 21:48:18
    前言 南国在最开始学习Hadoop的时候,一直其他人说的数据倾斜 集数据倾斜...在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20...
  • 龙源期刊网http://www.qikan.com.cn基于倾斜摄影影像真三维单体化模型精细建模方法研究作者:贺晓阳朱琦来源:《数字技术与应用》2018年第05期摘要:随着现代社会发展进步,尤其是数字地球概念的提出,在全世界...
  • hive 数据倾斜

    2019-10-17 16:56:05
    hive上执行脚本,数据一直跑不出,询问dba说可能是数据倾斜的问题,需要优化脚本(之前脚本可以正常执行),最后发现join表的重复数据过多造成的。网上看了下倾斜,简单总结下。 一、 概念 由于数据分布不均,造成...
  • 数据倾斜以及处理

    2021-01-21 18:44:56
    数据倾斜的直白概念: 数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据...
  • 何为数据倾斜

    千次阅读 2018-08-01 20:33:47
    正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况: 一种是唯一值...
  • 数据倾斜原理及解决方案

    万次阅读 多人点赞 2018-11-20 16:03:22
    导读 相信很多接触MapReduce的朋友对'数据倾斜...正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字...
  • Spark处理数据倾斜

    2019-03-16 19:55:08
    1. 数据倾斜的基本概念 关于这点,其实上一篇博客里面 南国已经做了讲述。这里南国再做个简单的论述,数据倾斜主要就是大数据集群并行进行数据处理的时候,由于数据分布不均,导致大量的数据集中分不到一台或者某几...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 396
精华内容 158
关键字:

倾斜的概念