2016-07-25 17:52:26 lsshlsw 阅读数 14263
  • redis高并发处理由浅入深(备java基础,javaee课程)

    Redis是一款依据BSD开源协议发行的高性能Key-Value存储系统(cache and store)。 命令主要分以下几部分关键字(Keys) 字符串(String) 哈希(Hashs) 列表(Lists) 集合(Sets) 有序集合(Sorted Sets)HyperLogLog 发布/订阅(Pub/Sub) 事务(Transactions) 脚本(Scripting)

    11861 人正在学习 去看看 任亮

一. 数据倾斜的现象

多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。

二. 数据倾斜的原因

常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。

数据问题

  1. key本身分布不均匀(包括大量的key为空)
  2. key的设置不合理

spark使用问题

  1. shuffle时的并发度不够
  2. 计算方式有误

三. 数据倾斜的后果

  1. spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
  2. 过多的数据在同一个task中执行,将会把executor撑爆,造成OOM,程序终止运行。

一个理想的分布式程序:
理想的分布式程序

发生数据倾斜时,任务的执行速度由最大的那个任务决定:
发生数据倾斜

四. 数据问题造成的数据倾斜

发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

找出异常的key

如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。

选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个

df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)

如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。

经过分析,倾斜的数据主要有以下三种情况:

  1. null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
  2. 无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
  3. 有效数据,业务导致的正常数据分布。

解决办法

第1,2种情况,直接对数据进行过滤即可。

第3种情况则需要进行一些特殊操作,常见的有以下几种做法。

  1. 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
  2. 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
  3. 使用reduceByKey 代替 groupByKey
  4. 使用map join。

举例:

如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作如下:

  1. 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
  2. 对数据进行 reduceByKey(func)
  3. key + 随机值 转成 key
  4. 再对数据进行 reduceByKey(func)

tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。

tips2: 单独处理异常数据时,可以配合使用Map Join解决。

五. spark使用不当造成的数据倾斜

1. 提高shuffle并行度

dataFramesparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。

局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。

2. 使用map join 代替reduce join

在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了。

局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。

具体使用方法和处理流程参照:

Spark map-side-join 关联优化

spark join broadcast优化

2017-12-22 16:06:11 bitcarmanlee 阅读数 1738
  • redis高并发处理由浅入深(备java基础,javaee课程)

    Redis是一款依据BSD开源协议发行的高性能Key-Value存储系统(cache and store)。 命令主要分以下几部分关键字(Keys) 字符串(String) 哈希(Hashs) 列表(Lists) 集合(Sets) 有序集合(Sorted Sets)HyperLogLog 发布/订阅(Pub/Sub) 事务(Transactions) 脚本(Scripting)

    11861 人正在学习 去看看 任亮

1.热点key的数据倾斜

在大数据相关的统计与处理中,热点key造成的数据倾斜非常常见也非常讨厌,经常会造成job运行时间变长或者造成job的OOM最后导致任务失败。例如在wordcount任务中,如果有一个word是热点词,出现的次数很多,那么最后这个job的运行时间就是由这个热点词所在的task运行时间决定的。因此遇到这种热点问题,我们需要想办法改进代码,优化任务,提高最终的运行效率。

2.实际case

现在有这么一个简单的实际例子:
hdfs上有一个名为”xxx”的路径,此路径下的数据量比较大,有几百G之多。现在我们想统计一下这个路径下所有文件的行数。
如果数据量不大,在spark-shell中,可以用一行简单的代码解决问题:

scala> sc.textFile("xxx").count()

但是数据量大了以后,运行的速度很慢很慢,慢到不可接受;而且最后程序会报OOM退出,得不到最终的结果。那怎么办呢?

3.通过将热点key打算做计算

我们将上述需求稍微做一下转型:
统计所有数据的行数,假设每一行对应的一个key就是”all”,每一行的输出是”all, 1”,最后需要做的就是简单的wordcount,针对all这个热点key,然后求和!
这种我们明确知道热点key是啥的case,一般的做法是将热点key先打散,然后再聚回来!
直接上代码:

    def linestats(sc: SparkContext) = {
        val inputpath = "xxx"
        sc.textFile(inputpath)
            .map(x => {
                val randomNum = (new java.util.Random).nextInt(2000)
                val allkey = randomNum + "_all"
                (allkey, 1)
            })
            .reduceByKey((x, y) => x + y)
            .map(x => {
                val (keywithrandom, num) = (x._1, x._2)
                val key = StringUtils.split(keywithrandom, "_")(1)
                (key, num.toLong)
            })
            .reduceByKey((x, y) => x + y)
            .map(x => "%s\t%s".format(x._1, x._2))
            .repartition(1)
    }

