spark-sql 性能调优

2018-07-14 17:12:54 pengzonglu7292 阅读数 549

最近在学习spark时,觉得Spark SQL性能调优比较重要,所以自己写下来便于更过的博友查看,欢迎大家指导。

在spark中,Spark SQL性能调优只要是通过下面的一些选项进行优化的:

1 spark.sql.codegen 默认值为false,当它设置为true时,Spark SQL会把每条查询的语句在运行时编译为java的二进制代码。这有什么作用呢?它可以提高大型查询的性能,但是如果进行小规模的查询的时候反而会变慢,就是说直接用查询反而比将它编译成为java的二进制代码快。所以在优化这个选项的时候要视情况而定。

2 spark.sql.inMemoryColumnStorage.compressed 默认值为false 它的作用是自动对内存中的列式存储进行压缩

3 spark.sql.inMemoryColumnStorage.batchSize 默认值为1000 这个参数代表的是列式缓存时的每个批处理的大小。如果将这个值调大可能会导致内存不够的异常,所以在设置这个的参数的时候得注意你的内存大小

4 spark.sql.parquet.compressed.codec 默认值为snappy 这个参数代表使用哪种压缩编码器。可选的选项包括uncompressed/snappy/gzip/lzo

uncompressed这个顾名思义就是不用压缩的意思

                          下面是总结的几种压缩选项的特点

格式可分割平均压缩速度文本文件压缩效率Hadoop压缩编解码器纯java实现原生备注
snappy非常快org.apache.hadoop.io.
compress.SnappyCodec
 
Snappy有纯java
的移植版,
但是在Spark/
Hadoop中
不能用
gziporg.apache.hadoop.io.
compress.GzipCodec


lzo非常快中等org.apache.hadoop.io.
compress.LzoCodec

需要在每个节点上
安装LZO

当然在设置上面这些参数的时候需要给予特别的考量。第一spark.sql.codegen,这个选项可以让Spark SQL把每条查询语句在运行前编译为java二进制代码,由于生成了专门运行指定查询的代码,codegen可以让大型查询或者频繁重复的查询明显变快,然而在运行特别快(1-2秒)的即时查询语句时,codegen就可能增加额外的开销(将查询语句编译为java二进制文件)。

codegen还是一个实验性的功能,但是在大型的或者重复运行的查询中使用codegen。

调优时可能还需要考虑第二个选项是spark.sql.inMemoryColumnarStorage.batchSize,在缓存SchemaRDD(Row RDD)时,Spark SQL会安照这个选项设定的大小(默认为1000)把记录分组,然后分批次压缩。

太小的批处理会导致压缩比过低,而太大的话,比如当每个批处理的数据超过内存所能容纳的大小时,也有可能引发问题。

如果你表中的记录比价大(包含数百个字段或者包含像网页这样非常大的字符串字段),就可能需要调低批处理的大小来避免内存不够(OOM)的错误。如果不是在这样的场景下,默认的批处理 的大小是比较合适的,因为压缩超过1000条压缩记录时也基本无法获得更高的压缩比了。


原文链接:https://blog.csdn.net/yqlakers/article/details/68925328

2017-08-01 18:01:25 xgjianstart 阅读数 6282

1、Spark调优背景

目前Zeppelin已经上线一段时间,Spark作为底层SQL执行引擎,需要进行整体性能调优,来提高SQL查询效率。本文主要给出调优的结论,因为涉及参数很多,故没有很细粒度调优,但整体调优方向是可以得出的。

环境:服务器600+,spark 2.0.2,Hadoop 2.6.0

2、调优结果

调优随机选取线上9条SQL,表横轴是调优测试项目,测试在集群空闲情况下进行,后一个的测试都是叠加前面测试参数。从数据可参数经过调优,理想环境下性能可提高50%到300%

3、 下面为调优分享PPT

1)一图概览

这里写图片描述

2) Spark集群优化——数据本地性

这里写图片描述

3)Spark集群优化——存储格式选择

这里写图片描述

4)Spark参数优化——计算资源

这里写图片描述

5) Spark参数优化——并行度

这里写图片描述

6)Spark参数优化——offheap内存

这里写图片描述

7)Spark参数优化——大小表join

这里写图片描述

