精华内容
下载资源
问答
  • spark调优

    2020-04-02 16:19:44
    spark调优之性能调优 1.合理的分配资源 2.设置并行度 3.重构RDD架构以及RDD持久化 4.广播变量 5.调节数据本地化等待时长 spark调优之JVM调优 1.jvm垃圾回收机制 2.降低cache操作的内存占比 3.调节executer对外内存与...

    spark调优目录

    spark调优之性能调优

    1.合理的分配资源
    2.设置并行度
    3.重构RDD架构以及RDD持久化
    4.广播变量
    5.调节数据本地化等待时长

    spark调优之JVM调优

    1.jvm垃圾回收机制
    2.降低cache操作的内存占比
    3.调节executer对外内存与连接等待时长

    spark调优之Shuffle调优

    1.开启shuffle map端输出文件合并的机制
    2.调节map端内存缓冲与reduce端内存的占比

    spark调优之算子调优

    1.map==>MapPartitions
    2.filter==>filter().coalesce / filter==>case when then else end
    3.foreach==>foreachPartirion
    4.SparkSql无法设置并行度
    5.group by key ==>reduce by key本地聚合

    spark调优之问题解决

    1.shuffle reduced端缓冲大小避免ooM
    a.性能调优
    b.reduce端缓冲(buffer)可能会出现的问题及解决方式
    2.解决JVM GC导致的shuffle文件拉取失败

    3.YARN队列资源不足导致的application直接失败

    4.解决各种序列化导致的报错

    5.解决算子函数返回NULL导致的问题

    6.解决yarn-cluster模式的JVM栈内存溢出的问题,

    spark调优实例

    spark调优之性能调优

    1.合理的分配资源

    在执行spark任务的时候使用的是spark-submit shell脚本执行的,
    	执行的时候分配executer的数量,CPUcores的数量,和driver端的内存
    	a.增加executer数量的好处是:提高并行度,例如原来有三个executer,每个executer有两个CPUcores
    	那么task就是六个,就是说同时运行的task只有六个,六个执行完毕之后再能执行下一个六个task,提高效率
    	b.增加cpu_cores的数量:效果和上面的一样,增加了task并行的数量,提高效率
    	c.增大driver端的内存:大概有三个好处
    		(1)RDD进行计算缓存的时候,就可以有更多的数量缓存的内存中,更少的数据缓存到磁盘中,减少了磁盘IO
    		(2)对于shuffle操作,reduce端,会需要内存来存放拉取数据进行聚合,如果内存不够也会写入磁盘,
    		如果给executer分配更多的内存以后,就会有更少的数据需要写入磁盘,甚至不需要写入磁盘,减少IOj,提高效率.
    		(3)对于task执行而言,会创建很多的对象,如果内存不够会导入JVM堆内存满了,导致多次的垃圾回收,
    		程序会等带JVM垃圾回收完毕之后继续运行,速度变慢,效率下降
    

    2.设置并行度

    合理的并行度的设置,应该是设置的足够大,大到可以完全利用你的集群资源task数量,设置成spark application总CPUcore数量的2~3,比如150个CPUcore基本要设置task数量为300~500.
    代码实现:
    SparkConf conf = new SparkConf()
      								.set("spark.default.parallelism", "500")
    

    3.重构RDD架构以及RDD持久化

    将多次复用的RDD持久化,就是将RDD的数据缓存到内存或者磁盘中,再次使用的时候直接去内存或者磁盘中拉取数据即可,如果数据量较大,占用较多的内存,
    	可以将RDD的数据系列化后缓存,将数据序列化为字节数组减小内存,不过使用序列化在获取数据时还需反序列化.
    	Kryo序列化机制
    

    4.广播变量

    例如说在spark执行任务的时候需要一份固定的数据,每一个executer有若干个task那么task执行任务的时候都要去driver拉取这个固定的数据,这样效率低下,数据复用使用广播变量,
    	将固定的数据广播到executer中,那么每个executer上的task都会到自己的executer上读取数据,提高了效率.
    

    5.调节数据本地化等待时长

    spark在执行的过称中,理想的情况是task处理的数据正好在所在的节点上,这样就不用网络传输,那么如果所在数据节点上的计算任务已经满了,那么这个task就会分配到其他节点上运行,
    那么数据要从有数据的那个节点上拉取,进行网络传输,影响效率.new SparkConf().set("spark.locality.wait","10")
    

    spark调优之JVM调优

    jvm的组成

    栈内存:本地方法栈,java虚拟机栈,程序计数器
    堆内存:新生代,生存代,年轻代,老年代,长久代,不同的区域GC(垃圾回收)机制不同,
    	新的对象实例创建会放在Eden(伊甸)区域,随着对象增多,Eden消耗内存接近Eden的最大值,huich
    	所有的对象实例,GC的主要对象
    方法区:存放各种静态方法,和静态变量
    

    1.jvm的GC(Garbage垃圾 Collection回收)

    堆内存存放我们创建的一些对象,有老年代和年轻代。理想情况下,老年代都是放一些生命周期很长的对象,数量应该是很少的,比如数据库连接池。我们在 spark task 执行算子函数(我们自己写的), 可能会创建很多对象,这些对象都是要放入 JVM 年轻代中的。 
    每一次放对象的时候,都是放入eden(伊甸园)区域,和其中一个survivor(存活)区域。另外一个survivor 区域是空闲的.eden(伊甸园)区域和一个survivor区域放满了以后(spark运行过程中,产生的对象实在太多了),就会触 发 minor(次要) gc,小型垃圾回收。把不再使用的对象,从内存中清空,给后面新创建的对象腾出来点儿地 方。清理掉了不再使用的对象之后,那么也会将存活下来的对象(还要继续使用的),放入之前空闲的那 一个 survivor 区域中。这里可能会出现一个问题。默认 eden、survior1 和 survivor2 的内存占比是 8:1:1。 问题是,如果存活下来的对象是 1.5,一个 survivor 区域放不下。此时就可能通过 JVM 的担保机制(不 同 JVM 版本可能对应的行为),将多余的对象,直接放入老年代了。 如果你的 JVM 内存不够大的话,可能导致频繁的年轻代内存满溢,频繁的进行 minor gc。频繁的 minor gc 会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉。会导致这种短生命周期(其实不 一定是要长期使用的)对象,年龄过大,垃圾回收次数太多还没有回收到,跑到老年代。 老年代中,可能会因为内存不足,囤积一大堆,短生命周期的,本来应该在年轻代中的,可能马上就要被回收掉的对象。此时,可能导致老年代频繁满溢。频繁进行full gc(全局/全面垃圾回收)。full gc就会去回收老年代中的对象。full gc由于这个算法的设计,是针对的是,老年代中的对象数量很少,满溢进行full gc的频率应该很少,因此采取了不太复杂,但是耗费性能和时间的垃圾回收算法。full gc很慢。full gc/minor gc,无论是快,还是慢,都会导致jvm的工作线程停止工作,stop the world。简而言之,就是说,gc的时候,spark停止工作了。等着垃圾回收结束。
    	内存不充足的时候,出现的问题:
    	(1)频繁minor gc,也会导致频繁spark停止工作
    	(2)老年代囤积大量活跃对象(短生命周期的对象),导致频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时。可能导致spark长时间停止工作。
    	(3)严重影响咱们的spark的性能和运行的速度
    

    2.降低cache操作的内存占比

    spark 中,堆内存又被划分成了两块,一块是专门用来给 RDD 的 cache、persist 操作进行 RDD 数据缓 存用的。另外一块用来给 spark 算子函数的运行使用的,存放函数中自己创建的对象。 
    	默认情况下,给 RDD cache 操作的内存占比,是 0.660%的内存都给了 cache 操作了。但是问题是, 如果某些情况下 cache 不是那么的紧张,问题在于 task 算子函数中创建的对象过多,然后内存又不太 大,导致了频繁的 minor gc,甚至频繁 full gc,导致 spark 频繁的停止工作。性能影响会很大。 针对上述这种情况,可以在任务运行界面,去查看你的 spark 作业的运行统计,可以看到每个 stage 的运行情况,包括每个 task 的运行时间、gc 时间等等。如果发现 gc 太频繁,时间太长。此时就可以 适当调节这个比例。 
    	降低 cache 操作的内存占比,大不了用 persist 操作,选择将一部分缓存的 RDD 数据写入磁盘,或者 序列化方式,配合 Kryo 序列化类,减少 RDD 缓存的内存占用。降低 cache 操作内存占比对应的,算子函数的内存占比就提升了。这个时候,可能就可以减少 minor gc 的频率,同时减少 full gc 的频率。 对性能的提升是有一定的帮助的。 
    	一句话,让 task 执行算子函数时,有更多的内存可以使用。 
    	spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2 
    

    3.调节executer对外内存与连接等待时长

    在执行spark-submit命令的时候  使用--conf的方法调节堆外内存
    	--conf spark.yarn.executor.memoryOverhead=2048
    	
    	调节连接等待时长
    	executor会从本地拉取数据  如果本地没有 会从替他节点拉取数据 此时如果jvm正在垃圾回收 那么spqrk会停止数据  就不会拉取数据  这个默认的连接是60秒了还没有拉取到数据 呢么就会宣告失败, 那么调节连接等待时长会很有必要 避免部分会因为超时而失败的任务
    	-conf spark.core.connection.ack.wait.timeout=300
    

    spark调优之Shuffle调优

    1.开启shuffle map端输出文件合并的机制

    如果spqrk任务会产生大量的map文件 shuffle过程中会产生大量的磁盘文件 会影响效率
    	基本上spark 作业的性能,都消耗在 shuffle 中了
    	默认情况下,是不开启的,就是会发生如上所述的大量 map 端输出文件的操作,严重影响性能
    	new SparkConf().set("spark.shuffle.consolidateFiles", "true")
    

    2.调节map端内存缓冲与reduce端内存的占比

    map操作时需要将数据先缓存到内存中,当达到一定大小时(默认32K)会溢写磁盘,为了减少溢写的次数 可适当调大一点
    	调节 map task 内存缓冲:spark.shuffle.file.buffer,默认 32k 
    	
    	默认情况下executor分给reduce task的内存比例仅有0.2  那么内存太小 每次task拉取来的数据有可能会放不下 需要溢写磁盘 这样会影响效率
    	调节 reduce 端聚合内存占比:spark.shuffle.memoryFraction,0.2
    	适量的调大一点 也不能过大 因为内存资源有限 这里调的太大 其他地方的占比就小了 
    	
    	总之调节这两个参数都是为了减少磁盘的交互 而且减少了后面聚合读取磁盘文件的数量
    	提高运行效率
    

    spark调优之算子调优

    在spark中,最基本的与原则就是每一个task处理一个RDD的partition

    1.Map==>MapPartitions

    优势:在spark程序中例如map(func)连接mysql,参数func逻辑后返回的数据组成一个新的RDD,所以每一行数据都会经过这个函数处理,假设数据有M行,map(func)就会通过M次网络IO向Executor发送任务进行计算的,
    每一次连接都会创建一个连接一个连接就是一个对象,在这个程序的堆内存中就会创建M个实例对象,mappartition()的方法实际上执行次数等于分区数N,虽然通过网络IO向Executor发送N次任务但是程序最终还是执行了M行,N(分区数)一定是小于M(数据行数)的所以通过网络次数减少,效率是一定会提高的.
    弊端:mappartition()是把计算后的数据通过网络IO发送给Executor的如果计算之后的数据大于Executor的内存就会报OOM异常个GC异常,
    所以内存足够的话可以考虑用mappartition()的算子来提高效率.
    补充:
    OOM异常是:OutOfMemoryError(内存溢出异常)
    GC异常是:频繁的垃圾回收,导致数据等待,如果连续等待的次数超过了5次还没有等待垃圾回收完毕,就会GC导致程序超时而中断,我们的内存分为,执行内存和缓存内存(cached memory),默认缓存内存(cached memory)60%,我们将缓存内存调小,就会增大执行内存,就会加快垃圾回收,从而减少GC的出现概率.
    

    2.2.filter==>filter().coalesce 或者 filter==>case when then else end

    filter操作后每个partition的数据会有所变化 但task的数量不变  那么后续的task就会数据倾斜,或者数据量可以压缩为更少的分区 减少task的数量  那么就造成了task资源的浪费
    	解决办法1:调用 coalesce 算子 压缩partition的数据,节省task,将节省出来的资源,去执行其他的程序,提高整体效率.
    	解决办法2:调用case when then else end函数,将需要反复filter的程序,一遍过.
    

    3.foreach==>foreachPartirion

    默认的forech 每一条数据都会执行一次function ,如果需要连接数据库  那么有多少条数据  就会创建多少次的链接和sql语句的执行,在堆内存中就会产生多少个对象的实例,消耗资源,影响效率.
    foreachpartition算子 在每个partition中只会创建一次链接和只发送一次的sql语句和参数 提升性能  但是这个算子和maopartiton算子一样如果数据量很大的话,还是会有OOM GC 的风险的!!
    

    4.SparkSql无法设置并行度

    在sqarksql中 我们设置的并行度是不起作用的 因为sqarksql在读取hive表的时候  会根据hive表所对应hdfs数据的块数量来决定的  例如我们设置的是100并行度  而文件的切块是10个  那么就会导致第一层的stage会很慢 第二层的stage会很块(对比之下)。
    	解决问题:使用 repartition 算子去重新进行分区,此时可以分成多个partition  那么repartition分区后的RDD的并行度就会按照我们设置的运行了。
    

    5.group by key ==>reduce by key本地聚合

    相对与groupbykey来说  使用本地聚合的好处是 在本地聚合以后 map端的数据量就变少了  减少了磁盘IO
    对于下一个stage来说 拉取的数据也少了 在reduce端占用的内存就变少了
    

    spark调优之问题解决

    1.shuffle reduced端缓冲大小避免ooM

    a.性能调优
    	map是不断的拉取数据的  reduce也是在不断的拉取数据 如果数据量不是很大的话 map端输出一点 reduce就会拉取一点  在资源充足的情况下 可以适当的增大reduce 缓存的大小 减少拉取的次数 可以提升性能  当然必须是在性能资源充足的情况下
    b.reduce端缓冲(buffer)可能会出现的问题及解决方式
    	如果数据量很大 而且map端输出的速度很快时  reduce端的task就会每次将数据拉满后计算 这样的话再加上计算任务会产生很多的对象  那么有可能会造成内存不足OOM的情况  
    需要将reduce端的缓存调小  避免内存溢出
    	ew SparkConf().set(spark.reducer.maxSizeInFlight,”48)
    

    2.解决JVM GC导致的shuffle文件拉取失败

    比如再stage在拉取文件的时候  上一个stage正在进行垃圾回收  默认情况是会重复拉取3次  每次延迟5秒  如果15后还没有拉取到数据 那么就会失败
    	调节重复次数和延迟时间
    	spark.shuffle.io.maxRetries 60 
    	spark.shuffle.io.retryWait 60s
    

    3.YARN队列资源不足导致的application直接失败

    用yarn方式提交作业时  实际占用的资源会比设置的资源要大一些  如果我们设置的资源占比时整个集群的一半  那么实际的占用要大于1/2  如果此时又提交了同样的任务  那么资源调度不过来会报错
    	解决办法:
    	(1)限制任务的个数 在sqarksubmit shell设置中限制 每次只提交一个任务  确保这个任务的执行
    	(2)采用调度分区的方式 将长时间作业的任务  和短时间的任务区分开来
    	(3)使用性能调优  计算使用的资源 尽量每一次的任务都达到最满的资源利用
    	(4)通过线程池的方式  一个线程池对应一个资源队列
    

    4.解决各种序列化导致的报错

    1、你的算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型, 必须是可序列化的。
    	、如果要将自定义的类型,作为 RDD 的元素类型,那么自定义的类型也必须是可以序列化的
    	、不能在上述两种情况下,去使用一些第三方的,不支持序列化的类型
    

    5.解决算子函数返回NULL导致的问题

    1、在返回的时候,返回一些特殊的值,不要返回 null,比如“-9992、在通过算子获取到了一个 RDD 之后,可以对这个 RDD 执行 filter 操作,进行数据过滤。filter 内, 可以对数据进行判定,如果是-999,那么就返回 false,给过滤掉就可以了。
    

    6.解决yarn-cluster模式的JVM栈内存溢出的问题,

    有时候yarn-client下任务会执行  但是在yarn-cluster上运行不了会报出 JVM 的 PermGen(永久代)的内存溢出 这个原因是yarn-cluster模式默认的永久代内存小与yarn-client的默认大小 
    	spark-submit 脚本中,加入以下配置即可: 
    	--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M" 
    	如果sql语句中又大量的or语句  可能就会出现一个 driver 端的 jvm stack overflow,JVM 栈 
    	内存溢出的问题。那么需要优化sql语句  可以将这个复杂的语句拆分为几个  大概每个语句or控制在100以内
    
    展开全文
  • Spark调优

    2019-11-12 20:20:01
    Spark调优 1.SparkCore调优 1.1数据序列化 Spark支持两种方式的序列化: ​ 1、Java原生序列化JavaSerializer ​ 2、Kryo序列KryoSerliazer 序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式情况下...

    Spark调优

    1.SparkCore调优

    1.1数据序列化

    Spark支持两种方式的序列化:

    ​ 1、Java原生序列化JavaSerializer

    ​ 2、Kryo序列KryoSerliazer

    序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。

    KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。

    序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。

    操作步骤

    Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。

    在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。

    • 实现类注册器并手动注册类。

      package com.etl.common;
      
      import com.esotericsoftware.kryo.Kryo;
      import org.apache.spark.serializer.KryoRegistrator; 
      
      public class DemoRegistrator implements KryoRegistrator
      {
          @Override
          public void registerClasses(Kryo kryo)
          {
              //以下为示例类,请注册自定义的类
              kryo.register(AggrateKey.class);
              kryo.register(AggrateValue.class);
          }
      }
      

      您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。

      当参数设置为true时,如果工程中存在未被序列化的类,则会抛出异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是抛出异常,相对比false,其性能较好。

    • 配置KryoSerializer作为数据序列化器和类注册器。

      val conf = new SparkConf()
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")
      

    1.2使用External Shuffle Service提升性能

    操作场景

    Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。

    External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。

    操作步骤
    spark.shuffle.service.enabled false true

    1.3使用广播变量

    操作场景

    Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。

    1. 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。
    2. nbgh大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。

    1.4设置并行度

    操作场景

    并行度控制任务的数量,影响shuffle操作后数据被切分成的块数。调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。

    查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力,一般并行度设置为集群CPU总和的2-3倍。

    操作步骤

    并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

    • 在会产生shuffle的操作函数内设置并行度参数,优先级最高。

      testRDD.groupByKey(24)
      
    • 在代码中配置

      “spark.default.parallelism”

      设置并行度,优先级次之。

      val conf = new SparkConf()
      conf.set("spark.default.parallelism", 24)
      
    • “$SPARK_HOME/conf/spark-defaults.conf”

      文件中配置

      “spark.default.parallelism”

      的值,优先级最低。

      spark.default.parallelism    24
      

    1.5配置内存

    操作场景

    Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。

    监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

    ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。

    操作步骤
    • 优化GC,调整老年代和新生代的大小和比例。在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如," -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。

    • 开发Spark应用程序时,优化RDD的数据结构。

      • 使用原始类型数组替代集合类,如可使用fastutil库。
      • 避免嵌套结构。
      • Key尽量不要使用String。
    • 开发Spark应用程序时,建议序列化RDD。

      RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如:

      testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
      

    1.6 设计DAG

    操作场景

    合理的设计程序结构,可以优化执行效率。在程序编写过程中要尽量减少shuffle操作,合并窄依赖操作。

    1.7经验总结

    使用mapPartitions,按每个分区计算结果

    如果每条记录的开销太大,例:

    rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}
    

    则可以使用MapPartitions,按每个分区计算结果,如

    rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
    write(item.toString); conn.close)
    

    使用mapPartitions可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

    使用coalesce调整分片的数量

    coalesce可以调整分片的数量。coalesce函数有两个参数:

    coalesce(numPartitions: Int, shuffle: Boolean = false)
    

    当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。

    遇到下列场景,可选择使用coalesce算子:

    • 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。
    • 当输入切片个数太大,导致程序无法正常运行时使用。
    • 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
    localDir配置

    Spark的Shuffle过程需要写本地磁盘,Shuffle是Spark性能的瓶颈,I/O是Shuffle的瓶颈。配置多个磁盘则可以并行的把数据写入磁盘。如果节点中挂载多个磁盘,则在每个磁盘配置一个Spark的localDir,这将有效分散Shuffle文件的存放,提高磁盘I/O的效率。如果只有一个磁盘,配置了多个目录,性能提升效果不明显。

    Collect小数据

    大数据量不适用collect操作。

    collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

    使用reduceByKey

    reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey().map(x=>(x._1,x._2.size))这类实现方式。

    广播时map代替数组

    当每条记录需要查表,如果是Driver端用广播方式传递的数据,数据结构优先采用set/map而不是Iterator,因为Set/Map的查询速率接近O(1),而Iterator是O(n)。

    数据倾斜

    当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

    • 需要重新设计key,以更小粒度的key使得task大小合理化。
    • 修改并行度。
    优化数据结构
    • 把数据按列存放,读取数据时就可以只扫描需要的列。
    • 使用Hash Shuffle时,通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能。最终文件数为reduce tasks数目。
    展开全文
  • Spark 调优

    2021-01-14 21:59:55
    Spark调优一、Spark资源参数调优二、开发调优 **调优思路:**优先使用参数调优,如果参数调优不能满足我们的业务场景,这里就要涉及到代码调优 一、Spark资源参数调优 • num-executors:该作业总共需要多少executor...


    **调优思路:**优先使用参数调优,如果参数调优不能满足我们的业务场景,这里就要涉及到代码调优

    一、Spark资源参数调优

    num-executors:该作业总共需要多少executor进程执行
    – 建议:每个作业运行一般设置50~100个左右较合适
    executor-memory:设置每个executor进程的内存, num-executors* num- executors代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2)
    – 建议:设置4G~8G较合适
    executor-cores:每个executor进程的CPU Core数量,该参数决定每个executor进程并行执行task线程的能力, num-executors* executor-cores代表作业申请总CPU core数(不要超过总CPU Core的1/3~1/2 )
    – 建议:设置2~4个较合适
    driver-memory: – 建议:通常不用设置,一般1G就够了,若出现使用collect算子将RDD数据全部拉取到Driver上处理,就必须确保该值足够大,否则OOM内存溢出
    spark.default.parallelism:每个stage的默认task数量
    – 建议:设置500~1000较合适,默认一个HDFS的block对应一个task,Spark默认值偏少,这样导致不能充分利用资源
    spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的比例,默认0.6,即默认executor 60%的内存可以保存持久化RDD数据
    – 建议:若有较多的持久化操作,可以设置高些,超出内存的会频繁gc导致运行缓慢
    spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2
    – 建议:若持久化操作较少,但shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比
    注意:使用 mapjoin操作,就会将小表放到driver中,前提:如果数据量不是很大,可以操作
    反之,①避免mapjoin操作 ② 增加driver内存
    方式一:(脚本中进行设置):
    Spark_submit 命令调优实例(脚本中进行设置)
    在这里插入图片描述

    --master yarn-cluster 执行模式是集群模式
    --num-executors 100  100个executor进程执行
    --executor-memory 6G 申请的总内存量(进程)内存6G
    --executor-cores 4 每个executor进程的CPU Core数量为4
    --driver-memory 1G  driver设置1G内存
    

    方式二:代码调优
    代码参数调优案例:

    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[JiebaSegmenter]))
      .set("spark.rpc.message.maxSize","800")
    val spark = SparkSession
      .builder()
      .appName("Jieba UDF")
      .enableHiveSupport()
      .config(conf) //将conf配置到spark中
      .getOrCreate()
    
      //注册自定义类交给KryoSerializer序列化处理类进行序列化  .registerKryoClasses(Array(classOf[xxxx]))
      xxxx--->是要进行序列化的类名
    

    二、开发调优

    原则一:避免创建重复的RDD
    – 对同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据
    – 极大浪费内存
    利用RDD1中的tuple中元素来保留RDD2 节省内存
    在这里插入图片描述
    原则二:尽可能复用同一个RDD
    – 比如:一个RDD数据格式是key-value,另一个是单独value类型,这两个RDD的value部分完
    全一样,这样可以复用达到减少算子执行次数
    在这里插入图片描述
    原则三:对多次使用的RDD进行持久化处理
    – 每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一遍,计算出那个RDD出来,
    然后进一步操作,这种方式性能很差
    – 对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动
    – 借助cache()和persist()方法
    在这里插入图片描述
    – persist持久化级别
    在这里插入图片描述
    原则四:避免使用shuffle类算子(现实中不可以避免的)(对性能影响最大的就是shuffle类算子)
    – 在spark作业运行过程中,最消耗性能的地方就是shuffle过程
    – 将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如
    groupByKey、reduceByKey、join等算子,都会触发shuffle
    重点:shuffle涉及到写磁盘,数据合并,网络数据的传输,因此能避免使用shuffle,就不使用
    在这里插入图片描述
    原则五:使用map-side预聚合的shuffle操作
    – 一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子
    – 思想类似MapReduce中的Combiner – 可能的情况下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因为
    reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本地相同的key进行
    预聚合,而groupByKey算子不会预聚合
    在这里插入图片描述
    原则六:使用Kryo优化序列化性能
    – Kryo是一个序列化类库,来优化序列化和反序列化性能
    – Spark默认使用Java序列化机制(ObjectOutputStream/ ObjectInputStream API)进行序列
    化和反序列化
    – Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右
    在这里插入图片描述

    展开全文
  • 本文76000字,通篇spark性能调优,性能监控风湿,数据倾斜调优,shuffle调优,程序开发调优,运行资源调优,JVM,GC调优,以及企业spark大数据平台调优真实案例,用于企业spark调优参考,学习交流
  • spark调优指导

    2016-07-21 13:19:38
    spark调优
  • Spark调优 | Spark Streaming 调优1、数据序列化2、广播大变量3、数据处理和接收时的并行度4、设置合理的批处理间隔5、内存优化5.1 内存管理5.2优化策略5.3垃圾回收(GC)优化5.5Spark Streaming 内存优化6、实例...



    原文地址:微信公众号:HBase技术社区
    Spark调优 | Spark Streaming 调优实践



    1、数据序列化

    在分布式应用中,序列化(serialization)对性能的影响是显著的。如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。通常在 Spark 中,主要有如下3个方面涉及序列化:

    • ① 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
    • ② 将自定义的类型作为 RDD 的泛型类型时,所有自定义类型对象都会进行序列化。因此这种情况下,也要求自定义的类必须实现
      Serializable 接口。
    • ③ 使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER),Spark 会将 RDD 中的每个 partition都序列化成一个大的字节数组。

    而 Spark 综合考量易用性和性能,提供了下面两种序列化库。

    • ① Java 序列化:默认情况下,Spark 使用 Java 的对象输出流框架(ObjectOutputStream framework)来进行对象的序列化,并且可用在任意实现 Java.io.Serializable 接口的自定义类上。我们可以通过扩展Java.io.Externalizable 来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果。

    • ② Kryo 序列化:Spark 在2.0.0以上的版本可以使用 Kryo 库来非常快速地进行对象序列化,Kryo 要比 Java序列化更快、更紧凑(10倍),但是其不支持所有的 Serializable 类型,并且在使用自定义类之前必须先注册。

    在初始化 SparkConf 时,调用 conf.set("spark.serializer","org.apache.spark. serializer.KryoSerializer")来使用 Kryo。一旦进行了这个配置,Kryo 序列化不仅仅会用在 Shuffling 操作时 worker 节点间的数据传递,也会用在 RDDs 序列化到硬盘的过程。

    Spark 官方解释没有将 Kryo 作为默认序列化方式的唯一原因是,Kryo 必须用户自己注册(注意如果我们不注册自定义类,Kryo 也是可以正常运行的,但是它必须存储每个对象的完整类名,这是非常浪费的),但是其推荐在网络频繁传输的应用中使用 Kryo。

    另外值得注意的是,在 Spark 2.0.0 之后,Spark 已经默认将 Kryo 序列化作为简单类型(基本类型、基本类型的数组及 string 类型)RDD 进行 Shuffling 操作时传输数据的对象序列化方式。

    Spark 已经自动包含注册了绝大部分 Scala 的核心类,如果需要向 Kryo 注册自己的类别,可以使用 registerKryoClasses 方法。使用 Kryo 的代码框架如下:

    // Spark配置项
    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 配置序列化方式
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) // 注册需要序列化的类
    val sc = new SparkContext(conf)
    

    如果我们的对象非常大,可能需要增加 Spark.kryoserializer.buffer 的配置。

    同样在 Spark Streaming 中,通过优化序列化格式可以缩减数据序列化的开销,而在 Streaming 中还会涉及以下两类数据的序列化。

    输入数据:Spark Streaming 中不同于 RDD 默认是以非序列化的形式存于内存当中,Streaming> 中由接收器(Receiver接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中。而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中。而在 Streaming 中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为 Spark 本身的序列化格式。

    由 Streaming 操作产生 RDD 的持久化:由流式计算产生的 RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被反复使用,所以会持久化在内存当中。值得注意的是,不同于 Spark核心默认使用非序列化的持久化方式(StorageLevel.MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式StorageLevel.MEMORY_ONLY_SER)。

    不管在 Spark 还是在 Spark Streaming 中,使用 Kryo 序列化方式,都可以减少 CPU 和内存的开销。而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。

    例如,我们使用很小的批处理时间间隔,并且没有基于窗口的操作,可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的 CPU 开销,但是潜在的增加了 GC 的开销。

    2、广播大变量

    不论 Spark 还是 Spark Streaming 的应用,在集群节点间进行数据传输时,都会有序列化和反序列化的开销,而如果我们的应用有非常大的对象时,这部分开销是巨大的。比如应用中的任何子任务需要使用 Driver 节点的一个大型配置查询表,这时就可以考虑将该表通过共享变量的方式,广播到每一个子节点,从而大大减少在传输和序列化上的开销。

    另外,Spark 在 Master 节点会打印每个任务的序列化对象大小,我们可以通过观察任务的大小,考虑是否需要广播某些大变量。通常一个任务的大小超过 20KB,是值得去优化的。

    当我们将大型的配置查询表广播出去时,每个节点可以读取配置项进行任务计算,那么假设配置发生了动态改变时,如何通知各个子节点配置表更改了呢?(尤其是对于流式计算的任务,重启服务代价还是蛮大的。)

    我们知道广播变量是只读的,也就是说广播出去的变量没法再修改,那么应该怎么解决这个问题呢?我们可以利用 Spark 中的 unpersist() 函数,Spark 通常会按照 LRU(least Recently Used)即最近最久未使用原则对老数据进行删除,我们并不需要操作具体的数据,但如果是手动删除,可以使用 unpersist() 函数。

    所以这里更新广播变量的方式是,利用 unpersist() 函数先将已经发布的广播变量删除,然后修改数据后重新进行广播,我们通过一个广播包装类来实现这个功能,代码如下:

    import java.io.{ ObjectInputStream, ObjectOutputStream }
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.streaming.StreamingContext
    import scala.reflect.ClassTag
    
    // 通过包装器在DStream的foreachRDD中更新广播变量
    // 避免产生序列化问题
    case class BroadcastWrapper[T: ClassTag](
    
        @transient private val ssc: StreamingContext,
        @transient private val _v: T) {
    
      @transient private var v = ssc.sparkContext.broadcast(_v)
    
      def update(newValue: T, blocking: Boolean = false): Unit = {
        // 删除RDD是否需要锁定
        v.unpersist(blocking)
        v = ssc.sparkContext.broadcast(newValue)
      }
    
      def value: T = v.value
    
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.writeObject(v)
      }
    
      private def readObject(in: ObjectInputStream): Unit = {
        v = in.readObject().asInstanceOf[Broadcast[T]]
      }
    }
    

    利用 wrapper 更新广播变量,可以动态地更新大型的配置项变量,而不用重新启动计算服务,大致的处理逻辑如下:

    // 定义
    val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue)
    
    yourStream.transform(rdd => {
      //定期更新广播变量
      if (System.currentTimeMillis - someTime > Conf.updateFreq) {
        yourBroadcast.update(newValue, true)
      }
      // do something else
    })
    

    3、数据处理和接收时的并行度

    作为分布式系统,增加接收和处理数据的并行度是提高整个系统性能的关键,也能够充分发挥集群机器资源。

    关于 partition 和 parallelism。partition 指的就是数据分片的数量,每一次 Task 只能处理一个 partition 的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多 Executor 的计算能力无法充分利用;但是如果 partition 太大了则会导致分片太多,执行效率降低。

    在执行 Action 类型操作的时候(比如各种 reduce 操作),partition 的数量会选择 parent RDD 中最大的那一个。而 parallelism 则指的是在 RDD 进行 reduce 类操作的时候,默认返回数据的 paritition 数量(而在进行 map 类操作的时候,partition 数量通常取自 parent RDD 中较大的一个,而且也不会涉及 Shuffle,因此这个 parallelism 的参数没有影响)。

    由上述可得,partition 和 parallelism 这两个概念密切相关,都是涉及数据分片,作用方式其实是统一的。通过 Spark.default.parallelism 可以设置默认的分片数量,而很多 RDD 的操作都可以指定一个 partition 参数来显式控制具体的分片数量,如 reduceByKey和reduceByKeyAndWindow。

    Spark Streaming 接收 Kafka 数据的方式,这个过程有一个数据反序列化并存储到 Spark 的开销,如果数据接收成为了整个系统的瓶颈,那么可以考虑增加数据接收的并行度。每个输入 DStream 会创建一个单一的接收器(receiver 在 worker 节点运行)用来接收一个单一的数据流。而对于接收多重数据的情况,可以创建多个输入 DStream 用来接收源数据流的不同分支(partitions)。

    如果我们利用 Receiver 的形式接收 Kafka,一个单一的 Kafka 输入 DStream 接收了两个不同 topic 的数据流,我们为了提高并行度可以创建两个输入流,分别接收其中一个 topic 上的数据。这样就可以创建两个接收器来并行地接收数据,从而提高整体的吞吐量。而之后对于多个 DStreams,可以通过 union 操作并为一个 DStream,之后便可以在这个统一的输入 DStream 上进行操作,代码示例如下:

    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...)}
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()
    

    如果采用 Direct 连接方式,前面讲过 Spark 中的 partition 和 Kafka 中的 partition 是一一对应的,但一般默认设置为 Kafka 中 partition 的数量,这样来达到足够并行度以接收 Kafka 数据。

    4、设置合理的批处理间隔

    对于一个 Spark Streaming 应用,只有系统处理数据的速度能够赶上数据接收的速度,整个系统才能保持稳定,否则就会造成数据积压。换句话说,即每个 batch 的数据一旦生成就需要被尽快处理完毕。这一点我们可以通过 Spark 监控界面进行查看(在2.3.4节我们介绍过),比较批处理时间必须小于批处理间隔。

    通过设置合理的批处理大小(batch size),使得每批数据能够在接收后被尽快地处理完成(即数据处理的速度赶上数据生成的速度)。

    如何选取合适的批处理时间呢?一个好的方法是:先保守地设置一个较大的批处理间隔(如 5~10s),以及一个很低的数据速率,来观测系统是否能够赶上数据传输速率。我们可以通过查看每个处理好的 batch 的端到端延迟来观察,也可以看全局延迟来观察(可以在 Spark log4j 的日志里或者使用 StreamingListener 接口,也可以直接在 UI 界面查看)。

    如果延迟保持在一个相对稳定的状态,则整个系统是稳定的,否则延迟不断上升,那说明整个系统是不稳定的。在实际场景中,也可以直接观察系统正在运行的 Spark 监控界面来判断系统的稳定性。

    5、内存优化

    内存优化是在所有应用落地中必须经历的话题,虽然 Spark 在内存方面已经为开发者做了很多优化和默认设置,但是我们还是需要针对具体的情况进行调试。

    在优化内存的过程中需要从3个方面考虑这个问题:对象本身需要的内存;访问这些对象的内存开销;垃圾回收(GC garbage collection)导致的开销。

    通常来说,对于 Java 对象而言,有很快的访问速度,但是很容易消耗原始数据2~5倍以上的内存空间,可以归结为以下几点原因:

    • ① 每个独立的 Java 对象,都会有一个“对象头”,大约16个字节用来保存一些基本信息,如指向类的指针,对于一个只包含很少数据量在内的对象(如一个 Int类型数据),这个开销是相对巨大的。

    • ② Java 的 String 对象会在原始数据的基础上额外开销40个字节,因为除了字符数组(Charsarray)本身之外,还需要保存如字符串长度等额外信息,而且由于 String 内部存储字符时是按照 UTF-16 格式编码的,所以一个10字符的字符串开销很容易超过60个字符。

    • ③ 对于集合类(collection classes),如HashMap、LinkedList,通常使用链表的形式将数据结构链在一起,那么对于每一个节点(entry,如Map.Entry)都会有一个包装器(wrapper),而这个包装器对象不仅包含对象头,还会保存指向下一个节点的指针(每个8字节)。

    • ④ 熟悉 Java 的开发者应该知道,Java 数据类型分为基本类型和包装类型,对于 int、long等基本类型是直接在栈中分配空间,如果我们想将这些类型用在集合类中(如Map<String, Integer>),需要使用对基本数据类型打包(当然这是 Java 的一个自动过程),而打包后的基本数据类型就会产生额外的开销。

    针对以上内存优化的基本问题,接下来首先介绍 Spark 中如何管理内存,之后介绍一些能够在具体应用中更加有效地使用内存的具体策略,例如,如何确定合适的内存级别,如何改变数据结构或将数据存储为序列化格式来节省内存等,也会从 Spark 的缓存及 Java 的垃圾回收方面进行分析,另外,也会对 Spark Streaming 进行分析。

    5.1 内存管理

    Spark 对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于 Shuffle 类操作、join 操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送。

    在 Spark 内部执行和存储分享同一片内存空间(M),当没有执行类内存被使用时,存储类内存可以使用全部的内存空间,反之亦然。执行类内存可以剥夺存储类内存的空间,但是有一个前提是,存储类内存所占空间不得低于某一个阈值 R,也就是说R指定了 M 中的一块子空间块是永远不会被剥夺的。而另一方面由于实现上的复杂性,存储类内存是不可以剥夺执行类内存的。

    Spark 的这种设计方式确保了系统一些很好的特性:首先,如果应用不需要缓存数据,那么所有的空间都可以用作执行类内存,可以一定程度上避免不必要的内存不够用时溢出到硬盘的情况;其次,如果应用需要使用缓存数据,会有最小的内存空间R能够保证这部分数据块免于被剥夺;最后,这种方式对于使用者而言是完全黑盒的,使用者不需要了解内部如何根据不同的任务负载来进行内存划分。

    Spark 提供了两个相关的配置,但是大多数情况下直接使用默认值就能满足大部分负载情况:

    • Spark Memory.Fraction 表示 M 的大小占整个 JVM(Java Virtue Machine)堆空间的比例(默认是0.6),剩余的空间(40%)被用来保存用户的数据结构及 Spark 内部的元数据(metadata),另一方面预防某些异常数据记录造成的 OOM(Out of Memory)错误。
    • Spark.Memory.StorageFraction 表示 R 的大小占整个 M 的比例(默认是0.5),R 是存储类内存在 M 中占用的空间,其中缓存的数据块不会被执行类内存剥夺。

    5.2优化策略

    当我们需要初步判断内存的占用情况时,可以创建一个 RDD,然后将其缓存(cache)起来,然后观察网页监控页面的存储页部分,就可以看出 RDD 占用了多少内存。而对于特殊的对象,我们可以调用 SizeEstimator 的 estimate() 方法来评估内存消耗,这对于实验不同数据层的内存消耗,以及判断广播变量在每个 Executor 堆上所占用的内存是非常有效的。

    当我们了解了内存的消耗情况后,发现占用内存过大,可以着手做一些优化,一方面可以在数据结构方面进行优化。首先需要注意的是,我们要避免本章开头提到的 Java 本身数据结构的头部开销,比如基于指针的数据结构或者包装器类型,有以下方式可以进行优化:

    • 在设计数据结构时,优先使用基本数据类型及对象数组等,避免使用 Java 或者 Scala 标准库当中的集合类(如 HashMap),在
      fastutil 库中,为基本数据类型提供了方便的集合类接口,这些接口也兼容 Java 标准库。

    • 尽可能避免在数据结构中嵌套大量的小对象和指针。

    • 考虑使用数值类 ID 或者枚举对象来代替字符串类型作为主键(Key)。

    • 如果我们的运行时内存小于 32GB,可以加上 JVM 配置-XX:+UseCompressedOops
      将指针的占用空间由8个字节压缩到4个字节,我们也可以在 Spark-env.sh 中进行配置。

    假设我们通过以上策略还是发现对象占用了过大的内存,可以用一个非常简单的方式来降低内存使用,就是将对象以序列化的形式(serialized form)存储,在 RDD 的持久化接口中使用序列化的存储级别,如 MEMORY_ONLY_SER,Spark 便会将每个 RDD 分区存储为一个很大的字节数组。而这种方式会使得访问数据的速度有所下降,因为每个对象访问时都需要有一个反序列化的过程。在7.1节中我们已经介绍过,优先使用 Kryo 序列化方式,其占用大小远低于 Java 本身的序列化方式。

    5.3垃圾回收(GC)优化

    如果我们在应用中进行了频繁的 RDD 变动,那么 JVM 的垃圾回收会成为一个问题(也就是说,假设在程序中只创建了一个 RDD,后续所有操作都围绕这个 RDD,那么垃圾回收就不存在问题)。当 Java 需要通过删除旧对象来为新对象开辟空间时,它便会扫描我们曾创建的所有对象并找到不再使用的对象。

    所以垃圾回收的开销是和 Java 对象的个数成比例的,我们要尽可能地使用包含较少对象的数据结构(如使用 Int 数组代替 LinkedList)来降低这部分开销。另外前面提到的用序列化形式存储也是一个很好的方法,序列化后每个对象在每个 RDD 分区下仅有一个对象(一个字节数组)。注意当 GC 开销成为瓶颈时,首先要尝试的便是序列化缓存(serialized caching)。

    在做 GC 优化时,我们首先需要了解 GC 发生的频率以及其所消耗的时间。这可以通过在 Java 选项中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来实现;之后当 Spark 任务运行后,便可以在 Worker 日志中看到 GC 发生时打印的信息。注意这些日志是打印在集群中的 Worker 节点上的(在工作目录的 stdout 文件中),而非 Driver 程序。

    为了进一步优化 GC,首先简单介绍下 Java 虚拟机内部是如何进行内存管理的。

    ① Java 对象是存储在堆空间内的,堆空间被分为两部分,即年轻区域(Young region)和老年区域(Old region),其中年轻代(Young generation)会用来存储短生命周期的对象,而老年代(Old generation)会用来存储较长生命周期的对象。

    ② 年轻代的区域又被分为3个部分 [Eden, Survivor1, Survivor2]。

    ③ 一个简单的 GC 流程大致是:当 Eden 区域满了,一次小型 GC 过程会将 Eden 和 Survivor1 中还存活的对象复制到 Survivor2 区域上,Survivor 区域是可交换的(即来回复制),当一个对象存活周期已足够长或者 Survivor2 区域已经满时,那么它们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的 GC 过程。

    Java 的这种 GC 机制主要是基于程序中创建的大多数对象,都会在创建后被很快销毁,只有极少数对象会存活下来,所以其分为年轻代和老年代两部分,而这两部分 GC 的方式也是不同的,其时间复杂度也是不同的,年轻代会更加快一些,感兴趣的读者可以进一步查阅相关资料。

    基于以上原因,Spark 在 GC 方面优化的主要目标是:只有长生命周期的 RDD 会被存储在老年代上,而年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整 GC 流程。我们可以通过以下步骤来一步步优化:

    ① 通过 GC 统计信息观察是否存在过于频繁的 GC 操作,如果在任务完成前,完整的 GC 操作被调用了多次,那么说明可执行任务并没有获得足够的内存空间。

    ② 如果触发了过多的小型 GC,而完整的 GC 操作并没有调用很多次,那么给 Eden 区域多分配一些内存空间会有所帮助。我们可以根据每个任务所需内存大小来预估 Eden 的大小,如果 Eden 设置大小为 E,可以利用配置项-Xmn=4/3*E 来对年轻代的区域大小进行设置(其中4/3的比例是考虑到 survivor 区域所需空间)。

    ③ 如果我们观察 GC 打印的统计信息,发现老年代接近存满,那么就需要改变 spark.memory.fraction 来减少存储类内存(用于 caching)的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,即通过 -Xmn 来进行配置,也可以通过 JVM 的 NewRatio 参数进行调整,大多数 JVM 的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于 Spark.Memory.Fraction。

    ④ 通过加入 -XX:+UserG1GC 来使用 G1GC 垃圾回收器,这可以一定程度提高 GC 的性能。另外注意对于 executor 堆内存非常大的情况,一定通过 -XX:G1HeapRegionSize 来增加 G1 区域的大小。

    针对以上步骤我们举一个例子,如果我们的任务是从 HDFS 当中读取数据,任务需要的内存空间可以通过从 HDFS 当中读取的数据块大小来进行预估,一般解压后的数据块大小会是原数据块的2~3倍,所以如果我们希望3、4个任务同时运行在工作空间中,假设每个 HDFS 块大小是 128MB,那么需要将 Eden 大小设置为 4×3×128MB。改动之后,我们可以监控 GC 的频率和时间消耗,看看有没有达到优化的效果。

    对于优化 GC,主要还是从降低全局 GC 的频率出发,executor 中对于 GC 优化的配置可以通过 spark.executor.extraJavaOptions 来配置。

    5.5Spark Streaming 内存优化

    前面介绍了 Spark 中的优化策略和关于 GC 方面的调优,对于 Spark Streaming 的应用程序,这些策略也都是适用的,除此之外还会有一些其他方面的优化点。

    对于 Spark Streaming 应用所需要的集群内存,很大程度上取决于要使用哪种类型的 transformation 操作。比如,假设我们想使用10分钟数据的窗口操作,那么我们的集群必须有足够的空间能够保存10分钟的全部数据;亦或,我们在大量的键值上使用了 updateStateByKey 操作,那么所需要的内存空间会较大。而如果我们仅仅使用简单的 Map、Filter、Store 操作,那么所需空间会较小。

    默认情况下,接收器接收来的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 的格式存储,那么如果内存不足时,数据就会序列化到硬盘上,这样会损失 Spark Streaming 应用的性能。所以通常建议为 Spark Streaming 应用分配充足的内存,可以在小规模数据集上进行测试和判断。

    另一方面与 Spark 程序有显著区别的是,Spark Streaming 程序对实时性要求会较高,所以我们需要尽可能降低 JVM 垃圾回收所导致的延迟。

    基于此,我们可以通过以下几个参数对内存使用和 GC 开销进行优化调整。

    DStream 的持久化级别:在前文中讲过,输入数据默认是持久化为字节流的,因为相较于反序列化的开销,其更会降低内存的使用并且减少 GC的开销。所以优先使用 Kryo 序列化方式,可以大大降低序列化后的尺寸和内存开销。另外,如果需要更进一步减少内存开销,可以通过配置spark.rdd.compress 进行更进一步的压缩(当然对于目前的集群机器,大多数内存都足够了)。

    • 及时清理老数据:默认情况下所有的输入数据和由 DStream 的 Transormation 操作产生的持久 RDD 会被自动清理,即Spark Streaming 会决定何时对数据进行清理。例如,假设我们使用10分钟的窗口操作,Spark Streaming 会保存之前10分钟的所有数据,并及时清理过时的老数据。数据保存的时间可以通过 stremingContext. remember进行设置。

    CMS 垃圾回收器:不同于之前我们在 Spark 中的建议,由于需要减少 GC 间的停顿,所以这里建议使用并发标记清除类的 GC方式。即使并发 GC 会降低全局系统的生产吞吐量,但是使用这种 GC 可以使得每个 Batch 的处理时间更加一致(不会因为某个 Batch 处理时发生了 GC,而导致处理时间剧增)。我们需要确保在 Driver 节点(在 spark-submit 中使用—driver-java-options)和 Executor 节点(在 Spark 配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都设置了 CMS GC方式。

    • 其他减少 GC 开销的方式有:可以通过 OFF_HEAP 存储级别的 RDD 持久化方式,以及可以在 Executor上使用更小的堆内存,从而降低每个 JVM 堆垃圾回收的压力。

    6、实例项目调优

    6.1合理的批处理时间(batchDuration)

    关于 Spark Streaming 的批处理时间设置是非常重要的,Spark Streaming 在不断接收数据的同时,需要处理数据的时间,所以如果设置过段的批处理时间,会造成数据堆积,即未完成的 batch 数据越来越多,从而发生阻塞。

    另外值得注意的是,batchDuration 本身也不能设置为小于 500ms,这会导致 Spark Streaming 进行频繁地提交作业,造成额外的开销,减少整个系统的吞吐量;相反如果将 batchDuration 时间设置得过长,又会影响整个系统的吞吐量。

    如何设置一个合理的批处理时间,需要根据应用本身、集群资源情况,以及关注和监控 Spark Streaming 系统的运行情况来调整,重点关注监控界面中的 Total Delay,如图1所示。
    Spark UI 中全局延迟

    6.2合理的 Kafka 拉取量(maxRatePerPartition 参数设置)

    对于数据源是 Kafka 的 Spark Streaming 应用,在 Kafka 数据频率过高的情况下,调整这个参数是非常必要的。我们可以改变 spark.streaming.kafka.maxRatePerPartition 参数的值来进行上限调整,默认是无上限的,即 Kafka 有多少数据,Spark Streaming 就会一次性全拉出,但是上节提到的批处理时间是一定的,不可能动态变化,如果持续数据频率过高,同样会造成数据堆积、阻塞的现象。

    所以需要结合 batchDuration 设置的值,调整 spark.streaming.kafka.maxRatePerPatition 参数,注意该参数配置的是 Kafka 每个 partition 拉取的上限,数据总量还需乘以所有的 partition 数量,调整两个参数 maxRatePerPartition 和 batchDuration 使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量,可以观察监控界面中的 Input Rate 和 Processing Time,如图2所示。
    Spark UI 中输入速率和平均处理时间

    6.3缓存反复使用的 Dstream(RDD)

    Spark 中的 RDD 和 SparkStreaming 中的 Dstream 如果被反复使用,最好利用 cache() 函数将该数据流缓存起来,防止过度地调度资源造成的网络开销。可以参考并观察 Scheduling Delay 参数,如图3所示。
    图3 SparkUI 中调度延迟

    6.4其他一些优化策略

    除了以上针对 Spark Streaming 和 Kafka 这个特殊场景方面的优化外,对于前面提到的一些常规优化,也可以通过下面几点来完成。

    设置合理的 GC 方式:使用–conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"来配置垃圾回收机制。

    • 设置合理的 parallelism:在 SparkStreaming+kafka 的使用中,我们采用了 Direct 连接方式,前面讲过Spark 中的 partition 和 Kafka 中的 Partition 是一一对应的,一般默认设置为 Kafka 中Partition 的数量。

    设置合理的 CPU 资源数:CPU 的 core 数量,每个 Executor 可以占用一个或多个 core,观察 CPU使用率(Linux 命令 top)来了解计算资源的使用情况。例如,很常见的一种浪费是一个 Executor 占用了多个 core,但是总的 CPU 使用率却不高(因为一个 Executor 并不会一直充分利用多核的能力),这个时候可以考虑让单个 Executor 占用更少的 core,同时 Worker 下面增加更多的 Executor;或者从另一个角度,增加单个节点的 worker 数量,当然这需要修改 Spark 集群的配置,从而增加 CPU 利用率。值得注意是,这里的优化有一个平衡,Executor 的数量需要考虑其他计算资源的配置,Executor 的数量和每个 Executor 分到的内存大小成反比,如果每个 Executor 的内存过小,容易产生内存溢出(out of memory)的问题。

    • 高性能的算子:所谓高性能算子也要看具体的场景,通常建议使用 reduceByKey/aggregateByKey 来代替 groupByKey。而存在数据库连接、资源加载创建等需求时,我们可以使用带 partition 的操作,这样在每一个分区进行一次操作即可,因为分区是物理同机器的,并不存在这些资源序列化的问题,从而大大减少了这部分操作的开销。例如,可以用 mapPartitions、foreachPartitions 操作来代替 map、foreach 操作。另外在进行 coalesce 操作时,因为会进行重组分区操作,所以最好进行必要的数据过滤 filter 操作。

    Kryo 优化序列化性能:我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为 RDD泛型类型的自定义类型等)。

    结果:

    通过以上种种调整和优化,最终我们想要达到的目的便是,整个流式处理系统保持稳定,即 Spark Streaming 消费 Kafka 数据的速率赶上爬虫向 Kafka 生产数据的速率,使得 Kafka 中的数据尽可能快地被处理掉,减少积压,才能保证实时性,如图4所示。
    Spark Streaming 和 Kafka 稳定运行监控图
    当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到在 Processing Time 柱形图中有一条 Stable 的虚线,而大多数 Batch 都能够在这一虚线下处理完毕,说明整体 Spark Streaming 是运行稳定的。

    对于项目中具体的性能调优,有以下几个点需要注意:

    一个 DStream 流只关联单一接收器,如果需要并行多个接收器来读取数据,那么需要创建多个 DStream 流。一个接收器至少需要运行在一个 Executor 上,甚至更多,我们需要保证在接收器槽占用了部分核后,还能有足够的核来处理接收到的数据。例如在设置 spark.cores.max 时需要将接收器的占用考虑进来,同时注意在分配 Executor 给接收器时,采用的是轮循的方式(round robin fashion)。

    • 当接收器从数据源接收到数据时,会创建数据块,在每个微秒级的数据块间隔(blockInterval milliseconds)中都会有一个新的数据块生成。在每个批处理间隔内(batchInterval)数据块的数量 N=batchInterval/blockInterval。这些数据块会由当前执行器Executor)的数据块管理器(BlockManager)分发到其他执行器的数据块管理器。之后在Driver 节点上运行的输入网络追踪器(Network Input Tracker)会通知数据块所在位置,以期进一步处理。

    RDD 是基于 Driver 节点上每个批处理间隔产生的数据块(blocks)而创建的,这些数据块是 RDD 的分支(partitions),每个分支是 Spark 中的一个任务(task)。如果 blockInterval == batchInterval,那么意味着创建了单一分支,并且可能直接在本地处理。

    • 数据块上的映射(map)任务在执行器(一个接收块,另一个复制块)中处理,该执行器不考虑块间隔,除非出现非本地调度。拥有更大的块间隔(blockInterval)意味着更大的数据块,如果将 spark.locality.wait 设置一个更大的值,那么更有可能在本地节点处理数据块。我们需要在两个参数间(blockInterval和spark.locality.wait)做一个折中,确保越大的数据块更可能在本地被处理。

    除了依赖于 batchInterval 和 blockInterval,我们可以直接通过 inputDstream. repartition(n) 来确定分支的数量。这个操作会重新打乱(reshuffles)RDD 中的数据,随机的分配给 n 个分支。当然打乱(shuffle)过程会造成一定的开销,但是会有更高的并行度。RDD 的处理是由驱动程序的 jobscheduler 作为作业安排的。在给定的时间点上,只有一个作业是活动的。因此,如果一个作业正在执行,那么其他作业将排队。

    • 如果我们有两个 Dstreams,那么将形成两个RDDs,并将创建两个作业,每个作业(job)都被安排为一个接着一个地执行。为了避免这种情况,可以联合两个Dstreams(union)。这将确保为 Dstreams 的两个 RDD 形成单一的 unionRDD。而这个 unionRDD
      会被视为一个作业,但是 RDDs 的分区不会受到影响。

    如果批处理时间大于 batchinterval,那么很明显,接收方的内存将逐渐被填满,并最终抛出异常(很可能是 BlockNotFoundException)。目前没有办法暂停接收,那么可以利用 SparkConf 配置项中的 spark.streaming.receiver.maxRate 来控制接收器的速率。

    小结

    ① Spark Streaming 中需要大量的序列化和反序列化操作,在2.0.0以上的 Spark 版本中,我们应当优先考虑使用 Kryo 序列化方式。

    ② 对于非常大的变量,如配置信息,可以提前利用广播变量的方式传送给每一个节点。

    ③ 在流式处理系统中,我们需要兼顾数据的接收和数据处理,即消费数据的速率要赶上生产数据的速率。当发现生产数据速率过慢时,可以考虑增加并行度,使用更多的接收器(Receiver);如果处理速度过慢,可以考虑加机器、优化程序逻辑及 GC 优化等方式。

    ④ Spark 内存分为执行类内存和存储类内存,执行类内存可以剥夺存储类内存空间,但是存储类内存空间有一个最低阈值会保证保留。

    ⑤ 内存优化最简单的方式是使用序列化格式进行对象存储,另外一方面考虑到 Java/Scala 对象本身会有所开销,应尽可能减少对象的数量。

    ⑥ 对于 Spark 而言,垃圾回收采用 G1GC,而 Spark Streaming 采用 CMS。

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 18,107
精华内容 7,242
关键字:

spark调优