上面代码的思路如下:
1.第一步先将key打算,给所有”all”加上一个随机前缀。
2.然后对带有随机前缀的key做第一次聚合,即reduceByKey操作,得出第一次聚合的结果。
3.再将随机前缀去掉,做第二次聚合,即reduceByKey操作,得到最终的结果!

2019-09-21 14:10:52 weixin_44685655 阅读数 127
  • redis高并发处理由浅入深(备java基础,javaee课程)

    Redis是一款依据BSD开源协议发行的高性能Key-Value存储系统(cache and store)。 命令主要分以下几部分关键字(Keys) 字符串(String) 哈希(Hashs) 列表(Lists) 集合(Sets) 有序集合(Sorted Sets)HyperLogLog 发布/订阅(Pub/Sub) 事务(Transactions) 脚本(Scripting)

    11861 人正在学习 去看看 任亮

Spark数据倾斜和Hive数据倾斜理解

Spark数据倾斜出现的现象,原因,方案

现象:

  • 单个或者某几个task拖延整个任务运行时间,导致整体耗时过大
  • 单个task处理数据过多,很容易导致oom

原因:

(1)数据的问题:本身Key的分布不均,这里面含有null值,无效数据,有效数据

解决的方案是对前两点可以直接对数据过滤;对有效数据可以进行隔离操作,将异常的key单独处理,处理好后和正常数据进行union操作,也可以向添加随机值,进行操作后,去掉随机值,再执行一次操作;使用reduceByKey 代替 groupByKey

df.select(“key”).sample(false,0.1).map(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)

(2)spark的使用问题:

解决方案:提高shuffle并行度,rdd和dataframe以及spark sql都可以进行相应参数上的设置;使用map join 代替reduce join(主要考虑的是小表不是很大的情况下,避免shuffle的过程)

Hive数据倾斜出现的原因,方案

1.key分布不均 空值产生的数据倾斜

过滤空值:select * from userid where user_id is not null

赋予空值添加随机值:select * from log a left join user b on case when a.user_id is null then concat(‘hive’,rand()) else a.user_id =b.user_id

2.把不同的数据类型进行关联也会出现数据倾斜

把int类型id转换成为string类型的id

select * from user a left join log b on b.user_id=cast(a.user_id as string)

3.大小表关联查询产生数据倾斜

使用map join来解决小表关联大表的数据倾斜问题。在hive0.11版本后可以设置mapjoin优化开关。

2019-10-31 21:31:05 liminghui4321 阅读数 15
  • redis高并发处理由浅入深(备java基础,javaee课程)

    Redis是一款依据BSD开源协议发行的高性能Key-Value存储系统(cache and store)。 命令主要分以下几部分关键字(Keys) 字符串(String) 哈希(Hashs) 列表(Lists) 集合(Sets) 有序集合(Sorted Sets)HyperLogLog 发布/订阅(Pub/Sub) 事务(Transactions) 脚本(Scripting)

    11861 人正在学习 去看看 任亮

1.Spark出现数据倾斜场景:
1.在join的时候,有很多数据的join的值为空值.那么这个时候所有空值的数据都会分配到一个task中从而出现数据倾斜
解决方案:过滤空值
2,当分区数设置过小,导致很多key聚集到一个分区从而导致数据倾斜
解决方案:增大分区数
3.某个key特别多的groupBy的时候出现倾斜
解决方案:局部聚合+全局聚合
4.大表join小表,因为大表中某一个key的数据特别的时候,也会出现数据倾斜
解决方案:将小表广播出去,避免shuffle操作
5,大表join大表的时候,由于某个或者某几个key特比多的时候,也会出现数据倾斜
解决方案:将产生数据倾斜的key过滤出来,进行单独处理,其余没有出现数据倾斜的key照常处理
6.大表join大表的时候,有很多的key数据量都比较大,那这些key都会导致数据倾斜
解决方案:将表进行扩容

2.SQL题目
表结构:id,name,account,一条语句查询出大于平均金额的账号。不能用子查询
SELECT a.* FROM account a , account b GROUP BY a.id HAVING a.account>=AVG(b.account)

