精华内容
下载资源
问答
  • 倾斜是指
    千次阅读
    2022-02-16 15:03:53

    数据倾斜是指在并行计算模式下(hadoop 的map-reduce 框架下,数据被切分为N个片段,分发到不同的计算节点上,单独计算),单个计算节点获得的数据量远远大于其他节点,造成该节点计算压力过大,导致计算效率下降或计算内存溢出。这个现象就是数据倾斜。 同工不同酬。

    解决这个问题的办法一般是:

    1 .对数据集重新分区,增大分区数量,使得每个分区记录数尽量相等

    2. 给数据增加随机id,按这个id重新分区;

    3. 给数据 “加盐”一种生成均匀分布的id值的机制。重分区

    4. 对造成数据集中的超级节点id,进行过滤,单独处理这样的节点。如,模电商的电话每天有数万订单,这个电话关联的业务数据就容易造成数据倾斜

    推荐书籍: spark 快速大数据分析(第二版)

    更多相关内容
  • 导读相信很多接触MapReduce的朋友对'数据倾斜'这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢?何为数据倾斜?在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布...

    导读

    相信很多接触MapReduce的朋友对'数据倾斜'这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢?

    何为数据倾斜?

    在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:

        正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:

    一种是唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)

    一种是唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一

    数据倾斜:

        数据倾斜在MapReduce编程模型中十分常见,用最通俗易懂的话来说,数据倾斜无非就是大量的相同key被partition分配到一个分区里,造成了'一个人累死,其他人闲死'的情况,这种情况是我们不能接受的,这也违背了并行计算的初衷,首先一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,也拖累了整体的计算时间,可以说效率是十分低下的。

    解决方案:

        1.增加jvm内存,这适用于第一种情况(唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)),这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率。

        2.增加reduce的个数,这适用于第二种情况(唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一),我们知道,这种情况下,最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作,而如果我们增加了reduce的个数,这种情况相对来说会减轻很多,毕竟计算的节点多了,就算工作量还是不均匀的,那也要小很多。

        3.自定义分区,这需要用户自己继承partition类,指定分区策略,这种方式效果比较显著。

        4.重新设计key,有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可。

        5.使用combinner合并,combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,这样做的好处很多,即减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率),推荐使用这种方法。

    展开全文
  • smart3D软件的使用无人机倾斜流程
  • 提出了一种新型的宽带声光表面波叉换能器(SAW-IDT), 该换能器应用双梯形倾斜电极,可对单梯形倾斜电极IDT通带的高频成分进行补偿,使通带更宽更平直,并无需外补偿网络.通过计算机模拟得出IDT的优化高频补偿因子ρ=...
  • 数据倾斜在并行计算模式下(map-reduce框架,数据被切分为N个片段,分发到不同的计算节点上,单独计算),部分节点处理的数据量远大于其他节点,造成该节点计算压力过大,从而导致少数节点的运行时长远远超过...

    概念

    数据倾斜是指在并行计算模式下(map-reduce框架,数据被切分为N个片段,分发到不同的计算节点上,单独计算),部分节点处理的数据量远大于其他节点,造成该节点计算压力过大,从而导致少数节点的运行时长远远超过其他节点的平均运行时长,进而影响整体任务产出时效,造成任务延迟,这个现象就是数据倾斜。

    如何定位任务是否出现倾斜,在哪个阶段出现了倾斜?

    对于MaxCompute/ODPS来说,定位是否倾斜,只需要在logview中查看每个mapper/joiner/reducer 阶段的fuxi instance是否有long-tail / data-skews节点,若有,则一定是出现了数据倾斜。

    对于hive,看任务执行的过程中如果一直在某个阶段卡在99%,那么大概率是出现了数据倾斜。

    如果是joiner阶段长尾,Hive可以配合执行计划以及执行日志中定位是哪个阶段出现的倾斜,稍微有点麻烦,需要一点点定位,步骤大致如下。(手头没有集群无法截图。。)

    1、从hive日志中定位长尾节点的"CommonJoinOperator: JOIN struct" 关键字。这里会打印该阶段的关联字段。

    2、然后到执行计划中找到这几个字段所处的阶段和关联表名。

    如何解决数据倾斜?

    定位到是否有倾斜,以及在某个阶段出现的倾斜之后,就可以针对性的去优化。数据倾斜并不是简单的调整并行度就可以解决的,而是需要针对特定情况动态使用解决方案。

    由于篇幅原因,后续我会出一个系列,开一个新坑,详细讲解各种阶段的数据倾斜的解决方案以及其他SQL性能调优实战方案,感兴趣的可以关注收藏一波。

    如:

    Map端长尾

    Join端长尾优化

    Reduce端长尾优化

    ....

    如果我的文章对你有帮助,请帮忙转发/点赞/收藏,谢谢!

    展开全文
  • 数据倾斜优化方案

    2018-12-29 15:24:41
    数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著 多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。 如果数据倾斜没有解决,完全没有可能进行性能调优...
  • 在简单介绍海底定点停驻UUV概念及工作过程的基础上,建立了其在海底时的稳定性分析模型,并推导出了航行器负浮力、主轴与两支撑点之间的半角θ、流体动力Fx、Fy之间的关系。为了研究航行器在海底时的流体动力特性,...
  • 数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。在Spark中,同一个Stage的不同Partition可以并行处理...
  • activity: ...import android.annotation.SuppressLint import android.os.Bundle import android.view.MotionEvent import android.view.View import android.view.View.OnTouchListener import android.widget....
  • 行业分类-设备装置-一种基于触摸屏的手指倾斜识别方法、系统及移动终端.zip,一种基于触摸屏的手指倾斜识别方法、系统及移动终端.pdf
  • Spark优化篇:数据倾斜解决

    千次阅读 2022-04-19 10:44:50
    数据倾斜我们在并行进行数据处理的时候,由于数据散列引起Spark的单个Partition的分布不均,导致大量的数据集中分布到一台或者几台计算节点上,导致处理速度远低于平均计算速度,从而拖延导致整个计算过程过慢,...

    数据倾斜是指我们在并行进行数据处理的时候,由于数据散列引起Spark的单个Partition的分布不均,导致大量的数据集中分布到一台或者几台计算节点上,导致处理速度远低于平均计算速度,从而拖延导致整个计算过程过慢,影响整个计算性能。

    数据倾斜带来的问题

    单个或者多个Task长尾执行,拖延整个任务运行时间,导致整体耗时过大。单个Task处理数据过多,很容易导致OOM。

    数据倾斜的产生原因

    数据倾斜一般是发生在 shuffle 类的算子、SQL函数导致,具体如以下:

    类型RDDSQL
    去重distinctdistinct
    聚合groupByKey、reduceByKey、aggregateByKeygroup by
    关联join、left join、right joinjoin、left join、right join

    ​​​​​​​

    通过Spark web ui event timeline观察明显长尾任务:

    数据倾斜大Key定位

    RDD进行抽取:

    val cscTopKey: Array[(Int, Row)] = sampleSKew(sparkSession,"default.tab_spark","id")
    println(cscTopKey.mkString("\n"))
    
      def sampleSKew( sparkSession: SparkSession, tableName: String, keyColumn: String ): Array[(Int, Row)] = {
        val df: DataFrame = sparkSession.sql("select " + keyColumn + " from " + tableName)
        val top10Key: Array[(Int, Row)] = df
          .select(keyColumn).sample(withReplacement = false, 0.1).rdd
          .map(k => (k, 1)).reduceByKey(_ + _)
          .map(k => (k._2, k._1)).sortByKey(ascending = false)
          .take(10)
        top10Key
      } 

    SQL进行抽取:

    SELECT
    	id,conut(1) as cn
    FROM
    	default.tab_spark_test_3
    GROUP BY id	
    ORDER BY cn DESC
    LIMIT 100;
    100000,2000012
    100001,1600012
    100002,1

    单表数据倾斜优化

    为了减少 shuffle 数据量以及 reduce 端的压力,通常 Spark SQL 在 map 端会做一个partial aggregate(通常叫做预聚合或者偏聚合),即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总,类似 MR 的提前Combiner,所以执行计划中 HashAggregate 通常成对出现。 但是这种也会出现问题,如果key重复的量级特别大,Combiner也是解决不了本质问题。

    解决方案:

    Add Salt局部聚合 2、Remove Salt全局聚合

    sparkSession.udf.register("random_prefix", ( value: Int, num: Int ) => randomPrefixUDF(value, num))
    sparkSession.udf.register("remove_random_prefix", ( value: String ) => removeRandomPrefixUDF(value))
    
    		//t1 增加前缀,t2按照加盐的key进行聚,t3去除加盐,聚合
        val sql =
          """
            |select
            |  id,
            |  sum(sell) totalSell
            |from
            |  (
            |    select
            |      remove_random_prefix(random_id) id,
            |      sell
            |    from
            |      (
            |        select
            |          random_id,
            |          sum(pic) sell
            |        from
            |          (
            |            select
            |              random_prefix(id, 6) random_id,
            |              pic
            |            from
            |              default.tab_spark_test_3
            |          ) t1
            |        group by random_id
            |      ) t2
            |  ) t3
            |group by
            |   id
          """.stripMargin
          
    def randomPrefixUDF( value: Int, num: Int ): String = {
        new Random().nextInt(num).toString + "_" + value
      }
    
    def removeRandomPrefixUDF( value: String ): String = {
        value.toString.split("_")(1)
      }  

    表关联数据倾斜优化

    1、适用场景

    适用于 join 时出现数据倾斜。

    2、解决逻辑

    1、将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集;

    2、将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)。

    3、打散的 skew 表 join 扩容的 new 表

    union common 表 join old 表

    以下为打散大 key 和扩容小表的实现思路

    1、打散大表:实际就是数据一进一出进行处理,对大 key 前拼上随机前缀实现打散;

    2、扩容小表:实际就是将 DataFrame 中每一条数据,转成一个集合,并往这个集合里循环添加 10 条数据,最后使用 flatmap 压平此集合,达到扩容的效果。

     /**
       * 打散大表  扩容小表 解决数据倾斜
       *
       * @param sparkSession
       */
      def scatterBigAndExpansionSmall(sparkSession: SparkSession): Unit = {
        import sparkSession.implicits._
        val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
        val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
          .withColumnRenamed("discount", "pay_discount")
          .withColumnRenamed("createtime", "pay_createtime")
        val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
          .withColumnRenamed("discount", "cart_discount")
          .withColumnRenamed("createtime", "cart_createtime")
    
        // TODO 1、拆分 倾斜的key
        val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
        val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)
    
        //TODO 2、将倾斜的key打散  打散36份
        val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
          partitions.map(item => {
            val courseid = item.getAs[Long]("courseid")
            val randInt = Random.nextInt(36)
            CourseShoppingCart(courseid, item.getAs[String]("orderid"),
              item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
              item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
              item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
          })
        })
        //TODO 3、小表进行扩容 扩大36倍
        val newSaleCourse = saleCourse.flatMap(item => {
          val list = new ArrayBuffer[SaleCourse]()
          val courseid = item.getAs[Long]("courseid")
          val coursename = item.getAs[String]("coursename")
          val status = item.getAs[String]("status")
          val pointlistid = item.getAs[Long]("pointlistid")
          val majorid = item.getAs[Long]("majorid")
          val chapterid = item.getAs[Long]("chapterid")
          val chaptername = item.getAs[String]("chaptername")
          val edusubjectid = item.getAs[Long]("edusubjectid")
          val edusubjectname = item.getAs[String]("edusubjectname")
          val teacherid = item.getAs[Long]("teacherid")
          val teachername = item.getAs[String]("teachername")
          val coursemanager = item.getAs[String]("coursemanager")
          val money = item.getAs[String]("money")
          val dt = item.getAs[String]("dt")
          val dn = item.getAs[String]("dn")
          for (i <- 0 until 36) {
            list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
              edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i + "_" + courseid))
          }
          list
        })
    
        // TODO 4、倾斜的大key 与  扩容后的表 进行join
        val df1: DataFrame = newSaleCourse
          .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
          .join(coursePay, Seq("orderid", "dt", "dn"), "left")
          .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
            , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
            "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    
    
        // TODO 5、没有倾斜大key的部分 与 原来的表 进行join
        val df2: DataFrame = saleCourse
          .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
          .join(coursePay, Seq("orderid", "dt", "dn"), "left")
          .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
            , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
            "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    
        // TODO 6、将 倾斜key join后的结果 与 普通key join后的结果,uinon起来
        df1
          .union(df2)
          .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail")
      }

    展开全文
  • 1 什么是数据倾斜数据倾斜在大数据计算任务中某个处理任务的进程(通常是一个JVM进程)被分配到的任务量过多,导致任务运行时间超长甚至最终失败,进而导致整个大任务超长时间运行或者失败。外...
  • 如何处理Spark数据倾斜问题

    千次阅读 2022-02-26 22:23:29
    数据倾斜某些任务对应分区上的数据显著多于其他任务对应分区上的数据,从而导致这部分分区上数据的处理速度成为处理整个数据集的瓶颈。 在Spark中,同一Stage内不同的任务可以并行执行,而不同Stage之间的任务...
  • spark学习之处理数据倾斜

    千次阅读 2022-03-27 17:24:21
    大数据面试,遇见数据倾斜不会答?最全的数据倾斜总结来教你如何解决它。
  • 数据倾斜及其高效解决方法

    万次阅读 多人点赞 2018-11-13 18:20:07
    数据倾斜:mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千...
  • 本篇文章主要介绍了"html元素倾斜",主要涉及到html元素倾斜方面的内容,对于html元素倾斜感兴趣的同学可以参考一下。div{width:100px;height:75px;background-color:yellow;border:1px solid black;border-radius:5...
  • 倾斜摄影是最近比较火的一个话题,但也引出了一些问题,有些需要倾斜摄影数据的单位或者公司,想和已有数据做结合,对外提需求就是,能否做到1:500的精度?我们现在有很多1:500的线画图,dlg图,能否做到完美结合...
  • 什么是数据倾斜以及造成的原因

    千次阅读 2018-10-13 21:01:41
    在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因...
  • 更多文章,可关注微信公众号:Excel办公小技巧 ...数据倾斜,分布式计算时,一些节点计算量高于其他节点,速度很慢,导致其他节点计算完成后,还要等待这些节点完成。 Q2 数据倾斜有什么表现? 直观上..
  • 2021年倾斜摄影测量技术方案.docx2021年倾斜摄影测量技术方案.docx2021年倾斜摄影测量技术方案.docx2021年倾斜摄影测量技术方案.docx2021年倾斜摄影测量技术方案.docx2021年倾斜摄影测量技术方案.docx2021年倾斜摄影...
  • 倾斜摄影测量三维模型技术发展至今,属于一项成熟度很高的技术。很多人手里都有大大小小的一些倾斜摄影三维模型数据,但是数据怎么展示,怎么进行使用,怎么与业务进行结合一直是大家很头痛的事情。下面我会为大家...
  • matlab图像倾斜校正

    千次阅读 2021-05-06 01:26:20
    引言 容提要数字图形图像处理是采用计算机处理图形图像的技术。近些年随着计算机 与信息技术的高速发展,数字图...... 基于MATLAB 图像处理的汽车牌照识别研究 赵珊,裴亮,刘翠,王涛,王新亮,朱... 本设计中采用...
  • spark数据倾斜

    2022-01-06 11:02:19
    数据倾斜指的是,并行处理的数据集里某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。比如统计单词频数的程序中某个Key对应的数据量非常大的...
  • 一种快速的文档图像倾斜角检测算法.docx一种快速的文档图像倾斜角检测算法.docx一种快速的文档图像倾斜角检测算法.docx一种快速的文档图像倾斜角检测算法.docx一种快速的文档图像倾斜角检测算法.docx一种快速的文档...
  • 引言数据量倾斜的成因和应对方法bigkey 导致倾斜Slot 分配不均衡导致倾斜Hash Tag 导致倾斜数据访问倾斜的成因和应对方法 引言 在切片集群中,数据会按照一定的分布规则分散到不同的实例上保存。比如,在使用Redis ...
  • 数据倾斜及一些解决方法

    千次阅读 2021-10-01 12:17:31
    数据倾斜主要是:主要就是数据在每个节点上的分配不均,导致个别节点处理速度很慢,使得程序迟迟不能运行结束。主要表现为:在mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行...
  • 关于文字识别中两种倾斜校正算法的比较研究.pdf关于文字识别中两种倾斜校正算法的比较研究.pdf关于文字识别中两种倾斜校正算法的比较研究.pdf关于文字识别中两种倾斜校正算法的比较研究.pdf关于文字识别中两种倾斜...
  • 数据倾斜指的是,并行处理的数据集 中,某一部分(如Spark的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈. 表现为整体任务基本完成, ... Hive数据倾斜解决方法总结 数据...
  • Spark数据倾斜问题+解决方案

    千次阅读 2021-12-12 20:43:10
    数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于 其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈 数据倾斜俩大直接致命后果 1)数据倾斜直接会导致一...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 37,207
精华内容 14,882
热门标签
关键字:

倾斜是指