2017-10-21 19:45:41 ZYC88888 阅读数 1432
  • Spark内核机制解析及性能调优教程(含资料)

    由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以序列化到磁盘,来降低内存占用。 本教程通过源码引导读者深入理解Spark的集群部署的内部机制、Spark内部调度的机制、Executor的内部机制、Shuffle的内部机制,进而讲述Tungsten的内部机制,让学员知其然知其所以然。教程的后部分,是任何Spark应用者都很好关注的Spark性能调优的内容。

    22 人正在学习 去看看 张长志

sparksql性能调优

性能优化参数 
这里写图片描述

在spark中,Spark SQL性能调优只要是通过下面的一些选项进行优化的:

1 spark.sql.codegen 默认值为false,当它设置为true时,Spark SQL会把每条查询的语句在运行时编译为java的二进制代码。这有什么作用呢?它可以提高大型查询的性能,但是如果进行小规模的查询的时候反而会变慢,就是说直接用查询反而比将它编译成为java的二进制代码快。所以在优化这个选项的时候要视情况而定。

2 spark.sql.inMemoryColumnStorage.compressed 默认值为false 它的作用是自动对内存中的列式存储进行压缩

3 spark.sql.inMemoryColumnStorage.batchSize 默认值为1000 这个参数代表的是列式缓存时的每个批处理的大小。如果将这个值调大可能会导致内存不够的异常,所以在设置这个的参数的时候得注意你的内存大小

4 spark.sql.parquet.compressed.codec 默认值为snappy 这个参数代表使用哪种压缩编码器。可选的选项包括uncompressed/snappy/gzip/lzo

uncompressed这个顾名思义就是不用压缩的意思

                          下面是总结的几种压缩选项的特点

格式 可分割 平均压缩速度 文本文件压缩效率 Hadoop压缩编解码器 纯java实现 原生 备注
snappy 非常快 org.apache.hadoop.io.
compress.SnappyCodec
 
Snappy有纯java
的移植版,
但是在Spark/
Hadoop中
不能用
gzip org.apache.hadoop.io.
compress.GzipCodec


lzo 非常快 中等 org.apache.hadoop.io.
compress.LzoCodec

需要在每个节点上
安装LZO

当然在设置上面这些参数的时候需要给予特别的考量。第一spark.sql.codegen,这个选项可以让Spark SQL把每条查询语句在运行前编译为java二进制代码,由于生成了专门运行指定查询的代码,codegen可以让大型查询或者频繁重复的查询明显变快,然而在运行特别快(1-2秒)的即时查询语句时,codegen就可能增加额外的开销(将查询语句编译为java二进制文件)。

codegen还是一个实验性的功能,但是在大型的或者重复运行的查询中使用codegen。

调优时可能还需要考虑第二个选项是spark.sql.inMemoryColumnarStorage.batchSize,在缓存SchemaRDD(Row RDD)时,Spark SQL会安照这个选项设定的大小(默认为1000)把记录分组,然后分批次压缩。

太小的批处理会导致压缩比过低,而太大的话,比如当每个批处理的数据超过内存所能容纳的大小时,也有可能引发问题。

如果你表中的记录比价大(包含数百个字段或者包含像网页这样非常大的字符串字段),就可能需要调低批处理的大小来避免内存不够(OOM)的错误。如果不是在这样的场景下,默认的批处理 的大小是比较合适的,因为压缩超过1000条压缩记录时也基本无法获得更高的压缩比了。


以下是在学习和使用spark过程中遇到的一些问题,记录下来。

1、首先来说说spark任务运行完后查错最常用的一个命令,那就是把任务运行日志down下来。 程序存在错误,将日志down下来查看具体原因!down日志命令:yarn logs -applicationId app_id

2、Spark性能优化的9大问题及其解决方案

Spark程序优化所需要关注的几个关键点——最主要的是数据序列化和内存优化

问题1:reduce task数目不合适

解决方法:需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism。通常,reduce数目设置为core数目的2到3倍。数量太大,造成很多小任务,增加启动任务的开销;数目太少,任务运行缓慢。

问题2:shuffle磁盘IO时间长

解决方法:设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能;

问题3:map|reduce数量大,造成shuffle小文件数目多

解决方法:默认情况下shuffle文件数目为map tasks * reduce tasks. 通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,此时文件数为reduce tasks数目;

问题4:序列化时间长、结果大

解决方法:Spark默认使.用JDK.自带的ObjectOutputStream,这种方式产生的结果大、CPU处理时间长,可以通过设置spark.serializer为org.apache.spark.serializer.KryoSerializer。另外如果结果已经很大,可以使用广播变量;

问题5:单条记录消耗大