3.数仓Hive数据模型
星型模型
​ 核心是一个事实表及多个非正规化描述的维度表组成。
雪花模型
​ 它是星型模型的扩展,不同的是维度表被规范化,进一步分解到附加表中。
星座模型
​ 由多个事实表组合,维表是公共的,可以被多个事实表共享。星座模型是数据仓库最常使用的模型。

4.运行一个 Spark 程序运行流程
①启动 Driver, 创建 SparkContext
②Client 提交程序给 Drive, Drive 向 Cluster Manager 申请集群资源
③资源申请完毕, 在 Worker 中启动 Executor
④Driver 将程序转化为 Tasks, 分发给 Executor 执行

5.添加新列
​ select null as year
withcolumn

6.Kafka数据只有在一个分区内是有序的

7.分桶与分区
​ 分区 ​ 是指按照数据表的某列或某些列分为多个区,区从形式上可以理解为文件夹,比如我们要收集某个大型网站的日志数据,一个网站每天的日志数据存在同一张表上,由于每天会生成大量的日志,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。 ​ 那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找。 ​ 分桶 ​ 分桶是相对分区进行更细粒度的划分。 ​ 分桶将整个数据内容按照某列属性值得hash值进行区分,如要安装name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。 ​ 如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件。

8.RDD五大属性
​ 分区列表 ​
计算函数 ​
依赖关系(分区的依赖关系) ​
分区函数
​ 数据本地优先

9.弹性分布式数据集
​ 容错 ​
可分区(动态调整分区) ​
内存不足使用磁盘

10.Spark运行模式较多,包含但是不止以下
local 就是单机,jobs都在这台机器上运行。
standalone 就是说多台机器组成一个集群,然后jobs可以分在多台机器上运行 yarn 就是说spark程序运行在yarn上(别的应用共享服务器建议用yarn)
client 就是jobs在不同机器运行,然后结果返回到这台机器上。
cluster 就是jobs在不同机器运行,结果返回到集群中的某一台机器上。

11.spark参数
1.foreachpartitions() 一批数据处理.每个分区
2.分区数=本次任务CPU的核数的2-3倍
3.Try(arr(6).toInt).getOrElse(0)
4.//spark.default.parallelism 设置shuffle的分区数

12.实时与离线区别
实时:
web接口(写kafka代码,数据写入kafka)+sparkstreaming/flink/structuredStreaming+hbase/es/mysql

离线:
flume + kafka + hdfs + hive + spark + mysql => 前端展示

13.repartitioin与coalesce区别
repartitioin(numPartitions)默认可大可小
coalesce(numPartitions, shuffle)默认减少分区

2018-08-31 20:24:03 qq_35394891 阅读数 680
  • redis高并发处理由浅入深(备java基础,javaee课程)

    Redis是一款依据BSD开源协议发行的高性能Key-Value存储系统(cache and store)。 命令主要分以下几部分关键字(Keys) 字符串(String) 哈希(Hashs) 列表(Lists) 集合(Sets) 有序集合(Sorted Sets)HyperLogLog 发布/订阅(Pub/Sub) 事务(Transactions) 脚本(Scripting)

    11861 人正在学习 去看看 任亮

一、前述

数据倾斜问题是大数据中的头号问题,所以解决数据倾斜尤为重要,本文只针对几个常见的应用场景做些分析 。

 

二。具体方法

 

 1、使用Hive ETL预处理数据

方案适用场景:

如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:

此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:

这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

 

2、过滤少数导致倾斜的key

方案适用场景:

如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:

如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:

将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

 

3、提高shuffle操作的并行度

方案实现思路:

在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个不同的key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

 

4、双重聚合

方案适用场景:

对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:

将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。如果一个RDD中有一个key导致数据倾斜,同时还有其他的key,那么一般先对数据集进行抽样,然后找出倾斜的key,再使用filter对原始的RDD进行分离为两个RDD,一个是由倾斜的key组成的RDD1,一个是由其他的key组成的RDD2,那么对于RDD1可以使用加随机前缀进行多分区多task计算,对于另一个RDD2正常聚合计算,最后将结果再合并起来。

随机前缀加几,ReduceByKey分几个区。

 

5、将reduce join转为map join(彻底避免数据倾斜)

BroadCast+filter(或者map)

方案适用场景:

在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:

不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。

 

6、采样倾斜key并分拆join操作

方案适用场景:

两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join即可。最后将两次join的结果使用union算子合并起来即可,就是最终的join结果 。

 

7、使用随机前缀和扩容RDD进行join

 

方案适用场景:

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。

 

spark数据倾斜

阅读数 96

没有更多推荐了,返回首页