为您推荐:
精华内容
最热下载
问答
  • 5、大表join大表优化  如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。  5.1、问题场景  ...

    原文链接:http://www.520mwx.com/view/5677

     

      5、大表join大表优化

          如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。

       5.1、问题场景

          问题场景如下:

          A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。

          A表的字段有:buyer_id、seller_id、pay_cnt_90day。

          B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。

          要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:

          某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。

          正如mapjoin中的例子一样,第一反应是直接join两表并统计:

          select
             m.buyer_id,
            sum(pay_cnt_90day)  as pay_cnt_90day,
            sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
            sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
            sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
            sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
            sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
            sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
          from (
            select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
            join
                   (select seller_id,  sale_level  from table_B)  b
            on  a.seller_id  = b.seller_id
            )  m
          group by m.buyer_id

          但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,

        ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。

          但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。

       5.2、优化方案1:转为mapjoin

          一个很正常的想法是,尽管B表无法直接mapjoin, 但是是否可以间接mapjoin它呢?

          实际上此思路有两种途径:限制行和限制列。

          限制行的思路是不需要join B全表,而只需要join其在A表中存在的,对于本问题场景,就是过滤掉90天内没有成交的卖家。

          限制列的思路是只取需要的字段。

          加上如上的限制后,检查过滤后的B表是否满足了Hive  mapjoin的条件,如果能满足,那么添加过滤条件生成一个临时B表,然后mapjoin该表即可。采用此思路的语句如下:

          

          select
             m.buyer_id,
            sum(pay_cnt_90day)  as pay_cnt_90day,
            sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
            sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
            sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
            sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
            sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
            sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
          from ( 
            select  /*+mapjoin(b)*/
              a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
            join
                   (
               select seller_id,  sale_level  from table_B b0
               join 
               (select seller_id from table_A group by seller_id) a0
                 on b0.seller_id = a0.selller_id
              )  b
            on  a.seller_id  = b.seller_id
            )  m
          group by m.buyer_id

          此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。

      5.3、优化方案2:join时用case when语句

          此种解决方案应用场景是:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值concat随机数,

        从而达到随机分发的目的。此方案的核心逻辑如下:

    select a.user_id, a.order_id, b.user_id
    from table_a a join table_b b
    on (case when a.user_is is null then concat('hive', rand()) else a.user_id end) = b.user_id

          Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:

          set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ] 
          set hive.optimize.skewjoin = true;

          但是方案2因为无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。

       

      5.4 、优化方案3:倍数B表,再取模join

         1、通用方案

          此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:      

          select
             m.buyer_id,
            sum(pay_cnt_90day)  as pay_cnt_90day,
            sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
            sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
            sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
            sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
            sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
            sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
          from (
            select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
            join
                   (
              select  /*+mapjoin(members)*/
                seller_id,  sale_level ,member
              from table_B
                join members
              )  b
            on  a.seller_id  = b.seller_id
              and mod(a.pay_cnt_90day,10)+1 = b.number 
            )  m
          group by m.buyer_id

             此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,

          但这样做的一个弊端是B表也会膨胀N倍。

        2、专用方案

            通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(

          比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。

            在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:

          

          select
             m.buyer_id,
            sum(pay_cnt_90day)  as pay_cnt_90day,
            sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
            sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
            sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
            sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
            sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
            sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
          from (
            select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from (  
              select  /*+mapjoin(big)*/
                 buyer_id,  seller_id,  pay_cnt_90day,
                 if(big.seller_id is not null, concat(  table_A.seller_id,  'rnd',  cast(  rand() * 1000 as bigint ), table_A.seller_id)  as seller_id_joinkey
                  from table_A
                   left outer join
                 --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变
                 (select seller_id  from dim_big_seller  group by seller_id)big
                 on table_A.seller_id = big.seller_id
            )  a
            join
                   (
              select  /*+mapjoin(big)*/
                seller_id,  sale_level ,
                --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样
                coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey
              from table_B
                left out join
              --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变
              (select seller_id, seller_id_joinkey from dim_big_seller) big
              on table_B.seller_id= big.seller_id
              )  b
            on  a.seller_id_joinkey= b.seller_id_joinkey
              and mod(a.pay_cnt_90day,10)+1 = b.number 
            )  m
          group by m.buyer_id

          相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。

       5.5 、方案4:动态一分为二

          实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,倾斜的把他们找出来做mapjoin,最后union all其结果即可。

          但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:

          --由于数据倾斜,先找出90天买家超过10000的卖家

          insert overwrite table  temp_table_B
          select 
            m.seller_id,  n.sale_level
          from (
            select   seller_id
            from (
              select seller_id,count(buyer_id) as byr_cnt
              from table_A
              group by seller_id
              ) a
            where a.byr_cnt >10000
            ) m
          left join 
          (
           select seller_id, sale_level  from table_B
          ) n
            on m.seller_id = n.seller_id;
          --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。
          select
             m.buyer_id,
            sum(pay_cnt_90day)  as pay_cnt_90day,
            sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
            sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
            sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
            sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
            sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
            sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
          from (
            select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
            join
                   (
              select seller_id,  a.sale_level 
               from table_A  a
               left join temp_table_B b
              on a.seller_id = b.seller_id
              where b.seller_id is not null
              )  b
            on  a.seller_id  = b.seller_id
           union all
           select  /*+mapjoin(b)*/
              a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
            from ( 
               select buyer_id,  seller_id,  pay_cnt_90day   
              from table_A
              )  a
            join
                   (
               select seller_id,  sale_level  from table_B 
              )  b
            on  a.seller_id  = b.seller_id
         )  m  group by m.buyer_id
         ) m
         group by m.buyer_id
    

        总结:方案1、2以及方案3中的同用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定使用场景。

        而方案3的专用方案和方案4是推荐的优化方案,但是它们都需要新建一个临时表来存储每日动态变化的大卖家。相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是是维度表,不然统计结果会是错误的。方案4最通用,自由度最高,但是对代码的更改也最大,甚至修改更难代码框架,可以作为终极方案使用。

     

     

    原文链接:http://www.520mwx.com/view/5677

    展开全文
    lovedieya 2020-08-10 15:13:33
  • 摘要: MAPJOIN 当一个大表和一个或多个小表做JOIN时,最好使用MAPJOIN,性能比普通的JOIN要快很多。 另外,MAPJOIN 还能解决数据倾斜的问题。 MAPJOIN的基本原理是:在小数据量情况下,SQL会将用户指定的小表全部...

    摘要: MAPJOIN 当一个大表和一个或多个小表做JOIN时,最好使用MAPJOIN,性能比普通的JOIN要快很多。 另外,MAPJOIN 还能解决数据倾斜的问题。 MAPJOIN的基本原理是:在小数据量情况下,SQL会将用户指定的小表全部加载到执行JOIN操作的程序的内存中,从而加快JOIN的执行速度。

    1、小、大表 join

    在小表和大表进行join时,将小表放在前边,效率会高。hive会将小表进行缓存。

    2、mapjoin

    使用mapjoin将小表放入内存,在map端和大表逐一匹配。从而省去reduce。

    样例:

    select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1
    
    缓存多张小表:
    select /*+MAPJOIN(b,c)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 JOIN tbalec c on a.a1=c.c1
    
    mapjoin的join发生在map阶段,join的join发生在reduce阶段,mapjoin可以提高效率

    使用

    方法一:

    在Hive0.11前,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小

    SELECT /*+ MAPJOIN(smalltable)*/  .key,value
    FROM smalltable JOIN bigtable ON smalltable.key = bigtable.key

    方法二

    在Hive0.11后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机

    hive.auto.convert.join

    默认值为true,自动开户MAPJOIN优化

    hive.mapjoin.smalltable.filesize

    默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中

     

    注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化

    hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
    hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)

     

    对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的

    select /*+MAPJOIN(smallTableTwo)*/ idOne, idTwo, value FROM
      ( select /*+MAPJOIN(smallTableOne)*/ idOne, idTwo, value FROM
        bigTable JOIN smallTableOne on (bigTable.idOne = smallTableOne.idOne)                                                  
      ) firstjoin                                                            
      JOIN                                                                 
      smallTableTwo ON (firstjoin.idTwo = smallTableTwo.idTwo)  

    但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ

    hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个
    hive.auto.convert.join.noconditionaltask.size:多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true

    MAPJOIN

    当一个大表和一个或多个小表做JOIN时,最好使用MAPJOIN,性能比普通的JOIN要快很多。 另外,MAPJOIN 还能解决数据倾斜的问题。
    MAPJOIN的基本原理是:在小数据量情况下,SQL会将用户指定的小表全部加载到执行JOIN操作的程序的内存中,从而加快JOIN的执行速度。
    使用MAPJOIN时,需要注意:

    • LEFT OUTER JOIN的左表必须是大表;
    • RIGHT OUTER JOIN的右表必须是大表;
    • INNER JOIN左表或右表均可以作为大表;
    • FULL OUTER JOIN不能使用MAPJOIN;
    • MAPJOIN支持小表为子查询;
    • 使用MAPJOIN时需要引用小表或是子查询时,需要引用别名;
    • 在MAPJOIN中,可以使用不等值连接或者使用OR连接多个条件;
    • 目前ODPS在MAPJOIN中最多支持指定6张小表,否则报语法错误;
    • 如果使用MAPJOIN,则所有小表占用的内存总和不得超过512M(解压后的逻辑数据量)。

    MAPJOIN 判定逻辑:

    同时满足下面2个条件:
    1) Join阶段 max(join instance 运行时间) > 10分钟 && max( join instance 运行时间 ) > 2 * avg( join instance 运行时间 )
    2) 参与join 的最小表数据量小于100M (解压前的逻辑数据量)

    MAPJOIN 内存自定义设置:

    set odps.sql.mapjoin.memory.max=512
    设置mapjoin时小表的最大内存,默认512,单位M,[128,2048]之间调整

    举例

    这个例子比较综合,既涉及到了数据倾斜问题,又涉及到当“小表”不是很小时(>512M)如何利用mapjoin.
    场景:

      select * from log a
      left outer join users b
      on a.user_id = b.user_id;

    日志表(log)通常来说是记录数比较多的,但用户表(users)也不小,600W+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
    解决方法:

      select /*+mapjoin(b)*/
      * 
      from log a
      left outer join (
        select  /*+mapjoin(c)*/
        d.*
        from ( select distinct user_id from log ) c
        join users d
        on c.user_id = d.user_id
        ) b
      on a.user_id = b.user_id;

    这种解决方法的前提场景是:每日的会员uv不会太多,即 log 表中的 count(distinct user_id) 不会太大。

    展开全文
    u012036736 2018-12-12 20:18:37
  • 大表join大表优化 如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。 5.1、问题场景 问题场景如下...

    大表join大表优化

    如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题。首先引入一个具体的问题场景,然后基于此介绍各自优化方案。

    5.1、问题场景

    问题场景如下:

    A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。

    A表的字段有:buyer_id、seller_id、pay_cnt_90day。

    B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,比如吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。

    要获得的结果是每个买家在各个级别的卖家的成交比例信息,比如:

    某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。

    正如mapjoin中的例子一样,第一反应是直接join两表并统计:

    select

    m.buyer_id,

    sum(pay_cnt_90day) as pay_cnt_90day,

    sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,

    sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,

    sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,

    sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,

    sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,

    sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5

    from (

    select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a

    join

    (select seller_id, sale_level from table_B) b

    on a.seller_id = b.seller_id

    ) m

    group by m.buyer_id

    但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上千万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,

    ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。

    但是数据本身无法用mapjoin table_B解决,因为卖家超过千万条,文件大小有几个GB,超过了1GB的限制。

    5.2、优化方案1

    一个很正常的想法是,尽管B表无法直接mapjoin, 但是是否可以间接mapjoin它呢?

    实际上此思路有两种途径:限制行和限制列。

    限制行的思路是不需要join B全表,而只需要join其在A表中存在的,对于本问题场景,就是过滤掉90天内没有成交的卖家。

    限制列的思路是只取需要的字段。

    加上如上的限制后,检查过滤后的B表是否满足了Hive mapjoin的条件,如果能满足,那么添加过滤条件生成一个临时B表,然后mapjoin该表即可。采用此思路的语句如下:

    select

    m.buyer_id,

    sum(pay_cnt_90day) as pay_cnt_90day,

    sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,

    sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,

    sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,

    sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,

    sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,

    sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5

    from (

    select /*+mapjoin(b)*/

    a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a

    join

    (

    select seller_id, sale_level from table_B b0

    join

    (select seller_id from table_A group by seller_id) a0

    on b0.seller_id = a0.selller_id

    ) b

    on a.seller_id = b.seller_id

    ) m

    group by m.buyer_id

    此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,因为大部分卖家尽管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。

    5.3、优化方案2

    此种解决方案应用场景是:倾斜的值是明确的而且数量很少,比如null值引起的倾斜。其核心是将这些引起倾斜的值随机分发到Reduce,其主要核心逻辑在于join时对这些特殊值concat随机数,

    从而达到随机分发的目的。此方案的核心逻辑如下:

    select a.user_id, a.order_id, b.user_id

    from table_a a join table_b b

    on (case when a.user_is is null then concat('hive', rand()) else a.user_id end) = b.user_id

    Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:

    set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ]

    set hive.optimize.skewjoin = true;

    但是方案2因为无法解决本问题场景的倾斜问题,因为倾斜的卖家大量存在而且动态变化。

    5.4 、优化方案3:倍数B表,在取模join

    1、通用方案

    此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:

     

    select

    m.buyer_id,

    sum(pay_cnt_90day) as pay_cnt_90day,

    sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,

    sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,

    sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,

    sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,

    sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,

    sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5

    from (

    select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a

    join

    (

    select /*+mapjoin(members)*/

    seller_id, sale_level ,member

    from table_B

    join members

    ) b

    on a.seller_id = b.seller_id

    and mod(a.pay_cnt_90day,10)+1 = b.number

    ) m

    group by m.buyer_id

    此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,

    但这样做的一个弊端是B表也会膨胀N倍。

    2、专用方案

    通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(

    比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。

    在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:

    select

    m.buyer_id,

    sum(pay_cnt_90day) as pay_cnt_90day,

    sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,

    sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,

    sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,

    sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,

    sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,

    sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5

    from (

    select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from (

    select /*+mapjoin(big)*/

    buyer_id, seller_id, pay_cnt_90day,

    if(big.seller_id is not null, concat( table_A.seller_id, 'rnd', cast( rand() * 1000 as bigint ), table_A.seller_id) as seller_id_joinkey

    from table_A

    left outer join

    --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变

    (select seller_id from dim_big_seller group by seller_id)big

    on table_A.seller_id = big.seller_id

    ) a

    join

    (

    select /*+mapjoin(big)*/

    seller_id, sale_level ,

    --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样

    coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey

    from table_B

    left out join

    --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变

    (select seller_id, seller_id_joinkey from dim_big_seller) big

    on table_B.seller_id= big.seller_id

    ) b

    on a.seller_id_joinkey= b.seller_id_joinkey

    and mod(a.pay_cnt_90day,10)+1 = b.number

    ) m

    group by m.buyer_id

    相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。

    5.5 、动态一分为二

    实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,

    倾斜的把他们找出来做mapjoin,最后union all其结果即可。

    但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:

    --由于数据倾斜,先找出90天买家超过10000的卖家

    insert overwrite table temp_table_B

    select

    m.seller_id, n.sale_level

    from (

    select seller_id

    from (

    select seller_id,count(buyer_id) as byr_cnt

    from table_A

    group by seller_id

    ) a

    where a.byr_cnt >10000

    ) m

    left join

    (

    select seller_id, sale_level from table_B

    ) n

    on m.seller_id = n.seller_id;

    --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。

    select

    m.buyer_id,

    sum(pay_cnt_90day) as pay_cnt_90day,

    sum(case when m.sale_level = 0 then pay_cnt_90day end) as pay_cnt_90day_s0,

    sum(case when m.sale_level = 1 then pay_cnt_90day end) as pay_cnt_90day_s1,

    sum(case when m.sale_level = 2 then pay_cnt_90day end) as pay_cnt_90day_s2,

    sum(case when m.sale_level = 3 then pay_cnt_90day end) as pay_cnt_90day_s3,

    sum(case when m.sale_level = 4 then pay_cnt_90day end) as pay_cnt_90day_s4,

    sum(case when m.sale_level = 5 then pay_cnt_90day end) as pay_cnt_90day_s5

    from (

    select a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from ( select buyer_id, seller_id, pay_cnt_90day from table_A) a

    join

    (

    select seller_id, a.sale_level

    from table_A a

    left join temp_table_B b

    on a.seller_id = b.seller_id

    where b.seller_id is not null

    ) b

    on a.seller_id = b.seller_id

    union all

    select /*+mapjoin(b)*/

    a.buer_id, a.seller_id, b.sale_level, a.pay_cnt_90day

    from (

    select buyer_id, seller_id, pay_cnt_90day

    from table_A

    ) a

    join

    (

    select seller_id, sale_level from table_B

    ) b

    on a.seller_id = b.seller_id

    ) m group by m.buyer_id

    ) m

    group by m.buyer_id

    总结:方案1、2以及方案3中的同用方案不能保证解决大表join大表问题,因为它们都存在种种不同的限制和特定使用场景。而方案3的专用方案和方案4是推荐的优化方案,但是它们都需要新建一个临时表

    来存储每日动态变化的大卖家。相对方案4来说,方案3的专用方案不需要对代码框架进行修改,但是B表会被放大,所以一定要是是维度表,不然统计结果会是错误的。方案4最通用,自由度最高,

    但是对代码的更改也最大,甚至修改更难代码框架,可以作为终极方案使用。

    参考资料:《离线和实时大数据开发实战》

    呼叫结果(call_result)与销售历史(sale_history)的join优化:

    CALL_RESULT: 32亿条/444G SALE_HISTORY:17亿条/439G

    • 原逻辑
    • Map: 3255 Reduce: 950 Cumulative CPU: 238867.84 sec HDFS Read: 587550313339 HDFS Write: 725372805057 SUCCESS 28.1MIN
    • 开启中间结果压缩
    • set hive.exec.compress.intermediate=true;
    • set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
    • Map: 3255 Reduce: 950 Cumulative CPU: 268479.06 sec HDFS Read: 587548211067 HDFS Write: 725372805057 SUCCESS 31.6MIN
    • 从结果看cpu的耗时增加,这是压缩解压缩过程的消耗;HDFS读取量略有减少,可能是因为源表是RCFile存储,本身已经压缩导致,因此整体时间上没有明显减少。
    • 开启中间和最终压缩
    • set hive.exec.compress.intermediate=true;
    • set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
    • set hive.exec.compress.output=true;
    • set mapred.output.compression.codec=org.apache.hadoop.io.compress.GZipCodec
    • Map: 3255 Reduce: 950 Cumulative CPU: 264034.27 sec HDFS Read: 587546058107 HDFS Write: 136021543504 SUCCESS 24.7MIN
    • 从结果看HDFS write明显减少近6倍,整体运行时间有所降低
    • 设置map数量减少一倍
    • set mapred.max.split.size=512000000
    • Map: 1684 Reduce: 950 Cumulative CPU: 191656.39 sec HDFS Read: 585689265249 HDFS Write: 725372805057 SUCCESS 22.9MIN
    • map数减少一倍后,消耗cpu资源减少;整体运行时间略有下降
    • 只开启JVM重用(10)
    • set mapred.job.reuse.jvm.num.tasks=10;
    • Map: 3255 Reduce: 950 Cumulative CPU: 259683.41 sec HDFS Read: 587550076795 HDFS Write: 725372805057 SUCCESS 28.9MIN
    • CPU开销增加,总运行时间没有变化
    • 减少map数并设置JVM重用(10)
    • Map: 1684 Reduce: 950 Cumulative CPU: 223036.3 sec HDFS Read: 585692215905 HDFS Write: 725372805057 SUCCESS 29.4MIN 效果不大
    • 减少map数并开启压缩
    • Map: 1684 Reduce: 950 Cumulative CPU: 251331.5 sec HDFS Read: 585697165921 HDFS Write: 136021488023 SUCCESS 26.1MIN
    • 开启中间压缩,对于输入数据量有少许减少,但是cpu开销增大,对于单stage任务总体不理想
    • 减少map数并开启最终压缩
    • Map: 1687 Reduce: 951 Cumulative CPU: 234941.99 sec HDFS Read: 586512467704 HDFS Write: 136164828062 SUCCESS 24.8MIN
    • 只开启结果压缩,cpu资源消耗较之前有所减少,写入数据量明显降低,性能有提升

    总体来看,效果都不明显;hive默认使用reduce side join,当两个表中有一个较小的时候可以考虑map join ,但这两个表都是大表,可以尝试使用bucket map join;基本处理方法是将两个表在join key上做hash bucket,将较小表(sale_history)的bucket设置为较大表(call_result)的数倍。这样数据就会按照join key做hash bucket。这样做的话,小表依然会复制到各个节点上,map join的时候,小表的每一组bucket加载成hashtable,与对应的大表bucket做局部join。

    如果两表的join key 都具有唯一性(是主键关联),还可以进一步做sort merge bucket map join ;做法是两表都做bucket的基础上,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击每个bucket内部还要进行排序,这样做得好处就是在两边的bucket要做局部join的时候,用类似merge sort算法中的merge操作一样把两个bucket顺序遍历一下即可。

    然而以上两种方法经过测试依然没有太好的性能表现;稳定在20min之内已经不错了,又要考虑从源库抽取数据如何保留等问题,最终无法采用,后经过和业务系统沟通,两表每天数据量巨大,业务系统不会更新历史数据,每个表当天的数据是一一对应的,即当天的呼叫和销售历史是对应的,因此将程序优化为当天增量数据关联,数据下降几个数量级,自然不存在性能问题;

    所以,优化无止境,不一定非技术手段不可,首先基于业务逻辑做优化,要做到业务与技术相结合。

    hive大表优化

    展开全文
    qingdsj 2020-01-14 12:10:58
  • hive-大表Join的数据偏斜 hive大表Join的数据偏斜 大表Join的数据偏斜 MapReduce编程模型下开发代码需要考虑数据偏斜的问题,Hive代码也是一样。数据偏斜的原因包括以下两点: Map输出key数量极少,导致reduce端...

    hive-大表Join的数据偏斜

    hive—大表Join的数据偏斜
    大表Join的数据偏斜
    MapReduce编程模型下开发代码需要考虑数据偏斜的问题,Hive代码也是一样。数据偏斜的原因包括以下两点:

    1. Map输出key数量极少,导致reduce端退化为单机作业。
    2. Map输出key分布不均,少量key对应大量value,导致reduce端单机瓶颈。
      Hive中我们使用MapJoin解决数据偏斜的问题,即将其中的某个表(全量)分发到所有Map端进行Join,从而避免了reduce。这要求分发的表可以被全量载入内存。
      极限情况下,Join两边的表都是大表,就无法使用MapJoin。
      这种问题最为棘手,目前已知的解决思路有两种:
    3. 如果是上述情况1,考虑先对Join中的一个表去重,以此结果过滤无用信息。这样一般会将其中一个大表转化为小表,再使用MapJoin 。
      一个实例是广告投放效果分析,例如将广告投放者信息表i中的信息填充到广告曝光日志表w中,使用投放者id关联。因为实际广告投放者数量很少(但是投放者信息表i很大),因此可以考虑先在w表中去重查询所有实际广告投放者id列表,以此Join过滤表i,这一结果必然是一个小表,就可以使用MapJoin。
    4. 如果是上述情况2,考虑切分Join中的一个表为多片,以便将切片全部载入内存,然后采用多次MapJoin得到结果。
      一个实例是商品浏览日志分析,例如将商品信息表i中的信息填充到商品浏览日志表w中,使用商品id关联。但是某些热卖商品浏览量很大,造成数据偏斜。例如,以下语句实现了一个innerjoin逻辑,将商品信息表拆分成2个表:
    select * from
    (
    select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
    from w left outer join i sampletable(1 out of 2 on id) i1
    )
    union all
    (
    select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat
    from w left outer join i sampletable(1 out of 2 on id) i2
    )
    );
    

    以下语句实现了left outer join逻辑:

    select t1.id, t1.time, t1.amount,
        coalease(t1.name,t2.name),
        coalease(t1.loc, t2.loc),
        coalease(t1.cat, t2.cat)
    from (  
        select w.id, w.time,w.amount, i1.name, i1.loc, i1.cat
        from w left outer join isampletable(1 out of 2 on id) i1
    ) t1 left outer join i sampletable(2 out of 2 on id)t2;
    

    上述语句使用Hive的sample table特性对表做切分。

    展开全文
    weixin_37090394 2020-12-21 11:27:58
  • qq_42189083 2018-08-29 09:06:25
  • ytp552200ytp 2020-03-20 09:47:34
  • debimeng 2019-10-02 22:25:10
  • weixin_38610500 2020-12-27 11:16:41
  • weixin_31866177 2021-04-28 01:24:30
  • dijuan6962 2019-10-05 05:23:44
  • weixin_45651336 2019-11-07 23:01:43
  • smile0198 2014-10-25 21:49:25
  • weixin_41919236 2018-11-15 15:05:17
  • weixin_39851307 2020-12-30 09:21:05
  • weixin_32265569 2021-08-07 23:23:05
  • weixin_42523326 2020-12-30 09:21:02
  • u012671925 2019-02-15 18:55:33
  • qq_41018861 2021-01-05 16:55:14
  • qq_26442553 2020-07-04 20:39:09
  • weixin_29159711 2021-01-17 14:14:40
  • qq_26442553 2018-06-30 11:14:07
  • weixin_36342477 2021-03-07 23:37:21
  • weixin_44131414 2021-02-20 13:21:14
  • haijiege 2019-12-11 15:22:13
  • qq_33788242 2019-05-04 01:26:17
  • wisgood 2018-04-24 16:24:12

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,860
精华内容 7,544
关键字:

hive大表join大表优化