8)Spark参数优化——其他

这里写图片描述

9) Spark参数优化——shuffle过程

这里写图片描述

10)Spark代码优化——RDD复用

这里写图片描述

11)Spark代码优化——选择合适算子

这里写图片描述

12) Spark代码优化——shuffle算子并行度调优

这里写图片描述

13)Spark代码优化——数据倾斜

这里写图片描述

14)Spark代码优化——优化数据结构

这里写图片描述

15)Spark代码优化——使用DateSet API

这里写图片描述

16)Spark代码优化——使用DateSet API

这里写图片描述

17) 目前Spark的瓶颈——内存

这里写图片描述

18) 目前Spark的瓶颈——内存

这里写图片描述

3、总结

调优参数虽名目多样,但最终目的是提高CPU利用率,降低带宽IO,提高缓存命中率,减少数据落盘。 
不同数据量的最优参数都不相同,调优目的是让参数适应数据的量级以最大程度利用资源,经调优发现并不是所有参数有效,有的参数的效果也不明显,最后折中推荐如下调优参数以适应绝大多数SQL情况,个别SQL需要用户单独调参优化。(以下参数主要用于Spark Thriftserver,仅供参考)

参数 含义 默认值 调优值
spark.sql.shuffle.partitions 并发度 200 800
spark.executor.overhead.memory executor堆外内存 512m 1.5g
spark.executor.memory executor堆内存 1g 9g
spark.executor.cores executor拥有的core数 1 3
spark.locality.wait.process 进程内等待时间 3 3
spark.locality.wait.node 节点内等待时间 3 8
spark.locality.wait.rack 机架内等待时间 3 5
spark.rpc.askTimeout rpc超时时间 10 1000
spark.sql.autoBroadcastJoinThreshold 小表需要broadcast的大小阈值 10485760 33554432
spark.sql.hive.convertCTAS 创建表是否使用默认格式 false true
spark.sql.sources.default 默认数据源格式 parquet orc
spark.sql.files.openCostInBytes 小文件合并阈值 4194304 6291456
spark.sql.orc.filterPushdown orc格式表是否谓词下推 false true
spark.shuffle.sort.bypassMergeThreshold shuffle read task阈值,小于该值则shuffle write过程不进行排序 200 600
spark.shuffle.io.retryWait 每次重试拉取数据的等待间隔 5 30
spark.shuffle.io.maxRetries 拉取数据重试次数 3 10

如果觉得文章有什么值得讨论的欢迎来讨论,如果觉得文章不错,也希望点个赞作为对我的支持。

2019-05-18 20:24:59 maizi1045 阅读数 1414

本文主要是日常工作的积累,主要是简单罗列了常见的spark SQL的参数及其含义。

#Job ID /Name
spark.app.name=xxx

#yarn 进行调度,也可以是mesos,yarn,以及standalone

#一个spark application,是一个spark应用。一个应用对应且仅对应一个sparkContext。每一个应用,运行一组独立的executor processes。一个应用,可以以多线程的方式提交多个作业job。spark可以运行在多种集群管理器上如:mesos,yarn,以及standalone,每种集群管理器都会提供跨应用的资源调度策略。
spark.master=yarn

#激活外部shuffle服务。服务维护executor写的文件,因而executor可以被安全移除。
#需要设置spark.dynamicAllocation.enabled 为true,同事指定外部shuffle服务。
#对shuffle来说,executor现将自己的map输出写入到磁盘,然后,自己作为一个server,向其他executor提供这些map输出文件的数据。而动态资源调度将executor返还给集群后,这个shuffle数据服务就没有了。因此,如果要使用动态资源策略,解决这个问题的办法就是,将保持shuffle文件作为一个外部服务,始终运行在spark集群的每个节点上,独立于应用和executor
spark.shuffle.service.enabled=true

#在默认情况下,三种集群管理器均不使用动态资源调度模式。所以要使用动态资源调度需要提前配置。
spark.dynamicAllocation.enabled=true

# 如果所有的executor都移除了,重新请求时启动的初始executor数
spark.dynamicAllocation.initialExecutors=10

# 最少保留的executor数
spark.dynamicAllocation.minExecutors=5