解决方法:使用mapPartition替换map,mapPartition是对每个Partition进行计算,而map是对partition中的每条记录进行计算;

问题6:collect输出大量结果时速度慢

解决方式:collect源码中是把所有的结果以一个Array的方式放在内存中,可以直接输出到分布式?文件系统,然后查看文件系统中的内容;

问题7:任务执行速度倾斜

解决方式:如果是数据倾斜,一般是partition key取的不好,可以考虑其它的并行处理方式 ,并在中间加上aggregation操作;如果是Worker倾斜,例如在某些worker上的executor执行缓慢,可以通过设置spark.speculation=true 把那些持续慢的节点去掉;

问题8:通过多步骤的RDD操作后有很多空任务或者小任务产生

解决方式:使用coalesce或repartition去减少RDD中partition数量;

问题9:Spark Streaming吞吐量不高

解决方式:可以设置spark.streaming.concurrentJobs

3、intellij idea直接编译spark源码及问题解决:

http://blog.csdn.net/tanglizhe1105/article/details/50530104

http://stackoverflow.com/questions/18920334/output-path-is-shared-between-the-same-module-error

Spark编译:clean package -Dmaven.test.skip=true

参数:-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m

4、import Spark source code into intellj, build Error:

not found: type SparkFlumeProtocol and EventBatch

http://stackoverflow.com/questions/33311794/import-spark-source-code-into-intellj-build-error-not-found-type-sparkflumepr

spark_complie_config.png

5、org.apache.spark.SparkException: Exception thrown in awaitResult

set "spark.sql.broadcastTimeout" to increase the timeout

6、Apache Zeppelin编译安装:

Apache Zeppelin installation grunt build error:

解决方案:进入web模块npm install;

http://stackoverflow.com/questions/33352309/apache-zeppelin-installation-grunt-build-error?rq=1

7、Spark源码编译遇到的问题解决:http://www.tuicool.com/articles/NBVvai

内存不够,这个错误是因为编译的时候内存不够导致的,可以在编译的时候加大内存。

[ERROR] PermGen space -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors,re-run Maven with the -e switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions,

please read the following articles:

[ERROR] [Help 1]http://cwiki.apache.org/confluence/display/MAVEN/OutOfMemoryError

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

8、Exception in thread "main" java.lang.UnsatisfiedLinkError: no jnind4j in java.library.path

解决方案:I’m using a 64-Bit Java on Windows and still get the no jnind4j in java.library.path error It may be that you have incompatible DLLs on your PATH. In order to tell DL4J to ignore those you have to add the following as a VM parameter (Run -> Edit Configurations -> VM Options in IntelliJ): -Djava.library.path=""

9、spark2.0本地运行源码报错解决办法:

修改对应pom中的依赖jar包,将scope级别由provided改为compile

运行类之前,去掉make选项;在运行vm设置中增加-Dspark.master=local

Win7下运行spark example代码报错:

java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:D:/SourceCode/spark-2.0.0/spark-warehouse修改SQLConf类中WAREHOUSE_PATH变量,将file:前缀改为file:/或file:///

createWithDefault("file:/${system:user.dir}/spark-warehouse")

local模式运行:-Dspark.master=local

10、解决Task not serializable Exception错误

方法1:将RDD中的所有数据通过JDBC连接写入数据库,若使用map函数,可能要为每个元素都创建connection,这样开销很大,如果使用mapPartitions,那么只需要针对每个分区建立connection;mapPartitions处理后返回的是Iterator。

方法2:对未序列化的对象加@transisent引用,在进行网络通信时不对对象中的属性进行序列化

11、这个函数在func("11")调用时候正常,但是在执行func(11)或func(1.1)时候就会报error: type mismatch的错误. 这个问题很好解决

针对特定的参数类型, 重载多个func函数,这个不难, 传统JAVA中的思路, 但是需要定义多个函数

使用超类型, 比如使用AnyVal,Any;这样的话比较麻烦,需要在函数中针对特定的逻辑做类型转化,从而进一步处理上面两个方法使用的是传统JAVA思路,虽然都可以解决该问题,但是缺点是不够简洁;在充满了语法糖的Scala中,针对类型转换提供了特有的implicit隐式转化的功能;

12、org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

解决方案:这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,直到application失败。一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。

13、Spark ML PipeLine GBT/RF预测时报错,java.util.NoSuchElementException: key not found: 8.0

错误原因:由于GBT/RF模型输入setFeaturesCol,setLabelCol参数列名不一致导致。

解决方案:只保存训练算法模型,不保存PipeLineModel

14、linux删除乱码文件,step1. ls -la; step2. find . -inum inode num -exec rm {} -rf \;

