精华内容
下载资源
问答
  • 解决数据倾斜的方法
    千次阅读
    2021-10-01 12:17:31

    一、数据倾斜原理

    做大数据开发,很有可能会遇到数据倾斜的问题,要想解决数据倾斜,首先要理解什么是数据倾斜,以及产生数据倾斜的原因。

    数据倾斜主要是指:主要就是数据在每个节点上的分配不均,导致个别节点处理速度很慢,使得程序迟迟不能运行结束。主要表现为:在mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key中的的条数比其他key要多很多,这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。如何将数据均匀的分配到各个reduce节点中,就是解决数据倾斜的根本所在

    二、Spark中数据倾斜解决

    以下针对spark具体计算场景,给出数据倾斜解决方案:
    场 景当RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,产生数据倾斜。
    出现数据倾斜原因:
    在上述分组场景中,具有相同的key的数据就会被分配到同一个组当中,从而分配到同一分区。如果某些相同key的数据量非常大,而其他是key的数据量相对较小,此时就可能产生比较严重的数据倾斜。
    本方案通过两个阶段聚合:
    阶段一:
    先给每个key都打上一个随机数,比如10以内的随机数,比如(spark,1) (spark, 1) (spark, 1) (spark, 1),就会变成(1_spark, 1) (1_spark, 1) (2_spark, 1) (2_spark, 1)。打上随机数以后,原先一样的key就变成不一样的了。然后对数据进行reduceByKey等聚合操作,局部聚合结果变成了(1_spark, 2) (2_spark, 2)。
    阶段二:
    基于阶段一局部聚合的数据,将各个key的前缀给去掉,就会变成(spark,2)(spark,2),再次进行全局聚合操作,得到最终结果,比如(spark, 4)。

    三、Hive中数据倾斜

    1.group by导致数据倾斜

    (1)、设置hive.map.aggr:默认为true,在map端做聚合,推荐使用

    (2)、设置hive.groupby.skewindata:reduce操作的时候,相同key值并不是都给同一个reduce,而是随机分发到各个reduece做聚合。这个参数其实跟hive.map.aggr做的类似,只是在reduce端做,要额外启动一轮job,不推荐使用

    (3)、优化sql语句

    有个t表,数据量很大,假如字段a代表的性别,那么只有2个值,对a进行group by操作,所有聚合运行将会落在两个节点上。优化方法,先group by a b,b需要一个比较分散的值,比如班级或者年级,得到一个较小规模的中间结果数据,再对中间结果group by a。

    改写前

    select a, count(distinct b) as c from t group by a;
    

    改写后

    select a, count(*) as c from (select a, b from t group by a, b) group by a;
    

    2.join操作导致数据倾斜

    select * from logs a join users b on a.user_id = b.user_id;
    

    日志表有大量未登陆用户的数据,即user_id为0,reduce时候,某个节将会其他节点多出大量数据,形成单点压力。

    (1)、设置hive.optimize.skewjoin和hive.skewjoin.key参数

    其原理把这种user_id为0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。hive.skewjoin.key设置值比如是1万,那么超过1万条记录的值就是特殊值。

    (2)、 优化sql,特殊值分开处理

    user_id=0的单独做join,这样user_id=0转化成map join,user_id!=0是没有数据倾斜的普通join。

       select
            *
        from (select * from logs where user_id = 0) a
        join (select * from users where user_id = 0) b on a.user_id = b.user_id
       union all
       select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
    

    (3)、优化sql,特殊值赋予新key

       select * from logs a
        left outer join users b
        on
            case
                when a.user_id is null
                then concat('prefix_', rand())
                else a.user_id
            end = b.user_id;
    

    (4)、优化sql,关联key随机打散

       select a.*,b.*
        from (select *, cast(rand() * 10 as int) as r_id from logs) a
        join (select *, r_id from items lateral view explode(range_list(1, 10)) rl as r_id) b
        on a.item_id = b.item_id and a.r_id = b.r_id
    

    对行为表的每条记录生成一个1-10的随机整数,对于item属性表,每个item生成10条记录,随机key分别也是1-10,这样就能保证行为表关联上属性表。其中range_list(1,10)代表用udf实现的一个返回1-10整数序列的方法。这个做法是一个解决join倾斜比较根本性的通用思路,就是如何用随机数将key进行分散。

    3.不同类型关联导致数据倾斜
    用户表中user_id字段为int,logs表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。需要把数字类型统一转换成字符串类型。

    select    * from users a
    left outer join logs b on a.usr_id = cast(b.user_id as string)
    

    4.利用map join解决数据倾斜问题

    (1)、大小表关联

     select * from users as a join logs b  on a.user_id = b.user_id
    

    如果users表只有100行数据,logs表有1亿条数据且数据倾斜特别严重,reduce过程中同样会遇到数据倾斜问题。

    利用map join,会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多。

    select /*+ mapjoin(a)*/ from users as a join logs b  on a.user_id = b.user_id
    

    (2)、where条件中存在不等式造成的笛卡尔积

    map join还有一个优势,能够进行不等连接的join操作,如果将不等条件写在where中,那么mapreduce过程中会进行笛卡尔积,运行效率特别低,如果使用map join操作,在map的过程中就完成了不等值的join操作,效率会高很多。

    select /*+ mapjoin(a)*/ from A join B where A.a>B.a
    

    (3)、小表不大大表不小

    select * from log a left outer join users b on a.user_id = b.user_id;
    

    users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

    log里user_id有上百万个,但每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等,可以利用这一特性,通过map join进行优化

    select
        /*+mapjoin(x)*/
        *
    from
        logs a
    left outer join
        (
            select
                /*+mapjoin(c)*/
                d.*
            from
                (
                    select distinct user_id from logs
                )
                c
            join users d
            on
                c.user_id = d.user_id
        )
        x on a.user_id = b.user_id; 
    
    更多相关内容
  • Hive解决数据倾斜的各种优化方法

    千次阅读 2021-01-20 20:39:46
    数据处理中的数据倾斜:个人理解,在数据处理的MapReduce程序中,由于数据的特殊性,数据中存在大量相同key的数据,根据业务需求需要对这个key进行分区操作(group by/join)时,在map的partition阶段将大数据量的...

    一、概念

    数据处理中的数据倾斜:个人理解,在数据处理的MapReduce程序中,由于数据的特殊性,数据中存在大量相同key的数据,根据业务需求需要对这个key进行分区操作(group by/join)时,在map的partition阶段将大数据量的相同key的数据全部分配到同一个Reduce,导致Reduce的节点数据量分配极度不均衡的现象,称为数据倾斜。

     数据倾斜有哪些表现:

    • 最直观的表现就是:Hive SQL运行得慢
    • 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成,因为其处理的数据量和其他reduce差异过大。
    • 单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多,最长时长远大于平均时长。

    数据倾斜的原因:

    1. key分布不均匀
    2. 业务数据本身的特性
    3. 建表时考虑不周
    4. 某些SQL语句本身就有数据倾斜

    二、数据倾斜优化方法分类

    在实际Hive SQL开发的过程中,Hive SQL性能的问题上实际只有一小部分和数据倾斜相关。很多时候,Hive SQL运行得慢是由开发人员对于使用的数据了解不够以及一些不良的使用习惯引起的,我们可以确定一些关键点,看看是否能通过业务层面来避免写这中运行的特别慢的 hive sql,比如使用公共汇总层的数据代替公共明细层的数据。除此之外就需要真正的Hive优化技术了。

    所以个人将优化方法分为以下三大类:

    1. 业务优化
    2. join无关的优化
      1. group by 引起的数据倾斜优化
      2. count distinct 优化
    3. join相关的优化
      1. mapjoin可以解决的join优化(即大表join小表)
      2. mapjoin无法解决的join优化(即大表join大表)

    三、具体的优化方法

    1.业务优化

    很多时候,Hive SQL运行得慢是由开发人员对于使用的数据了解不够以及一些不良的使用习惯引起的。

    开发人员需要确定以下几点。

    • 需要计算的指标真的需要从数据仓库的公共明细层来自行汇总么?是不是数据公共层团队开发的公共汇总层已经可以满足自己的需求?对于大众的、KPI相关的指标等通常设计良好的数据仓库公共层肯定已经包含了,直接使用即可。
    • 真的需要扫描这么多分区么?比如对于销售明细事务表来说,扫描一年的分区和扫描一周的分区所带来的计算、IO开销完全是两个量级,所耗费的时间肯定也是不同的。笔者并不是说不能扫描一年的分区,而是希望开发人员需要仔细考虑业务需求,尽量不浪费计算和存储资源,毕竟大数据也不是毫无代价的。
    • 尽量不要使用select * from your_table这样的方式,用到哪些列就指定哪些列,如select col1, col2 from your_table。另外,where条件中也尽量添加过滤条件,以去掉无关的数据行,从而减少整个MapReduce任务中需要处理、分发的数据量。
    • 输入文件不要是大量的小文件。Hive的默认Input Split是128MB(可配置),小文件可先合并成大文件。

    在保证了上述几点之后,有的时候发现Hive SQL还是要运行很长时间,甚至运行不出来,这时就需要真正的Hive优化技术了。

    2.join无关的优化

    Hive SQL性能问题基本上大部分都和join相关,对于和join无关的问题主要有group by相关的倾斜和count distinct相关的优化。

    1)group by 引起的数据倾斜优化

    group by引起的倾斜主要是输入数据行按照group by列分布不均匀引起的。

    比如,有个key值有100W个a,此时直接做分组的话,这100W个a将会分到同一个reduce中,这一个节点处理的数据远大于其他节点处理的数据,造成数据倾斜,跑不出数据。其原因就是有大量的key集中分配到了同一个reduce,那么我们的解决思路就是将这些key值打散,使起分散到多个reduce节点处理即可,达到负载均衡的效果。

    实现原理:

    在做group by 之前,我们给key=hello的数据做一次转换(加上0-9的随机数的前缀),变成0-hello,1-hello,2-hello...,此时做group by,数据将分散到多个reduce,然后再在上层查询中,将我们添加的随机数前缀去掉,使其变回a再做一次全局聚合即可,(对于大量不可删除的key值处理也是这个原理)。

    SQL实现方式伪代码:

    -- 假设有表 tb_name(key_col,cnt)且已知由key_col=hello 造成数据倾斜
    
    -- 原查询
    select key_col,sum(cnt) as cnt from tb_name group by key_col;
    
    -- 优化后
    select
      case when key_col like "%hello" then (伪代码:去除前缀) else key_col end as key_col,
      sum(cnt) as cnt 
    from (
      select 
        key_col,
        sum(cnt) as cnt 
      from (
        select 
          case when key_col="hello" then concat_ws("-",rand(),key_col) else key_col end as key_col,
          cnt
        from tb_name
      ) a
      group by key_col
    ) res
    group by case when key_col like "%hello" then (伪代码:去除前缀) else key_col end

    这样看起来操作会比较麻烦,有没有更简单的方法呢?其实Hive已经做了优化,我们只需要配置几个参数就行了。

    对于group by引起的倾斜,优化措施非常简单,只需设置下面参数即可:

    set hive.map.aggr = true
    set hive.groupby.skewindata = true

    此时Hive在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个MapReduce Job

    第一个MapReduce Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作并输出结果,这样处理的结果是相同的GroupBy Key有可能被分布到不同的Reduce中,从而达到负载均衡的目的;

    第二个MapReduce Job 再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

    2)count distinct优化

    在Hive开发过程中,应该小心使用count distinct,因为很容易引起性能问题,比如下面的SQL:

    select count(distinct user) from some_table;

    由于必须去重,因此Hive将会把Map阶段的输出全部分布到一个Reduce Task上,此时很容易引起性能问题。

    对于这种情况,可以通过先group by再count的方式来优化,优化后的SQL如下

    select count(*) 
    from
    (select user from some_table group by user) a; 

    其原理为:利用group by去重,再统计group by的行数目(不过这种方式需要注意数据倾斜的问题)。

    3.join相关的优化

    1)mapjoin可以解决的join优化(即大表join小表)

    背景:通常情况下,JOIN操作在Reduce阶段执行表连接。整个JOIN过程包含Map、Shuffle、Reduce三个阶段。MAPJOIN在Map阶段执行表连接,而非等到Reduce阶段才执行表连接。这样就节省了大量数据传输的时间以及系统资源,从而起到了优化作业的作用。在大表和一个或多个小表JOIN的场景下,MAPJOIN会将您指定的小表全部加载到执行JOIN操作的程序的内存中,因此指定的表仅能为小表。

    以销售明细事实表为例来说明大表join小表的场景。

    假如供应商会进行评级,比如(五星、四星、三星、两星、一星),此时业务人员希望能够分析各供应商星级的每天销售情况及其占比。

    开发人员一般会写出如下SQL:

    select
        seller_star,
        count(order_id) as order_cnt
    from 
    (select order_id,seller_id from detail_table where dt=20210119) a
    left join 
    (select seller_id,seller_star from dim_seller where dt=20210119) b
    on a.seller_id = b.seller_id
    group by b.seller_star

    但正如上述所言,现实世界的二八准则将导致订单集中在部分供应商上,而好的供应商的评级通常会更高,此时更加剧了数据倾斜的程度,如果不加以优化,上述SQL将会耗费很长时间,甚至运行不出结果。通常来说,供应商是有限的,比如上千家、上万家,数据量不会很大,而销售明细事实表比较大,这就是典型的大表join小表问题,可以通过mapjoin的方式来优化,只需添加mapjoin hint即可,优化后的SQL如下:

    select  /*+mapjoin(b)*/ 
        seller_star,
        count(order_id) as order_cnt
    from 
    (select order_id,seller_id from detail_table where dt=20210119) a
    left join 
    (select seller_id,seller_star from dim_seller where dt=20210119) b
    on a.seller_id = b.seller_id
    group by b.seller_star

    /*+mapjoin(b)*/即mapjoin hint,如果需要mapjoin多个表,则格式为/*+mapjoin(b, c, d)*/。Hive对于mapjoin是默认开启的,设置参数为:

    set hive.auto.convert.join=true;

    mapjoin优化是在Map阶段进行join,而不是像通常那样在Reduce阶段按照join列进行分发后在每个Reduce任务节点上进行join,不需要分发也就没有倾斜的问题,相反Hive会将小表全量复制到每个Map任务节点(对于本例是dim_seller表,当然仅全量复制b表sql指定的列),然后每个Map任务节点执行lookup小表即可。

    所以,小表不能太大,否则全量复制分发得不偿失,那么多小的表算作小表呢?这就涉及到一个阈值划分的问题,hive中通过参数hive.mapjoin.smalltable.filesize(版本不同,相应的参数不同)来确定小表的大小是否满足条件(默认25MB),实际中可以根据集群情况调整,但是一般最大不能超过1GB(太大的话Map任务所在的节点内存会撑爆,Hive会报错。另外需要注意的是,HDFS显示的文件大小是压缩后的大小,当实际加载到内存的时候,容量会增大很多,很多场景下可能会膨胀10倍)。

    使用注意事项:

    • 老版本的hive在join时,会要求将小表放在join的左边来触发mapjoin,但新版本的hive已经做了优化,小表在左在右已经没有区别了,可以使用explain打印出执行计划查看。
    • 执行join操作,hive会自动对参与join的key做空值过滤,打印执行计划会有 " key(参与join的key字段) is not null "的操作。但非inner join的其他join操作不会做过滤。

    2)mapjoin无法解决的join优化(即大表join大表)

    有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同
    的 reducer 上,从而导致内存不够。

    a.空 key 过滤

    此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。如果 key 对应的字段为空,且是异常数据,应该在join前直接过滤掉。

    实例:

    -- 不过滤空 id
    select n.* from tb_name n left join bigtable o on n.id = o.id;
    -- 过滤空 id
    select 
    	n.* 
    from (
    	select 
    		* 
    	from tb_name 
    	where id is not null
    ) n 
    left join bigtable o on n.id = o.id;

    适用场景:

    1. 非inner join
    2. 结果中,参与join的字段不需要null的情况

    b.空 key 转换

    有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上(这里的处理方式其实跟前面的group by的优化类似,这做空 key 转换的优化演示)。

    -- 空 key 转换前
    select n.* from nullidtable n left join bigtable b on n.id = b.id;
    
    -- 空 key 转换后
    select n.* from nullidtable n full join bigtable o on 
    nvl(n.id,rand()) = o.id;
    展开全文
  • 在Spark计算平台中,数据倾斜...提出了广播机制避免Shuffle过程数据倾斜方法,分析了广播变量分发逻辑过程,给出广播变量性能优势分析和该方法的算法实现.通过Broadcast Join实验验证了该方法在性能上有稳定的提升.
  • 解决Spark 数据倾斜的八种实用方法 什么是数据倾斜? 对 Spark/Hadoop 这样的分布式大数据系统来讲数据量大并不可怕可怕的是数据倾斜 对于分布式系统而言理想情况下随着系统规模(节点数量)的增加应用整体耗时线性下降...
  • 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。何谓数据倾斜数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该...
  • Flink 数据倾斜 解决方法

    千次阅读 2021-04-12 21:20:12
    1.数据倾斜的原理和影响 1.1 原理 数据倾斜就是数据的分布严重不均,造成一部分数据很多,一部分数据很少的局面。数据分布理论上都是倾斜的,符合“二八原理”:例如80%的财富集中在20%的人手中、80%的用户只使用...

    1.数据倾斜的原理和影响

    1.1 原理

    数据倾斜就是数据的分布严重不均,造成一部分数据很多,一部分数据很少的局面。数据分布理论上都是倾斜的,符合“二八原理”:例如80%的财富集中在20%的人手中、80%的用户只使用20%的功能、20%的用户贡献了80%的访问量。 数据倾斜的现象,如下图所示。

    1.2 影响

    (1)单点问题

    数据集中在某些分区上(Subtask),导致数据严重不平衡。

    (2)GC 频繁

    过多的数据集中在某些 JVM(TaskManager),使得JVM 的内存资源短缺,导致频繁 GC。

    (3)吞吐下降、延迟增大

    数据单点和频繁 GC 导致吞吐下降、延迟增大。

    (4)系统崩溃

    严重情况下,过长的 GC 导致 TaskManager 失联,系统崩溃。

    数据倾斜的影响

    2.Flink 如何定位数据倾斜?

    步骤1:定位反压

    定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask

    参考:【Flink 精选】如何分析及处理反压?

    步骤2:确定数据倾斜

    Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。

    Web UI 数据量监控

    3.Flink 如何处理常见数据倾斜?

    优化前

    场景一:数据源 source 消费不均匀 

    解决思路:通过调整并发度,解决数据源消费不均匀或者数据源反压的情况。例如kafka数据源,可以调整 KafkaSource 的并发度解决消费不均匀。调整并发度的原则:KafkaSource 并发度与 kafka 分区数是一样的,或者 kafka 分区数是KafkaSource 并发度的整数倍

    场景二:key 分布不均匀的无统计场景

    说明:key 分布不均匀的无统计场景,例如上游数据分布不均匀,使用keyBy来打散数据

    解决思路: 通过添加随机前缀,打散 key 的分布,使得数据不会集中在几个 Subtask

    优化后

    具体措施:

    ① 在原来分区 key/uid 的基础上,加上随机的前缀或者后缀

    使用数据到达的顺序seq,作为分区的key。

    场景三:key 分布不均匀的统计场景

    解决思路:聚合统计前,先进行预聚合,例如两阶段聚合(加盐局部聚合+去盐全局聚合)

    优化后

    两阶段聚合的具体措施:

    预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀

    聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计。

    参考: https://www.jianshu.com/p/4ae20202e06d 

    方法1:调整并行度

    方法2:加随机前缀或后缀

    方法3:两阶段聚合(加盐局部聚合+去盐全局聚合)

    展开全文
  • 数据倾斜解决方案之使用随机key实现双重聚合
  • Flink数据倾斜问题以及解决方法

    千次阅读 2022-04-30 16:34:51
    1. 判断是否存在数据倾斜...2.数据倾斜解决 2.1 keyBy 之前发生数据倾斜 如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身

    1. 判断是否存在数据倾斜

    相同 Task 的多个 Subtask 中,个别Subtask 接收到的数据量明显大于其他 Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。
    在这里插入图片描述

    2.数据倾斜的解决

    2.1 keyBy 之前发生数据倾斜
    如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。
    这种情况,需要让 Flink 任务强制进行shuffle。使用shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。

    2.2 keyBy 后的聚合操作存在数据倾斜
    使用LocalKeyBy的思想:在 keyBy 上游算子数据发送之前,首先在上游算子的本地
    对数据进行聚合后再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
    注意:Flink是实时流处理,如果keyby之后的聚合操作存在数据倾斜,且没有开窗口的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候Flink是来一条处理一条,且向下游发送一条结果,对于原来keyby的维度(第二阶段聚合)来讲,数据量并没有减少,且结果重复计算(非FlinkSQL,未使用回撤流),如下图所示:

    在这里插入图片描述

    实现方式:以计算PV为例,keyby之前,使用flatMap实现LocalKeyby

    class LocalKeyByFlatMap extends RichFlatMapFunction<String, Tuple2<String, 
     //Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
     private ListState<Tuple2<String, Long>> localPvStatListState;
     
     //本地 buffer,存放 local 端缓存的 app 的 pv 信息
     private HashMap<String, Long> localPvStat;
     
     //缓存的数据量大小,即:缓存多少数据再向下游发送
     private int batchSize;
     
     //计数器,获取当前批次接收的数据量
     private AtomicInteger currentSize;
    
     //构造器,批次大小传参
     LocalKeyByFlatMap(int batchSize){
     	this.batchSize = batchSize;
     }
    
     @Override
     public void flatMap(String in, Collector collector) throws Exception {
     	// 将新来的数据添加到 buffer 中
     	Long pv = localPvStat.getOrDefault(in, 0L);
     	localPvStat.put(in, pv + 1);
     	// 如果到达设定的批次,则将 buffer 中的数据发送到下游
     	if(currentSize.incrementAndGet() >= batchSize){
     		// 遍历 Buffer 中数据,发送到下游
     		for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
     			collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
     		}
     		// Buffer 清空,计数器清零
     		localPvStat.clear();
     		currentSize.set(0);
     	}
     }
    
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotConte
     	// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
     	localPvStatListState.clear();
     	for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
     		localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
     	}
     }
    
     @Override
     public void initializeState(FunctionInitializationContext context) {
     	// 从状态中恢复 buffer 中的数据
     	localPvStatListState = context.getOperatorStateStore().getListState
     	new ListStateDescriptor<>("localPvStat",
     	TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
     	localPvStat = new HashMap();
     	if(context.isRestored()) {
     		// 从状态中恢复数据到 localPvStat 中
     		for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
    long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
     			// 如果出现 pv != 0,说明改变了并行度,
     			// ListState 中的数据会被均匀分发到新的 subtask中
     			// 所以单个 subtask 恢复的状态中可能包含两个相同的 app 的数据
     			localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
     		}
     		// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
     		currentSize = new AtomicInteger(batchSize);
     	} else {
     		currentSize = new AtomicInteger(0);
     	}
     }
    
    }
    

    2.3 keyBy 后的窗口聚合操作存在数据倾斜
    因为使用了窗口,变成了有界数据的处理(3.2.2已分析过),窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
    实现思路:

    • 第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合注意:聚合完不再是WindowedStream要获取WindowEnd作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
    • 第二阶段聚合:去掉随机数前缀或后缀,按照原来的key及windowEnd作keyby、聚合
    展开全文
  • 八种解决 Spark 数据倾斜方法

    千次阅读 2019-12-29 21:47:00
    一、什么是数据倾斜对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗...
  • 下面列举了一些常见的导致数据倾斜的场景。 场景 1 : 当一个大表和一个小表 join 时, 如果小表的 key 较集中,将会引起大表中的数据被...针对数据倾斜,业界一般有以下几种解决方案。 1 . 调节参数 可以通过修改 h
  • 如何解决数据倾斜?导读相信很多接触MapReduce的朋友对'数据倾斜'这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢? 何为数据倾斜?在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:...
  • 数据倾斜及其高效解决方法

    万次阅读 多人点赞 2018-11-13 18:20:07
    数据倾斜是大数据领域绕不开的拦路虎,当你所需处理的数据量到达了上亿甚至是千亿条的时候,数据倾斜将是横在你面前一道巨大的坎。很可能有几周甚至几月都要头疼于数据倾斜导致的各类诡异的问题。 数据倾斜是指:...
  • 数据倾斜的原因及解决方案

    千次阅读 2022-05-08 09:03:17
    一、什么是数据倾斜数据倾斜是如何产生的 数据倾斜的根本原因是数据的key分布不均,个别key数据很多,超出了计算节点的计算能力的结果; 过程:数据经过 map后,由于不同key 的数据量分布不均,在shuffle 阶段中...
  • 数据倾斜 如果有10亿数据,一台电脑可能要10小时,现在集群有10台,可能1小时就够了,但是有可能大量的数据集中到一台或几台上,要5小时,发生了数据倾斜,例如: 公司一:总用户量1000万,5台64G内存的服务器。 ...
  • Spark常见数据倾斜现象及解决方案总结归纳
  • Spark 解决数据倾斜的几种常用方法

    千次阅读 多人点赞 2019-06-06 16:13:29
    数据倾斜的调优,就是利用各种技术方案解决不同类型的数据倾斜问题,保证 Spark 作业的性能。 一,数据倾斜原理 一个 Spark 作业,会根据其内部的 Action 操作划分成多个 job,每个 job 内部又会根据 shuffle 操作...
  • 行业分类-设备装置-一种解决数据倾斜方法及装置
  • 数据倾斜如何处理
  • 什么是数据倾斜 数据倾斜的表现 发生数据倾斜的原因 如何解决数据倾斜
  • 数据倾斜解决方案

    2021-02-01 14:22:25
    1)聚合原数据(主要操作的是hive数据库中的数据,先通过hive sql将相同key的数据聚合成一条数据,再进行map操作)当没办法聚合成一条数据时:增大key粒度,从而key的数量会减少,但是每个key对应的数据量会增大,key...
  • 数据倾斜解决方案汇总

    千次阅读 2021-05-06 23:32:50
    数据倾斜解决方案汇总如何解决数据倾斜问题背景1、事前对连接 key 进行预处理2、大表关联小表,一般用 mapjoin3、倾斜数据分而治之4、倾斜数据打散处理总结 如何解决数据倾斜问题 背景 分布式环境下经常会碰到数据...
  • 数据倾斜:由于数据分布不均匀,数据集中在某些 SubTask 上,导致部分 SubTask 处理数据量特别大,执行时间过长,影响了整个应用程序的执行效率。 过多的数据集中在某些 JVM(TaskManager),使得 JVM 的内存资源...
  • 数据倾斜原理与解决方法

    千次阅读 2021-12-14 16:46:39
    数据倾斜的概念 数据倾斜这四个字经常会在学习MapReduce中遇到。所谓数据分区,就是数据分区分布因为数据本身或者分区方法的原因变得极为不一致,大量的数据被划分到了同一个区。由于Reducer Task每次处理一个区的...
  • 【HIVE数据倾斜常见解决办法】

    千次阅读 2022-04-04 18:19:18
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言一、pandas是什么?...示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。 二、使用步骤 1.引入库 代.
  • hive数据倾斜解决办法

    千次阅读 2021-01-14 12:49:33
    数据倾斜是进行大数据...在hive中遇到数据倾斜解决办法:一、倾斜原因:map端缓慢,输入数据文件多,大小不均匀当出现小文件过多,需要合并小文件。可以通过set hive.merge.mapfiles=true来解决。set hive.map.a...
  • (2)解决思路:Hive是分阶段执行的,map处理数据量的差异取决于上一个stage的reduce输出,所以解决的根本方法就是如何将数据均匀的分布到各个reduce中 (3)出现数据倾斜的主要操作: (a)join:使用join时,一个...
  • Spark解决数据倾斜的几种方式

    千次阅读 2022-03-17 21:03:53
    相当于将数据倾斜提前到Hive中,Hive的底层是MapReduce,运行稳定,不容易失败,而Spark如果出现数据倾斜,很容易崩溃报错。 2、过滤导致少数倾斜的key 比如数据中有很多null的数据,对业务无影响的前提下,可以在...
  • 解决数据倾斜的几种方法

    千次阅读 2019-12-01 21:42:05
    (注意,这个只会缓解数据倾斜,使得每个excutor可以处理更少的key,但如果一个key的数目超级多,还是无法解决) 3.利用广播变量调优。join的时候,将数据量小的一方作为广播变量。 4.拆解热点key。可以rdd.sample...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 40,149
精华内容 16,059
关键字:

解决数据倾斜的方法