# 最多使用的executor数,默认为你申请的最大executor数
spark.dynamicAllocation.maxExecutors=50

# 可以是cluster也可以是Client
spark.submit.deployMode=cluster

# 指定提交到Yarn的资源池
spark.yarn.queue=xxxx

#  在yarn-cluster模式下,申请Yarn App Master(包括Driver)所用的内存。
spark.driver.memory=4g
# excutor的核心数
spark.executor.cores=32
# 一个Executor对应一个JVM进程。Executor占用的内存分为两部分:ExecutorMemory和MemoryOverhead
spark.executor.memory=16g
spark.yarn.executor.memoryOverhead=1g

# shuffle分区数100,根据数据量进行调控,这儿配置了Join时shuffle的分区数和聚合数据时的分区数。
spark.sql.shuffle.partitions=12

# 如果用户没有指定并行度,下面这个参数将是RDD中的分区数,它是由join,reducebykey和parallelize 
# 这个参数只适用于未加工的RDD不适用于dataframe
# 没有join和聚合计算操作,这个参数将是无效设置
spark.default.parallelism

# 打包传入一个分区的最大字节,在读取文件的时候。
spark.sql.files.maxPartitionBytes=128MB

# 用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。
# 该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度)。
spark.sql.files.openCostInBytes=4MB

# Spark 事件总线是SparkListenerEvent事件的阻塞队列大小
spark.scheduler.listenerbus.eventqueue.size=100

# 是否启动推测机制
spark.speculation=false

# 开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。
# 2表示比其他task慢两倍时,启动推测机制
spark.speculation.multiplier=2

# 推测机制的检测周期
spark.speculation.interval=500ms

# 完成task的百分比时启动推测
spark.speculation.quantile=0.5

# 最多允许失败的Executor数量。
spark.task.maxFailures=10

# spark序列化  对于优化<网络性能>极为重要,将RDD以序列化格式来保存减少内存占用.
spark.serializer=org.apache.spark.serializer.KryoSerializer
 
# 因为spark是基于内存的机制,所以默认是开启RDD的压缩
spark.rdd.compress=true

# Spark的安全管理
#https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SecurityManager.scala
spark.ui.view.acls=*
spark.ui.view.acls.groups=*

# 表示配置GC线程数为3
spark.executor.extraJavaOptions="-XX:ParallelGCThreads=3"

# 最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。这里设置的是10MB
spark.sql.autoBroadcastJoinThreshold=104857600

# 广播等待超时,这里单位是秒
spark.sql.broadcastTimeout=33

# 心跳检测间隔
spark.yarn.scheduler.heartbeat.interval-ms=1000

#假如设置为true,SparkSql会根据统计信息自动的为每个列选择压缩方式进行压缩。
spark.sql.inMemoryColumnarStorage.compressed=true

#控制列缓存的批量大小。批次大有助于改善内存使用和压缩,但是缓存数据会有OOM的风险
spark.sql.inMemoryColumnarStorage.batchSize=100

参考:
http://spark.apache.org/docs/latest/configuration.html

2018-08-13 14:26:00 weixin_39861172 阅读数 1766

--sparksubmit
   --num-executors  
        该参数主要用于设置该应用总共需要多少executors来执行,Driver在向集群资源管理器申请资源时需要根据此参数决定分配的Executor个数,并尽量满足所需。在不带的情况下只会分配少量Executor。这个值得设置还是要看分配的队列的资源情况,太少了无法充分利用集群资源,太多了则难以分配需要的资源。

   --executor-memory
        设置每个executor的内存,对Spark作业运行的性能影响很大。一般4-8G就差不多了,当然还要看资源队列的情况。num-executor*executor-memory的大小绝不能超过队列的内存总大小。

   --executor-cores
        设置每个executor的cpu核数,其决定了每个executor并行执行task的能力。Executor的CPU core数量设置为2-4个即可。弹药注意,num-executor*executor-cores也不能超过分配队列中cpu核数的大小。具体的核数的设置需要根据分配队列中资源统筹考虑,取得Executor,核数,及任务数的平衡。对于多任务共享的队列,更要注意不能将资源占满    

  --driver-memory
        运行sparkContext的Driver所在所占用的内存,通常不必设置,设置的话1G就足够了,除非是需要使用collect之类算子经常需要将数据提取到driver中的情况。
  --total-executor-cores    
        是所有executor总共使用的cpu核数 standalone default all cores