15、Caused by: java.lang.RuntimeException: Failed to commit task Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit

如果你比较了解spark中的stage是如何划分的,这个问题就比较简单了。一个Stage中包含的task过大,一般由于你的transform过程太长,因此driver给executor分发的task就会变的很大。所以解决这个问题我们可以通过拆分stage解决。也就是在执行过程中调用cache.count缓存一些中间数据从而切断过长的stage。



代码实例

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;


public class PerformanceTuneDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        conf.set("spark.sql.codegen", "false");
        conf.set("spark.sql.inMemoryColumnarStorage.compressed", "false");
        conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "1000");
        conf.set("spark.sql.parquet.compression.codec", "snappy");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);

        List<Row> result = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

Beenline命令行设置参数

beeline> set spark.sql.codegen=true;
SET spark.sql.codegen=true
spark.sql.codegen=true
Time taken: 1.196 seconds
  • 1
  • 2
  • 3
  • 4

参数说明

spark.sql.codegen Spark SQL在每次执行次,先把SQL查询编译JAVA字节码。针对执行时间长的SQL查询或频繁执行的SQL查询,此配置能加快查询速度,因为它产生特殊的字节码去执行。但是针对很短(1 - 2秒)的临时查询,这可能增加开销,因为它必须先编译每一个查询。

spark.sql.inMemoryColumnarStorage.batchSize:

When caching SchemaRDDs, Spark SQL groups together the records in the RDD in batches of the size given by this option (default: 1000), and compresses each batch. Very small batch sizes lead to low compression, but on the other hand very large sizes can also be problematic, as each batch might be too large to build up in memory.

spark是一个快速的内存计算框架;同时是一个并行运算的框架。在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。 
木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。 
Amdahl定理,一个计算机科学界的经验法则,因吉恩·阿姆达尔而得名。它代表了处理器平行运算之后效率提升的能力。并行计算中的加速比是用并行前的执行速度和并行后的执行速度之比来表示的,它表示了在并行化之后的效率提升情况。阿姆达尔定律是固定负载(计算总量不变时)时的量化标准。可用公式:\frac{W_s + W_p}{W_s + \frac{W_p}{p}}来表示。式中W_s, W_p分别表示问题规模的串行分量(问题中不能并行化的那一部分)和并行分量,p表示处理器数量。当p\to \infty时,上式的极限是\frac{W}{W_s},其中,{W}={W_s}+{W_p}。这意味着无论我们如何增大处理器数目,加速比是无法高于这个数的。

  SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。
  • 1
  • 2

1:并行性

SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度。比如前面第六章和第七章查询数据时,shuffle的时候使用了缺省的spark.sql.shuffle.partitions,即200个partition,也就是200个Task: 
这里写图片描述 
而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running: 
这里写图片描述 
这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。

  • 对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;
  • 集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的–total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。

2,高效的数据格式

高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:
  • 1
  • 2

2.1 数据本地性 
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。 
下面是spark webUI监控Stage的一个图:

  • -PROCESS_LOCAL是指读取缓存在本地节点的数据
  • NODE_LOCAL是指读取本地节点硬盘数据
  • ANY是指读取非本地节点数据
  • 通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。 
    这里写图片描述

2.2 合适的数据类型 
对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。

2.3 合适的数据列 
对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。

2.4 更优的数据存储格式 
在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看sparkSQL的stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持 textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高sparkSQL的查询效率。

3:内存的使用

spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:

  • SPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;
  • executor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;
  • spark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
  • spark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例

    在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:

  • 对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
  • 对于join操作,优先缓存较小的表;
  • 要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
  • 要多注意Storage的监控,多思考如何才能Fraction cached的比例更多 
    ## 4:合适的Task ## 
    对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。
2017-03-08 20:47:42 u013939918 阅读数 1973
  • Spark内核机制解析及性能调优教程(含资料)

    由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以序列化到磁盘,来降低内存占用。 本教程通过源码引导读者深入理解Spark的集群部署的内部机制、Spark内部调度的机制、Executor的内部机制、Shuffle的内部机制,进而讲述Tungsten的内部机制,让学员知其然知其所以然。教程的后部分,是任何Spark应用者都很好关注的Spark性能调优的内容。

    22 人正在学习 去看看 张长志
性能调优:


并行度调节


性能调优首先是增加资源,增加Application对应的executor的数量,增加executor里面的cpu core,然后
增加executor里面的内存大小!


这节课也是非常重要的,因为分配完你所能分配的最大资源了!然后对应你的资源调节你程序的并行度!


Spark并行度指的是什么?
Spark作业,Application,Jobs,action(collect)触发一个job,1个job;每个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey;


stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)


stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()


reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每个stage1的task,
都创建一份文件(也可能是合并在少量的文件里面);每个stage1的task,会去各个节点上的各个
task创建的属于自己的那一份文件里面,拉取数据;每个stage1的task,拉取到的数据,
一定是相同key对应的数据。对相同的key,对应的values,才能去执行我们自定义的function操作(_ + _)




并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段
(stage)的并行度。


如果不调节并行度,导致并行度过低,会怎么样?


假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,
每个executor有10G内存,每个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。


task没有设置,或者设置的很少,比如就设置了,100个task。50个executor,每个executor有3个
cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,
可以并行运行。但是你现在,只有100个task,平均分配一下,每个executor分配到2个task,ok,
那么同时在运行的task,只有100个,每个executor只会并行运行2个task。每个executor剩下的一个
cpu core,就浪费掉了。


你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,
总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application的并行度,
至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;而且task增加到150个以后,
即可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,
如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,
而且每个task主要处理1G的数据就可以。


很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,
并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。


1、task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况,比如总共150个
cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)


2、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,比如150个cpu core,
基本要设置task数量为300~500;
实际情况,与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task,可能会慢一点,
要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,
因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,
就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,
另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,
提升性能。


3、如何设置一个Spark Application的并行度?
spark.default.parallelism 
SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")
2019-07-26 09:45:29 yuanbingze 阅读数 1209
  • Spark内核机制解析及性能调优教程(含资料)

    由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以序列化到磁盘,来降低内存占用。 本教程通过源码引导读者深入理解Spark的集群部署的内部机制、Spark内部调度的机制、Executor的内部机制、Shuffle的内部机制,进而讲述Tungsten的内部机制,让学员知其然知其所以然。教程的后部分,是任何Spark应用者都很好关注的Spark性能调优的内容。

    22 人正在学习 去看看 张长志

前言
Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。

本文讲解最近关于在参与hive往spark迁移过程中遇到的一些参数相关问题的调优。

内容分为两部分,第一部分讲遇到异常,从而需要通过设置参数来解决的调优;第二部分讲用于提升性能而进行的调优。

异常调优
spark.sql.hive.convertMetastoreParquet
parquet是一种列式存储格式,可以用于spark-sql 和hive 的存储格式。在spark中,如果使用using parquet的形式创建表,则创建的是spark 的DataSource表;而如果使用stored as parquet则创建的是hive表。

spark.sql.hive.convertMetastoreParquet默认设置是true, 它代表使用spark-sql内置的parquet的reader和writer(即进行反序列化和序列化),它具有更好地性能,如果设置为false,则代表使用 Hive的序列化方式。

但是有时候当其设置为true时,会出现使用hive查询表有数据,而使用spark查询为空的情况.

但是,有些情况下在将spark.sql.hive.convertMetastoreParquet设为false,可能发生以下异常(spark-2.3.2)。

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)

这是因为在其为false时候,是使用hive-metastore使用的元数据进行读取数据,而如果此表是使用spark sql DataSource创建的parquet表,其数据类型可能出现不一致的情况,例如通过metaStore读取到的是IntWritable类型,其创建了一个WritableIntObjectInspector用来解析数据,而实际上value是LongWritable类型,因此出现了类型转换异常。

与该参数相关的一个参数是spark.sql.hive.convertMetastoreParquet.mergeSchema, 如果也是true,那么将会尝试合并各个parquet 文件的schema,以使得产生一个兼容所有parquet文件的schema。

spark.sql.files.ignoreMissingFiles && spark.sql.files.ignoreCorruptFiles
这两个参数是只有在进行spark DataSource 表查询的时候才有效,如果是对hive表进行操作是无效的。

在进行spark DataSource 表查询时候,可能会遇到非分区表中的文件缺失/corrupt 或者分区表分区路径下的文件缺失/corrupt 异常,这时候加这两个参数会忽略这两个异常,这两个参数默认都是false,建议在线上可以都设为true.

其源码逻辑如下,简单描述就是如果遇到FileNotFoundException, 如果设置了ignoreMissingFiles=true则忽略异常,否则抛出异常;如果不是FileNotFoundException 而是IOException(FileNotFoundException的父类)或者RuntimeException,则认为文件损坏,如果设置了ignoreCorruptFiles=true则忽略异常。

catch {
case e: FileNotFoundException if ignoreMissingFiles =>
  logWarning(s"Skipped missing file: $currentFile", e)
  finished = true
  null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
  logWarning(
  s"Skipped the rest of the content in the corrupted file: $currentFile", e)
  finished = true
  null
  }

