精华内容
下载资源
问答
  • SparkStreaming解决数据倾斜方法 两阶段聚合的方式解决数据倾斜 解释: 对DStream 进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作...

    SparkStreaming解决数据倾斜方法
    两阶段聚合的方式解决数据倾斜

    在这里插入图片描述

    解释:
    对DStream 进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加。

    代码实现

        val dsStream=stream.filter(item => item.value().split("\t").length == 3)//过滤合格的数据
        .mapPartitions(partitions => //对所有分区的数据进行转换
            partitions.map(item => {
              val rand = new Random() //创建随机数对象
              val line = item.value() //获取value
              val arr = line.split("\t")  //切割
              val id = arr(1)       //取第二个元素
              (rand.nextInt(3) + "_" + id, 1)    //随机数与app_id进行拼接,并且返回二元组
            }))
        val result = dsStream.reduceByKey(_ + _)     //重组之后的第一次reduceByKey
        result.map(item => {                                     //进行map转换把key切割还原
          val id = item._1.split("_")(1)
          (id, item._2)
        }).reduceByKey(_ + _).print()                //还原之后进行第二次聚合
    

    想要更多文档资料及更多大数据相关文档资料请加qq群下载:912703269

    展开全文
  • 什么是数据倾斜 数据倾斜的表现 发生数据倾斜的原因 如何解决数据倾斜

    什么是数据倾斜

    • Hadoop能够进行对海量数据进行批处理的核心,在于它的分布式思想,也就是多台服务器(节点)组成集群,进行分布式的数据处理
    • 举例:如果有10亿数据,一台电脑可能要10小时,现在集群有10台,可能1小时就够了,但是有可能大量的数据集中到一台或几台上,要5小时,发生了数据倾斜

    数据倾斜的表现

    • Mapreduce任务
      • reduce阶段 卡在99.99%不动
      • 各种container报错OOM(内存溢出)
      • 读写数据量很大,超过其他正常reduce
    • spark任务
      • 个别task执行很慢
      • 单个执行特别久
      • shuffle出错
      • sparkstreaming做实时算法使,会有executor出现内存溢出,但是其他的使用率很低

    发生数据倾斜的原因

    • shuffle是按照key,来进行values的数据的输出、拉取和聚合的,一旦发生shuffle,所有相同key的值就会拉到一个或几个节点上,个别key对应的数据比较多,就容易发生单个节点处理数据量爆增的情况。
    • key分布不均匀
      • 存在大量相同值的数据
      • 存在大量异常值或者空值
    • 业务数据本身的特性
      • 例如某个分公司或某个城市订单量大幅提升几十倍甚至几百倍,对该城市的订单统计聚合时,容易发生数据倾斜。
    • 某些SQL语句本身就有数据倾斜
      • 两个表中关联字段存在大量空值(解决方法:去除或者加随机数),或是关联字段的数据不统一(解决方法:把数字类型转为字符串类型,统一大小写)
      • join 一个key集中的小表 (解决方法:reduce join 改成 map join)
      • group by维度过小 某值的数量过多 (解决方法:两阶段聚合,放粗粒度)
      • count distinct 某特殊值过多 (解决方法:先用group by)
    • 数据频率倾斜——某一个区域的数据量要远远大于其他区域。
    • 数据大小倾斜——部分记录的大小远远大于平均值。

    如何解决数据倾斜

    聚合类group by操作,发生数据倾斜

    • map段部分聚合

      • 开启Map端聚合参数设置set hive.map.aggr=true
      • 在Map端进行聚合操作的条目数目set hive.grouby.mapaggr.checkinterval=100000
      • 有数据倾斜的时候进行负载均衡(默认是false)set hive.groupby.skewindata = true
    • 阶段拆分-两阶段聚合 需要聚合的key前加一个随机数的前后缀,这样就均匀了,之后再按照原始的key聚合一次

    • 生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

    • 假设 key = 水果
      select count(substr(a.key,1,2)) as key
      from(
      	select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
      	from table
      	group by tmp
      )a
      group by key
      

    空值产生的数据倾斜

    • 1.在查询的时候,过滤掉所有为NULL的数据,比如:
      SELECT * FROM log a
      JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id
      UNION ALL
      SELECT *FROM log a WHERE a.user_id IS NULL;
      
      2.查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
      SELECT *FROM log a
      LEFT JOIN bmw_users b ON 
      CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand()) ELSE a.user_id END = b.user_id;
      

    Reduce join 改为Map join

    • 适用于小表和大表 join,将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD 的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
    • 设置自动选择MapJoin set hive.auto.convert.join = true;默认为true
    • reduce join: 先将所有相同的key,对应的values,汇聚到一个task中,然后再进行join。
    • map reduce:broadcast出去那个小表的数据以后,就会在每个executor的block manager中都驻留一份+map算子来实现与join同样的效果。不会发生shuffe,从根本上杜绝了join操作可能导致的数据倾斜的问题

    少用count(distinct),先用group 去重 再count子查询,

    • 采用sum() group by的方式来替换count(distinct)完成计算。

    • select count(distinct a) from test ;
      select count x.a 
      from (select a from test group by a ) x 
      
      select a, count(distinct b) as c from tbl group by a;
      select a, count(*) as c from (select a, b from tbl group by a, b) group by a;
      

    特殊值分开处理法

    • 当需要把用户表和日志表关联起来时,再日志表中有很多没注册的用户表,可以分开处理

    • select *from
      (select * from logs where user_id = 0)a
      join
      (select * from users where user_id = 0)b
      on a.user_id = b.user_id
      union all
      select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
      

    大表 join 大表

    • 将有大表中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数)然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
    • RDD扩容

    不同数据类型关联产生数据倾斜

    • 一张表 s8_log,每个商品一条记录,要和商品表关联。**s8_log 中有字符串商品 id,也有数字的商品 id。**字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。

    • 问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。

    • -- 把数字类型转换成字符串类型
      SELECT *
      FROM s8_log a
      LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);
      

    多表 union all 会优化成一个 job

    • 推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。
    SELECT *
    FROM effect a
      JOIN (
          SELECT auction_id AS auction_id
          FROM auctions
          UNION ALL
          SELECT auction_string_id AS auction_id
          FROM auctions
      ) b
      ON a.auction_id = b.auction_id;
    
    • 结论: 这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

    • 问题:比如推广效果表要和商品表关联,效果表中的 auction_id 列既有 32 为字符串商 品 id,也有数字 id,和商品表关联得到商品的信息。 比分别过滤数字 id,字符串 id 然后分别和商品表关联性能要好。

    	SELECT * FROM effect a 
    	JOIN 
    	(SELECT auction_id AS auction_id FROM auctions 
    	UNION All 
    	SELECT auction_string_id AS auction_id FROM auctions) b 
    	ON a.auction_id=b.auction_id;	
    
    • 场景:有一张user表,为卖家每天收到表,user_id,ds(日期)为key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入,总笔数,和最近一天的主营类目。
       SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users
       WHERE ds BETWEEN 20120301 AND 20120329
       GROUP BY user_id
    

    优化in/exists语句

    • hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join

    排序选择

    • cluster by: 对同一字段分桶并排序,不能和sort by连用;
    • distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;
    • sort by: 单机排序,单个reduce结果有序
    • order by:全局排序,缺陷是只能使用一个reduce
    展开全文
  • 1.数据倾斜的原理和影响 1.1 原理 数据倾斜就是数据的分布严重不均,造成一部分数据很多,一部分数据很少的局面。数据分布理论上都是倾斜的,符合“二八原理”:例如80%的财富集中在20%的人手中、80%的用户只使用...

    1.数据倾斜的原理和影响

    1.1 原理

    数据倾斜就是数据的分布严重不均,造成一部分数据很多,一部分数据很少的局面。数据分布理论上都是倾斜的,符合“二八原理”:例如80%的财富集中在20%的人手中、80%的用户只使用20%的功能、20%的用户贡献了80%的访问量。 数据倾斜的现象,如下图所示。

    1.2 影响

    (1)单点问题

    数据集中在某些分区上(Subtask),导致数据严重不平衡。

    (2)GC 频繁

    过多的数据集中在某些 JVM(TaskManager),使得JVM 的内存资源短缺,导致频繁 GC。

    (3)吞吐下降、延迟增大

    数据单点和频繁 GC 导致吞吐下降、延迟增大。

    (4)系统崩溃

    严重情况下,过长的 GC 导致 TaskManager 失联,系统崩溃。

    数据倾斜的影响

    2.Flink 如何定位数据倾斜?

    步骤1:定位反压

    定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask

    参考:【Flink 精选】如何分析及处理反压?

    步骤2:确定数据倾斜

    Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。

    Web UI 数据量监控

    3.Flink 如何处理常见数据倾斜?

    优化前

    场景一:数据源 source 消费不均匀 

    解决思路:通过调整并发度,解决数据源消费不均匀或者数据源反压的情况。例如kafka数据源,可以调整 KafkaSource 的并发度解决消费不均匀。调整并发度的原则:KafkaSource 并发度与 kafka 分区数是一样的,或者 kafka 分区数是KafkaSource 并发度的整数倍

    场景二:key 分布不均匀的无统计场景

    说明:key 分布不均匀的无统计场景,例如上游数据分布不均匀,使用keyBy来打散数据

    解决思路: 通过添加随机前缀,打散 key 的分布,使得数据不会集中在几个 Subtask

    优化后

    具体措施:

    ① 在原来分区 key/uid 的基础上,加上随机的前缀或者后缀

    使用数据到达的顺序seq,作为分区的key。

    场景三:key 分布不均匀的统计场景

    解决思路:聚合统计前,先进行预聚合,例如两阶段聚合(加盐局部聚合+去盐全局聚合)

    优化后

    两阶段聚合的具体措施:

    预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀

    聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计。

    参考: https://www.jianshu.com/p/4ae20202e06d 

    方法1:调整并行度

    方法2:加随机前缀或后缀

    方法3:两阶段聚合(加盐局部聚合+去盐全局聚合)

    展开全文
  • hive数据倾斜解决方法

    2018-10-17 11:20:11
    Hive的过程中经常会碰到数据倾斜问题,数据倾斜基本都发生在group、join等需要数据shuffle的操作中,这些过程需要按照key值进行数据汇集处理,如果key值过于集中,在汇集过程中大部分数据汇集到一台机器上,这就会导致...

    Hive的过程中经常会碰到数据倾斜问题,数据倾斜基本都发生在group、join等需要数据shuffle的操作中,这些过程需要按照key值进行数据汇集处理,如果key值过于集中,在汇集过程中大部分数据汇集到一台机器上,这就会导致数据倾斜。

    具体表现为:作业经常reduce完成在99%后一直卡住,最后的1%花了几个小时都没有跑完。

    常见产生数据倾斜的原因:
    #空值产生的数据倾斜
    #不同数据类型关联产生的数据倾斜
    #关联的key非空,但是某个key值大量重复 #distinct、count(distinct)

    1、 空值产生的数据倾斜场景:
    如日志中,常会有信息丢失的问题,比如全网日志中的user_id,如果取其中的user_id和bmw_users关联,会碰到数据倾斜的问题。
    解决方法1:
    user_id为空的不参与关联

    select * 
    from log a
    join bmw_users b
    on a.user_id is not null
    and a.user_id = b.user_id
    union all
    select * 
    from log a
    where a.user_id is null;

    解决方法2 :
    赋与空值分新的key值(推荐)

    select * 
    from logs a 
    left join bmw_users b 
    on case when a.user_id is null then concat(‘dp_hive’,rand() ) 

    #把空值的key变成一个字符串加上随机数else a.user_id end = b.user_id;

    2、 不同数据类型关联产生数据倾斜场景:
    用户表中user_id字段为int,logs表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的join操作时,默认的Hash操作会按照int型的id来进行分配,这样会导致所有string类型的id记录都分配到同一个reduce中。
    解决方法:
    把数字类型转换成字符串类型

    select * 
    from users a
    left join logs b
    on a.user_id = cast(b.user_id as string)

    3、关联的key非空,但是某个key值大量重复
    解决方法:
    加入随机数

    select a.key as key, b.pv as pv 
    from(
    select key 
    from table1 
    where dt='2018-06-18') a
    left join(
    select key, sum(pv) as pv 	 
    from (
    select key,	round(rand()*1000) as rnd, #加入随机数,增加并发度		
    count(1) as pv		
    from table2 
    where dt='2018-06-18' group by key,rnd) tmp    
    group by key) b 
    on a.key = b.key

    4、distinct、count(distinct)
    解决方法:
    用group by 去重
    #distinct替换:
    原始sql:

    select distinct key from A;		

    替换后的sql:

    select key from A group by key;

    #单维度count(distinct)替换
    原始sql:

    select ship_id, count(distinct order_id) as ship_order_num			
    from table A			
    where dt = '2018-06-18' 
    group by ship_id;

    替换后的sql:

    select ship_id, count(1) as ship_order_num			
    from 			
    (select ship_id, order_id 
    from table A 
    where dt = '2018-06-18' 
    group by ship_id, order_id) t			
    group by ship_id;	

    #多维度count(distinct)替换 —每个维度单独处理后关联

    展开全文
  • 数据倾斜及其高效解决方法

    万次阅读 多人点赞 2018-11-13 18:20:07
    数据倾斜是大数据领域绕不开的拦路虎,当你所需处理的数据量到达了上亿甚至是千亿条的时候,数据倾斜将是横在你面前一道巨大的坎。很可能有几周甚至几月都要头疼于数据倾斜导致的各类诡异的问题。 数据倾斜是指:...
  • Spark 解决数据倾斜的几种常用方法

    千次阅读 多人点赞 2019-06-06 16:13:29
    数据倾斜的调优,就是利用各种技术方案解决不同类型的数据倾斜问题,保证 Spark 作业的性能。 一,数据倾斜原理 一个 Spark 作业,会根据其内部的 Action 操作划分成多个 job,每个 job 内部又会根据 shuffle 操作...
  • 在Spark计算平台中,数据倾斜...提出了广播机制避免Shuffle过程数据倾斜方法,分析了广播变量分发逻辑过程,给出广播变量性能优势分析和该方法的算法实现.通过Broadcast Join实验验证了该方法在性能上有稳定的提升.
  • 行业分类-设备装置-一种解决数据倾斜方法及装置
  • 解决数据倾斜的几种方法

    千次阅读 2019-12-01 21:42:05
    (注意,这个只会缓解数据倾斜,使得每个excutor可以处理更少的key,但如果一个key的数目超级多,还是无法解决) 3.利用广播变量调优。join的时候,将数据量小的一方作为广播变量。 4.拆解热点key。可以rdd.sample...
  • 八种解决 Spark 数据倾斜方法

    千次阅读 2019-12-29 21:47:00
    一、什么是数据倾斜对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗...
  • Hive数据倾斜解决方法总结

    千次阅读 2018-01-05 12:09:26
    Hive数据倾斜解决方法总结  数据倾斜是进行大数据计算时最经常遇到的问题之一。当我们在执行HiveQL或者运行MapReduce作业时候,如果遇到一直卡在map100%,reduce99%一般就是遇到了数据倾斜的问题。数据倾斜其实是...
  • Hive数据倾斜解决方法

    千次阅读 2019-07-16 18:47:27
    Hive学习之路 (十九)Hive的数据倾斜 2 个人光环大数据学习 Hive面试常问: hq语句 优化 sort by order by distribute by 分区表 分桶表的区别 内部表 外部表的区别 数据倾斜:数据分布不均匀 hive底层的执行引擎 ...
  • 什么是数据倾斜?如何解决数据倾斜?

    千次阅读 2019-03-28 14:31:33
    相信很多接触MapReduce的朋友对’数据倾斜’这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢? 何为数据倾斜? 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上...
  • 导读相信很多接触MapReduce的朋友对'数据倾斜'这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢?何为数据倾斜?在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布...
  • (2)解决思路:Hive是分阶段执行的,map处理数据量的差异取决于上一个stage的reduce输出,所以解决的根本方法就是如何将数据均匀的分布到各个reduce中 (3)出现数据倾斜的主要操作: (a)join:使用join时,一个...
  • Spark数据倾斜解决原理和方法总论

    千次阅读 2016-09-10 12:41:05
    2、Spark数据倾斜解决方法总论一:均衡数据是我们的目标,或者说我们要解决数据倾斜的发力点。一般说shuffle是产生数据倾斜的主要原因,为什么shuffle产生数据倾斜主要是因为网络通信,如果计算之前通过ETL(ETL...
  • Spark 之 解决数据倾斜(一)

    万次阅读 2021-06-14 00:56:11
    Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。 例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到...
  • Hive解决数据倾斜问题

    千次阅读 2018-12-19 19:44:08
    什么是数据倾斜以及数据倾斜是怎么产生的? 简单来说数据倾斜就是数据的key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面。 举个 word count 的入门例子,它的map 阶段就是形成 (“aaa”,1)...
  • Hive之数据倾斜的原因和解决方法

    千次阅读 2018-09-02 22:22:44
    数据倾斜 在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜...
  • hive数据倾斜原因和解决方法

    万次阅读 2018-04-19 12:01:22
    在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因...
  • 解决Spark 数据倾斜的八种实用方法 什么是数据倾斜? 对 Spark/Hadoop 这样的分布式大数据系统来讲数据量大并不可怕可怕的是数据倾斜 对于分布式系统而言理想情况下随着系统规模(节点数量)的增加应用整体耗时线性下降...
  • 数据倾斜产生的原因 数据倾斜的原因很大部分是join倾斜和聚合倾斜两大类 Hive倾斜之group by聚合倾斜 原因: 分组的维度过少,每个维度的值过多,导致处理某值的reduce耗时很久; 对一些类型统计的时候某种...
  • 什么是数据倾斜?对 Spark/Hadoop 这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整
  • 常见数据倾斜解决方法

    千次阅读 2020-04-01 14:41:14
    1.group by导致数据倾斜 设置hive.map.aggr:默认为true,在map端做聚合,推荐使用 设置hive.groupby.skewindata:reduce操作的时候,相同key值并不是都给同一个reduce,而是随机分发到各个reduece做聚合。...
  • 数据倾斜解决方案

    千次阅读 2018-09-05 21:43:38
    数据倾斜定义 简单的讲,数据倾斜就是我们在数据计算的时候,由于数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些机器的计算速度远远低于整个集群的平均计算速度,导致整个计算过程十分缓慢...
  • 数据倾斜概述 简单来说数据倾斜就是数据的key的分化严重不均,造成一部分数据很多,一部分数据很少的情况。举个word count的入门例子,在map阶段形成了(’“hello”,1)的形式,然后在reduce阶段进行value统计,算...
  • 怎么解决数据倾斜问题?

    千次阅读 2019-04-19 14:35:32
    相信大家在工作中一定遇到过数据倾斜的问题,读完本文,你会了解到数据倾斜的定义及其危害、产生的原因及应对措施、常见倾斜场景及解决办法等知识,相信对你今后处理数据倾斜问题会有一定的帮助。 目前流行的大数据...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,249
精华内容 13,699
关键字:

解决数据倾斜的方法