--conf

 --conf spark.default.parallelism
        此参数用于设置每个stage经TaskScheduler进行调度时生成task的数量,此参数未设置时将会根据读到的RDD的分区生成task,即根据源数据在hdfs中的分区数确定,若此分区数较小,则处理时只有少量task在处理,前述分配的executor中的core大部分无任务可干。通常可将此值设置为num-executors*executor-cores的2-3倍为宜,如果与其相近的话,则对于先完成task的core则无任务可干。2-3倍数量关系的话即不至于太零散,又可是的任务执行更均衡。!!个人建议配置该参数      

    --conf spark.storage.memoryFraction
        参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
        参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。个人不太建议调该参数

    --conf spark.shuffle.memoryFraction
        参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
        参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。个人不太建议调该参数

    --conf spark.sql.codegen
        默认值为false,当它设置为true时,Spark SQL会把每条查询的语句在运行时编译为java的二进制代码。这有什么作用呢?它可以提高大型查询的性能,但是如果进行小规模的查询的时候反而会变慢,就是说直接用查询反而比将它编译成为java的二进制代码快。所以在优化这个选项的时候要视情况而定。
        这个选项可以让Spark SQL把每条查询语句在运行前编译为java二进制代码,由于生成了专门运行指定查询的代码,codegen可以让大型查询或者频繁重复的查询明显变快,然而在运行特别快(1-2秒)的即时查询语句时,codegen就可能增加额外的开销(将查询语句编译为java二进制文件)。codegen还是一个实验性的功能,但是在大型的或者重复运行的查询中使用codegen

    --conf spark.sql.inMemoryColumnStorage.compressed
        默认值为false 它的作用是自动对内存中的列式存储进行压缩

    --conf spark.sql.inMemoryColumnStorage.batchSize    
        默认值为1000 这个参数代表的是列式缓存时的每个批处理的大小。如果将这个值调大可能会导致内存不够的异常,所以在设置这个的参数的时候得注意你的内存大小
        在缓存SchemaRDD(Row RDD)时,Spark SQL会安照这个选项设定的大小(默认为1000)把记录分组,然后分批次压缩。
        太小的批处理会导致压缩比过低,而太大的话,比如当每个批处理的数据超过内存所能容纳的大小时,也有可能引发问题。
        如果你表中的记录比价大(包含数百个字段或者包含像网页这样非常大的字符串字段),就可能需要调低批处理的大小来避免内存不够(OOM)的错误。如果不是在这样的场景下,默认的批处理 的大小是比较合适的,因为压缩超过1000条压缩记录时也基本无法获得更高的压缩比了。

    --conf spark.sql.parquet.compressed.codec
        默认值为snappy 这个参数代表使用哪种压缩编码器。可选的选项包括uncompressed/snappy/gzip/lzo        uncompressed这个顾名思义就是不用压缩的意思

    --conf spark.speculation 
        推测执行优化机制采用了典型的以空间换时间的优化策略,它同时启动多个相同task(备份任务)处理相同的数据块,哪个完成的早,则采用哪个task的结果,这样可防止拖后腿Task任务出现,进而提高作业计算速度,但是,这样却会占用更多的资源,在集群资源紧缺的情况下,设计合理的推测执行机制可在多用少量资源情况下,减少大作业的计算时间。
        检查逻辑代码中注释很明白,当成功的Task数超过总Task数的75%(可通过参数spark.speculation.quantile设置)时,再统计所有成功的Tasks的运行时间,得到一个中位数,用这个中位数乘以1.5(可通过参数spark.speculation.multiplier控制)得到运行时间门限,如果在运行的Tasks的运行时间超过这个门限,则对它启用推测。简单来说就是对那些拖慢整体进度的Tasks启用推测,以加速整个Stage的运行
        spark.speculation.interval    100毫秒    Spark经常检查要推测的任务。
        spark.speculation.multiplier    1.5    任务的速度比投机的中位数慢多少倍。
        spark.speculation.quantile    0.75    在为特定阶段启用推测之前必须完成的任务的分数。
        
    --conf spark.shuffle.consolidateFiles
        默认值:false
        参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
        调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。    

    --conf spark.shuffle.file.buffer    
        默认值:32k
        参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
        调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k,一定是成倍的增加),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    --conf spark.reducer.maxSizeInFlight
        默认值:48m
        参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
        调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    --conf spark.shuffle.io.maxRetries
        默认值:3
        参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
        调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
        shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage

    --conf spark.shuffle.io.retryWait
        默认值:5s
        参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
        调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

    --conf spark.shuffle.memoryFraction
        默认值:0.2
        参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
        调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。    

    --conf spark.shuffle.manager
        默认值:sort|hash
        参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
        调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

    --conf spark.shuffle.sort.bypassMergeThreshold
        默认值:200
        参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
        调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

    --conf spark.shuffle.consolidateFiles
        默认值:false
        参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
        调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。                