spark.sql.hive.verifyPartitionPath
上面的两个参数在分区表情况下是针对分区路径存在的情况下,分区路径下面的文件不存在或者损坏的处理。而有另一种情况就是这个分区路径都不存在了。这时候异常信息如下:

java.io.FileNotFoundException: File does not exist: hdfs://hz-cluster10/user/da_haitao/da_hivesrc/haitao_dev_log/integ_browse_app_dt/day=2019-06-25/os=Android/000067_0

而spark.sql.hive.verifyPartitionPath参数默认是false,当设置为true的时候会在获得分区路径时对分区路径是否存在做一个校验,过滤掉不存在的分区路径,这样就会避免上面的错误。

spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles
这两个参数和上面的spark.sql.files.ignoreCorruptFiles很像,但是区别是很大的。在spark进行DataSource表查询时候spark.sq.files.*才会生效,而spark如果查询的是一张hive表,其会走HadoopRDD这条执行路线。

所以就会出现,即使你设置了spark.sql.files.ignoreMissingFiles的情况下,仍然报FileNotFoundException的情况,异常栈如下, 可以看到这里面走到了HadoopRDD,而且后面是org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrappe可见是查询一张hive表。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 107052 in stage 914.0 failed 4 times, most recent failure: Lost task 107052.3 in stage 914.0 (TID 387381, hadoop2698.jd.163.org, executor 266): java.io.FileNotFoundException: File does not exist: hdfs://hz-cluster10/user/da_haitao/da_hivesrc/haitao_dev_log/integ_browse_app_dt/day=2019-06-25/os=Android/000067_0
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:371)
        at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:252)
        at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:99)
        at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

此时可以将spark.files.ignoreCorruptFiles && spark.files.ignoreMissingFiles设为true,其代码逻辑和上面的spark.sql.file.*逻辑没明显区别,此处不再赘述。

性能调优

除了遇到异常需要被动调整参数之外,我们还可以主动调整参数从而对性能进行调优。

spark.hadoopRDD.ignoreEmptySplits
默认是false,如果是true,则会忽略那些空的splits,减小task的数量。

spark.hadoop.mapreduce.input.fileinputformat.split.minsize
是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task.

spark.sql.autoBroadcastJoinThreshold && spark.sql.broadcastTimeout
用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力,而broadcastTimeout是用于控制Broadcast的Future的超时时间,默认是300s,可根据需求进行调整。

spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
该参数是用于开启spark的自适应执行,这是spark比较老版本的自适应执行,后面的targetPostShuffleInputSize是用于控制之后的shuffle 阶段的平均输入数据大小,防止产生过多的task。

intel大数据团队开发的adaptive-execution相较于目前spark的ae更加实用,该特性也已经加入到社区3.0之后的roadMap中,令人期待。

spark.sql.parquet.mergeSchema
默认false。当设为true,parquet会聚合所有parquet文件的schema,否则是直接读取parquet summary文件,或者在没有parquet summary文件时候随机选择一个文件的schema作为最终的schema。

spark.sql.files.opencostInBytes
该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
1或者2,默认是1. MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。

Spark SQL 参数表(spark-2.3.2)