2020-05-14 16:52:29 shufangreal 阅读数 52

SPARK-SQL性能调优

SparkSQL的官方优化可以参考(本文主要从Spark2.4.0来概述):

http://spark.apache.org/docs/2.4.0/sql-performance-tuning.html#caching-data-in-memory

For some workloads, it is possible to improve performance by either caching data in memory, or by turning on some experimental options.

SparkSQL的优化主要可以从3个方向去考虑:

  • 在内存中缓存数据(Caching Data In Memory
  • 相关的优化调整配置options(Other Configuration Options
  • 广播变量优化(Broadcast Hint for SQL Queries

那么我们分别从这三个方面去概述:

1、内存中缓存数据

SPARK-SQL可以通过spark.catalog.cacheTable("tableName") or dataFrame.cache()来将表以列式存储的形似缓存在内存中。缓存之后,SPARK-SQL只会去扫描相应查询的列,类似于Hive中的列裁剪与分区裁剪,而且还会自动将数据进行压缩去减少内存的消耗及GC性能压力,同时,假如一张表用完之后,可以将该缓存通过

spark.catalog.uncached("tableName")进行内存释放

相关的缓存压缩配置可以通过以下2个参数进行指定

#设置为true时,Spark SQL将根据数据统计信息自动为每一列选择一个压缩编解码器
spark.sql.inMemoryColumnarStorage.compressed=true
#控制用于列式缓存的批处理的大小。较大的批处理大小可以提高内存利用率和压缩率,但是在缓存数据时可能会出现OOM
spark.sql.inMemoryColumnarStorage.batchSize=10000

2、其他优化配置options

Note:以下的配置选项也能够被选择去优化执行性能,但是这些选项可能会因为在未来的版本中被自动优化而被遗弃使用(deprecated)

Options 默认Value 含义
spark.sql.files.maxPartitionBytes 134217728(128 MB) 读取文件时打包到单个分区中的最大字节数。
spark.sql.files.openCostInBytes 4194304(4 MB) 可以同时扫描以字节数衡量的打开文件的估计成本。将多个文件放入分区时使用。最好高估一下,然后,具有较小文件的分区将比具有较大文件的分区(首先安排)更快。
spark.sql.broadcastTimeout 300 广播加入中广播等待时间的秒数超时
spark.sql.autoBroadcastJoinThreshold 10485760(10 MB) 配置表的最大大小(以字节为单位),该表在执行联接时将广播到所有工作程序节点。通过将此值设置为-1,可以禁用广播。请注意,当前仅ANALYZE TABLE COMPUTE STATISTICS noscan运行命令的Hive Metastore表支持统计信息 。
spark.sql.shuffle.partitions 200 配置在对联接或聚集进行数据混排时要使用的分区数。

3、广播变量优化提示

当与其他的table或者view进行join时,BROADCAST暗示会指引SPARKSQL去广播指定的table,当SPARK决定连接方法时,即使统计数据高于配置,也会首选BROAD-HASH-JOIN连接,spark.sql.autoBroadcastJoinThreshold,当指定了JOIN的两端时,SPARK会广播较小的一方,注意:SPARK不能始终保证选择BROAD-HASH-JOIN,因为并非所有情况下(比如 FULL OUTER JOIN)都会支持BHJ,当选择广播嵌套循环连接 ,我们依然遵守Hint提示

import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()

spark-sql调优

阅读数 1821