key value meaning
spark.sql.adaptive.enabled TRUE When true, enable adaptive query execution.
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 67108864b The target post-shuffle input size in bytes of a task.
spark.sql.autoBroadcastJoinThreshold 209715200 Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscanhas been run, and file-based data source tables where the statistics are computed directly on the files of data.
spark.sql.broadcastTimeout 300000ms Timeout in seconds for the broadcast wait time in broadcast joins.
spark.sql.cbo.enabled FALSE Enables CBO for estimation of plan statistics when set true.
spark.sql.cbo.joinReorder.dp.star.filter FALSE Applies star-join filter heuristics to cost based join enumeration.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm.
spark.sql.cbo.joinReorder.enabled FALSE Enables join reorder in CBO.
spark.sql.cbo.starSchemaDetection FALSE When true, it enables join reordering based on star schema detection.
spark.sql.columnNameOfCorruptRecord _corrupt_record The name of internal column for storing raw/un-parsed JSON and CSV records that fail to parse.
spark.sql.crossJoin.enabled TRUE When false, we will throw an error if a query contains a cartesian product without explicit CROSS JOIN syntax.
spark.sql.execution.arrow.enabled FALSE When true, make use of Apache Arrow for columnar data transfers. Currently available for use with pyspark.sql.DataFrame.toPandas, and pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. The following data types are unsupported: BinaryType, MapType, ArrayType of TimestampType, and nested StructType.
spark.sql.execution.arrow.maxRecordsPerBatch 10000 When using Apache Arrow, limit the maximum number of records that can be written to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.
spark.sql.extensions Name of the class used to configure Spark Session extensions. The class should implement Function1[SparkSessionExtension, Unit], and must have a no-args constructor.
spark.sql.files.ignoreCorruptFiles FALSE Whether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned.
spark.sql.files.ignoreMissingFiles FALSE Whether to ignore missing files. If true, the Spark jobs will continue to run when encountering missing files and the contents that have been read will still be returned.
spark.sql.files.maxPartitionBytes 134217728 The maximum number of bytes to pack into a single partition when reading files.
spark.sql.files.maxRecordsPerFile 0 Maximum number of records to write out to a single file. If this value is zero or negative, there is no limit.
spark.sql.function.concatBinaryAsString FALSE When this option is set to false and all inputs are binary,functions.concat returns an output as binary. Otherwise, it returns as a string.
spark.sql.function.eltOutputAsString FALSE When this option is set to false and all inputs are binary, elt returns an output as binary. Otherwise, it returns as a string.
spark.sql.groupByAliases TRUE When true, aliases in a select list can be used in group by clauses. When false, an analysis exception is thrown in the case.
spark.sql.groupByOrdinal TRUE When true, the ordinal numbers in group by clauses are treated as the position in the select list. When false, the ordinal numbers are ignored.
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE Sets the action to take when a case-sensitive schema cannot be read from a Hive table’s properties. Although Spark SQL itself is not case-sensitive, Hive compatible file formats such as Parquet are. Spark SQL must use a case-preserving schema when querying any table backed by files containing case-sensitive field names or queries may not return accurate results. Valid options include INFER_AND_SAVE (the default mode– infer the case-sensitive schema from the underlying data files and write it back to the table properties), INFER_ONLY (infer the schema but don’t attempt to write it to the table properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).
spark.sql.hive.convertMetastoreParquet TRUE When set to true, the built-in Parquet reader and writer are used to process parquet tables created by using the HiveQL syntax, instead of Hive serde.
spark.sql.hive.convertMetastoreParquet.mergeSchema FALSE When true, also tries to merge possibly different but compatible Parquet schemas in different Parquet data files. This configuration is only effective when “spark.sql.hive.convertMetastoreParquet” is true.
spark.sql.hive.filesourcePartitionFileCacheSize 262144000 When nonzero, enable caching of partition file metadata in memory. All tables share a cache that can use up to specified num bytes for file metadata. This conf only has an effect when hive filesource partition management is enabled.
spark.sql.hive.manageFilesourcePartitions TRUE When true, enable metastore partition management for file source tables as well. This includes both datasource and converted Hive tables. When partition management is enabled, datasource tables store partition in the Hive metastore, and use the metastore to prune partitions during query planning.
spark.sql.hive.metastore.barrierPrefixes A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*).
spark.sql.hive.metastore.jars builtin Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options: “ 1. “builtin” Use Hive 1.2.1, which is bundled with the Spark assembly when -Phive is enabled. When this option is chosen, spark.sql.hive.metastore.versionmust be either 1.2.1 or not defined. 2. “maven” Use Hive jars of specified version downloaded from Maven repositories. 3. A classpath in the standard format for both Hive and Hadoop.
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc, A comma separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j.
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
spark.sql.hive.metastore.version 1.2.1 Version of the Hive metastore. Available options are0.12.0 through 2.1.1.
spark.sql.hive.metastorePartitionPruning TRUE When true, some predicates will be pushed down into the Hive metastore so that unmatching partitions can be eliminated earlier. This only affects Hive tables not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC for more information).
spark.sql.hive.thriftServer.async TRUE When set to true, Hive Thrift server executes SQL queries in an asynchronous way.
spark.sql.hive.thriftServer.singleSession FALSE When set to true, Hive Thrift server is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
spark.sql.hive.verifyPartitionPath FALSE When true, check all the partition paths under the table’s root directory when reading data stored in HDFS.
spark.sql.hive.version 1.2.1 deprecated, please use spark.sql.hive.metastore.version to get the Hive version in Spark.
spark.sql.inMemoryColumnarStorage.batchSize 10000 Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
spark.sql.inMemoryColumnarStorage.compressed TRUE When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.
spark.sql.inMemoryColumnarStorage.enableVectorizedReader TRUE Enables vectorized reader for columnar caching.
spark.sql.optimizer.metadataOnly TRUE When true, enable the metadata-only query optimization that use the table’s metadata to produce the partition columns instead of table scans. It applies when all the columns scanned are partition columns and the query has an aggregate operator that satisfies distinct semantics.
spark.sql.orc.compression.codec snappy Sets the compression codec used when writing ORC files. If either compression or orc.compress is specified in the table-specific options/properties, the precedence would be compression, orc.compress,spark.sql.orc.compression.codec.Acceptable values include: none, uncompressed, snappy, zlib, lzo.
spark.sql.orc.enableVectorizedReader TRUE Enables vectorized orc decoding.
spark.sql.orc.filterPushdown FALSE When true, enable filter pushdown for ORC files.
spark.sql.orderByOrdinal TRUE When true, the ordinal numbers are treated as the position in the select list. When false, the ordinal numbers in order/sort by clause are ignored.
spark.sql.parquet.binaryAsString FALSE Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
spark.sql.parquet.compression.codec snappy Sets the compression codec used when writing Parquet files. If either compression or parquet.compression is specified in the table-specific options/properties, the precedence would be compression,parquet.compression, spark.sql.parquet.compression.codec. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
spark.sql.parquet.enableVectorizedReader TRUE Enables vectorized parquet decoding.
spark.sql.parquet.filterPushdown TRUE Enables Parquet filter push-down optimization when set to true.
spark.sql.parquet.int64AsTimestampMillis FALSE (Deprecated since Spark 2.3, please set spark.sql.parquet.outputTimestampType.) When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the extended type. In this mode, the microsecond portion of the timestamp value will betruncated.
spark.sql.parquet.int96AsTimestamp TRUE Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.int96TimestampConversion FALSE This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different timezone offset than Hive & Spark.
spark.sql.parquet.mergeSchema FALSE When true, the Parquet data source merges schemas collected from all data files, otherwise the schema is picked from the summary file or a random data file if no summary file is available.
spark.sql.parquet.outputTimestampType INT96 Sets which Parquet timestamp type to use when Spark writes data to Parquet files. INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS is a standard timestamp type in Parquet, which stores number of microseconds from the Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which means Spark has to truncate the microsecond portion of its timestamp value.
spark.sql.parquet.recordLevelFilter.enabled FALSE If true, enables Parquet’s native record-level filtering using the pushed down filters. This configuration only has an effect when ‘spark.sql.parquet.filterPushdown’ is enabled.
spark.sql.parquet.respectSummaryFiles FALSE When true, we make assumption that all part-files of Parquet are consistent with summary files and we will ignore them when merging schema. Otherwise, if this is false, which is the default, we will merge all part-files. This should be considered as expert-only option, and shouldn’t be enabled before knowing what it means exactly.
spark.sql.parquet.writeLegacyFormat FALSE Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior versions, when converting Parquet schema to Spark SQL schema and vice versa.
spark.sql.parser.quotedRegexColumnNames FALSE When true, quoted Identifiers (using backticks) in SELECT statement are interpreted as regular expressions.
spark.sql.pivotMaxValues 10000 When doing a pivot without specifying values for the pivot column this is the maximum number of (distinct) values that will be collected without error.
spark.sql.queryExecutionListeners List of class names implementing QueryExecutionListener that will be automatically added to newly created sessions. The classes should have either a no-arg constructor, or a constructor that expects a SparkConf argument.
spark.sql.redaction.options.regex (?i)url Regex to decide which keys in a Spark SQL command’s options map contain sensitive information. The values of options whose names that match this regex will be redacted in the explain output. This redaction is applied on top of the global redaction configuration defined by spark.redaction.regex.
spark.sql.redaction.string.regex Regex to decide which parts of strings produced by Spark contain sensitive information. When this regex matches a string part, that string part is replaced by a dummy value. This is currently used to redact the output of SQL explain commands. When this conf is not set, the value fromspark.redaction.string.regex is used.
spark.sql.session.timeZone Asia/Shanghai The ID of session local timezone, e.g. “GMT”, “America/Los_Angeles”, etc.
spark.sql.shuffle.partitions 4096 The default number of partitions to use when shuffling data for joins or aggregations.
spark.sql.sources.bucketing.enabled TRUE When false, we will treat bucketed table as normal table
spark.sql.sources.default parquet The default data source to use in input/output.
spark.sql.sources.parallelPartitionDiscovery.threshold 32 The maximum number of paths allowed for listing files at driver side. If the number of detected paths exceeds this value during partition discovery, it tries to list the files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and LibSVM data sources.
spark.sql.sources.partitionColumnTypeInference.enabled TRUE When true, automatically infer the data types for partitioned columns.
spark.sql.sources.partitionOverwriteMode STATIC When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn’t delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn’t affect Hive serde tables, as they are always overwritten with dynamic mode.
spark.sql.statistics.fallBackToHdfs TRUE If the table statistics are not available from table metadata enable fall back to hdfs. This is useful in determining if a table is small enough to use auto broadcast joins.
spark.sql.statistics.histogram.enabled FALSE Generates histograms when computing column statistics if enabled. Histograms can provide better estimation accuracy. Currently, Spark only supports equi-height histogram. Note that collecting histograms takes extra cost. For example, collecting column statistics usually takes only one table scan, but generating equi-height histogram will cause an extra table scan.
spark.sql.statistics.size.autoUpdate.enabled FALSE Enables automatic update for table size once table’s data is changed. Note that if the total number of files of the table is very large, this can be expensive and slow down data change commands.
spark.sql.streaming.checkpointLocation The default location for storing checkpoint data for streaming queries.
spark.sql.streaming.metricsEnabled FALSE Whether Dropwizard/Codahale metrics will be reported for active streaming queries.
spark.sql.streaming.numRecentProgressUpdates 100 The number of progress updates to retain for a streaming query
spark.sql.thriftserver.scheduler.pool Set a Fair Scheduler pool for a JDBC client session.
spark.sql.thriftserver.ui.retainedSessions 200 The number of SQL client sessions kept in the JDBC/ODBC web UI history.
spark.sql.thriftserver.ui.retainedStatements 200 The number of SQL statements kept in the JDBC/ODBC web UI history.
spark.sql.ui.retainedExecutions 1000 Number of executions to retain in the Spark UI.
spark.sql.variable.substitute TRUE This enables substitution using syntax like ${var} ${system:var} and ${env:var}.
spark.sql.warehouse.dir /user/warehouse The default location for managed databases and tables.
2019-11-30 09:39:21 weidajiangjiang 阅读数 10
  • Spark内核机制解析及性能调优教程(含资料)

    由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以序列化到磁盘,来降低内存占用。 本教程通过源码引导读者深入理解Spark的集群部署的内部机制、Spark内部调度的机制、Executor的内部机制、Shuffle的内部机制,进而讲述Tungsten的内部机制,让学员知其然知其所以然。教程的后部分,是任何Spark应用者都很好关注的Spark性能调优的内容。

    22 人正在学习 去看看 张长志

常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效。
Spark SQL的并行度不允许用户自己指定,Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度,用户自己通spark.default.parallelism参数指定的并行度,只会在没Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行速度非常快。
为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子。

在这里插入图片描述图2-7 repartition算子使用前后对比图
Spark SQL这一步的并行度和task数量肯定是没有办法去改变了,但是,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的前后对比如图2-7所示。

2019-01-12 15:30:46 qq_32297447 阅读数 9279
  • Spark内核机制解析及性能调优教程(含资料)

    由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以序列化到磁盘,来降低内存占用。 本教程通过源码引导读者深入理解Spark的集群部署的内部机制、Spark内部调度的机制、Executor的内部机制、Shuffle的内部机制,进而讲述Tungsten的内部机制,让学员知其然知其所以然。教程的后部分,是任何Spark应用者都很好关注的Spark性能调优的内容。

    22 人正在学习 去看看 张长志

对于某些工作负载,可以通过在内存中缓存数据或打开一些实验选项来提高性能。

在内存中缓存数据

Spark SQL可以通过调用spark.catalog.cacheTable(“tableName”)或使用内存中的列式格式来缓存表.dataFrame.cache()。然后,Spark SQL将仅扫描所需的列,并自动调整压缩以最小化内存使用和GC压力。可以调用spark.catalog.uncacheTable(“tableName”)从内存中删除表。

  • spark.sql.inMemoryColumnarStorage.compressed 设置为true时,Spark SQL将根据数据统计信息自动为每列选择压缩编解码器。
  • spark.sql.inMemoryColumnarStorage.batchSize 10000 控制柱状缓存的批次大小。较大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时会产生OOM风险。

其他配置选项

以下选项也可用于调整查询执行的性能。由于会自动执行更多优化,因此可能会在将来的版本中弃用这些选项。
物业名称 默认 含义

  • spark.sql.files.maxPartitionBytes 134217728(128 MB) 读取文件时打包到单个分区的最大字节数。
  • spark.sql.files.openCostInBytes 4194304(4 MB) 可以在同一时间扫描通过字节数测量的打开文件的估计成本。将多个文件放入分区时使用。最好过度估计,然后使用较小文件的分区将比具有较大文件的分区(首先安排的分区)更快。
  • spark.sql.broadcastTimeout 300 广播连接中广播等待时间的超时(以秒为单位)
  • spark.sql.autoBroadcastJoinThreshold 10485760(10 MB) 配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为-1,可以禁用广播。请注意,目前只有ANALYZE TABLE COMPUTE STATISTICS noscan运行该命令的Hive Metastore表支持统计信息 。
  • spark.sql.shuffle.partitions 200 配置为连接或聚合数据移动数据时要使用的分区数。

Spark Sql性能测试

阅读数 427

没有更多推荐了,返回首页