sparksql优化

2019-01-14 18:40:06 zxl333 阅读数 1439

最近一直由于公司一个重要的作业,从Tez切换到sparksql,需要对sparksql进行优化。这个表都是left join,慢就慢在join阶段

Tez之前根据优化参数,执行时间在7分钟到12分钟之间浮动,sparksql进行一些参数优化,一直在17到24分钟浮动,效率太低。最后查看sparksql的执行时的shuffle阶段发现,每个表参与的shuffle数据量相差很大,最大的612GB,最小的2.7GB,然而2.7GB的表却不能进行广播,内存也不可能那么大,我们配置的是200M表进行广播。最后根据shuffle表的数据大小调整join顺序,把最小的表放在最前面join,最大的表放在最后join,性能立即提升50%,直接到10分钟左右。下面针对一些优化中使用的参数进行说明:

设置动态资源分配

spark.dynamicAllocation.enabled  是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认false
spark.shuffle.service.enabled  启用外部随机shuffle服务。此服务保留执行者编写的无序处理文件,以便安全地删除执行者spark.dynamicAllocation.enabled设置为true了,必须设置这个参数也为true,默认是false
spark.dynamicAllocation.executorIdleTimeout  当某个executor空闲超过这个设定值,就会被kill,默认60s
spark.dynamicAllocation.schedulerBacklogTimeout  这个参数的默认值是1秒,即当任务调度延迟超过1秒的时候,会请求增加executor,而且是指数形式的请求
spark.dynamicAllocation.maxExecutors  动态分配最大executor个数,默认infinity
spark.dynamicAllocation.initialExecutors  动态分配初始executor个数默认值=spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.cachedExecutorIdleTimeout  当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinity

spark.executor.memory 设置每个executor多少内存
spark.executor.cores   设置每个executor的cpu数量

spark.executor.memoryOverhead  设置executor申请堆外内存大小,默认是executor的10%
spark.memory.fraction                                   0.7
spark.shuffle.file.buffer  设置shuffle时的buffer大小默认是32k
spark.memory.offHeap.enabled  是否可以设置堆外内存,默认是false
spark.memory.offHeap.size  申请堆外内存大小,他的设置不会影响堆内存大小,但是会受机器的整个内存大小影响,这个不能设置太大,我这测试1024mb这个值刚好
spark.sql.autoBroadcastJoinThreshold  根据表大小是否进行广播,默认10M,测试发现按照分区获取数据,小于10M,仍然不会进行广播,所以设置稍晚大点,我们的是209715200(200M)
spark.sql.statistics.fallBackToHdfs 广播时是否下推到hdfs获取数据大小,默认是false,false时广播时是从metastore获取的大小

spark.sql.join.preferSortMergeJoin 开启尝试使用hash join的开关,默认是false,使用的是sort merge join,当前SparkSQL支持三种Join算法:shuffle hash join、broadcast hash join以及sort merge join
spark.reducer.maxSizeInFlight 该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据,默认是48m
spark.sql.inMemoryColumnarStorage.batchSize  默认值是10000
spark.sql.orc.filterPushdown 默认值是false

之前一直增加executor的数量和cpu,但是执行性能没有提升,查看sparksql页面发现task的并行度没有上去,默认并行度是200,sparksql需要同时设置以下两个参数才生效,设置为executor数量*cpu核数的2~3倍

spark.sql.shuffle.partitions=2700   (默认值200)并行度太低就没有充分利用cpu和executor
spark.default.parallelism=2700

以下是我们线查看每个表shuffle时数据量的大小,A表是主表,然后按顺序left join后面的所有表,我们这个join没有数据倾斜,这个对相关参数进行优化后执行一直维持17~24分钟

db1.T_IFS_PS_CUSTS_ATTRIB A 61.5G
db1.T_IFS_CUSTS_ATTR_F B 70.4GB
db2.FS_CUSTS_LEVEL C 42.3GB
db2.FS_CUSTS_BAL_SUM D 612GB
db2.FS_CUSTS_PRD_TRADE_VALUE E 38.3GB 
db2.FS_CUSTS_BAL_SUM F 10.7GB
db2.FS_ORANGE_TRADE_SUM H 2.7GB 
tmp_FS_CUSTS_PROD_FLAG I 38.3GB 
db3.B_IFS_CUSTS_CARD_LEVEL_INFO J 5.8GB
db2.FS_CUSTS_BAL_SUM K 7.6GB
db2.FS_CUSTS_ASSET_LEVEL_M L 7.5GB
db2.FS_CUSTS_YH_ASSET_LEVEL_M M  13.1GB
db2.FS_CUSTS_BAL_SUM_HIS N  16.8GB 
db2.FS_DF_CUSTS_TRANS_SUM O 7.5GB

后来直接把db2.F_CUST_BAL_SUM这个大表放到最后进行join,直接10分钟就执行完成了。为什么调整顺序之后就提升了50%性能,我的理解是这个放到最后执行,就不会一直参与计算,之后最后才进行参与计算,耗费的资源就很少。没有调整顺序时最短的task执行1.4min,最长的5.7min,浮动很大,调整顺序后最短的1min,最长的2.4min。

sparksql优化在路上,还需继续努力研究原理。

2018-07-30 21:41:09 jy02268879 阅读数 7069

一、代码优化

1.在数据统计的时候选择高性能算子。

例如Dataframe使用foreachPartitions将数据写入数据库,不要每个record都去拿一次数据库连接。通常写法是每个partition拿一次数据库连接。

      /**
        * 将统计结果写入MySQL中
        * 代码优化:
        * 在进行数据库操作的时候,不要每个record都去操作一次数据库
        * 通常写法是每个partition操作一次数据库
        **/
      try {
        videoLogTopNDF.foreachPartition(partitionOfRecords => {
          val list = new ListBuffer[DayVideoAccessStat]
          partitionOfRecords.foreach(info => {
            val day = info.getAs[String]("day")
            val cmsId = info.getAs[Long]("cmsId")
            val times = info.getAs[Long]("times")

            list.append(DayVideoAccessStat(day, cmsId, times))
          })
          StatDao.insertDayVideoTopN(list)
        })
      }catch{
        case e:Exception =>e.printStackTrace()
      }

2.写数据库的时候,关闭自动提交,不要每条提交一次,自己手动每个批次提交一次。

 var connection:Connection = null
    var pstmt : PreparedStatement = null
    try{
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false)//关闭自动提交
      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for(ele <- list){
        pstmt.setString(1,ele.day)
        pstmt.setLong(2,ele.cmsId)
        pstmt.setLong(3,ele.times)
        //加入到批次中,后续再执行批量处理 这样性能会好很多
        pstmt.addBatch()
      }
      //执行批量处理
      pstmt.executeBatch()

      connection.commit() //手工提交

    }catch {
      case e :Exception =>e.printStackTrace()
    }finally {
      MySQLUtils.release(connection,pstmt)
    }

3.复用已有的数据。

三个统计方法都是只要当天的视频数据,所以在调用方法前过滤出当天视频数据,缓存到内存中。

然后传到三个统计方法中使用。

不要在每个统计方法都去做一次相同的过滤。

val logDF = spark.read.format("parquet")
      .load("file:///F:\\mc\\SparkSQL\\data\\afterclean")
    val day = "20170511"


    /**
      * 代码优化:复用已有数据
      * 既然每次统计都是统计的当天的视频,
      * 先把该数据拿出来,然后直接传到每个具体的统计方法中
      * 不要在每个具体的统计方法中都执行一次同样的过滤
      *
      * 用$列名得到列值,需要隐式转换 import spark.implicits._
      * */
    import spark.implicits._
    val dayVideoDF = logDF.filter($"day" ===day&&$"cmsType"==="video")
    /**
      * 将这个在后文中会复用多次的dataframe缓存到内存中
      * 这样后文在复用的时候会快很多
      *
      * default storage level (`MEMORY_AND_DISK`).
      * */
    dayVideoDF.cache()

    //logDF.printSchema()
    //logDF.show()

    StatDao.deletaDataByDay(day)

    //统计每天最受欢迎(访问次数)的TopN视频产品
    videoAccessTopNStatDFAPI(spark,dayVideoDF)

    //按照地势统计每天最受欢迎(访问次数)TopN视频产品 每个地市只要最后欢迎的前三个
    cityAccessTopNStat(spark,dayVideoDF)

    //统计每天最受欢迎(流量)TopN视频产品
    videoTrafficsTopNStat(spark,dayVideoDF)

    //清除缓存
    dayVideoDF.unpersist(true)

二、集群存储格式选择

    列式/行式存储简介

这里建议选择用parquet格式,之前公司中用的也是这种格式。

三、集群压缩格式选择

Hadoop压缩实现分析

Spark中选择用哪个方式压缩文件

SparkSession.builder().config("spark.sql.parquet.compression.codec",snappy).getOrCreate()默认是snappy。

四、参数优化

1.并行度:spark.sql.shuffle.partitions

一个partitions相当于一个task。这是配置当shuffle数据去join或者聚合的时候的partitions的数量。200一般情况下在生产上是不够的,需要做相应的调整。

调整并行度的方式

bin/spark-submit --class XXX.XXX.XX --name XXX --master local[2] --conf spark.sql.shuffle.partitions=230 XXX.jar

2.不必要的情况下,关闭分区字段类型自动推导

2019-09-29 11:17:59 qq_29329981 阅读数 73
shuffle缓冲区大小
set spark.shuffle.file.buffer=128k
合并小文件
set spark.shuffle.consolidateFiles=true
哈希shuffle
set spark.shuffle.manager=hash
shuffle内存占比
set spark.shuffle.memoryFraction=0.5
序列化
set spark.serializer=org.apache.spark.serializer.KryoSerialization
分区数
set spark.sql.shuffle.partitions=100
本地化
set spark.locality.wait=6000
。。。不知道  嘤嘤嘤
set spark.driver.maxResultSize=512m
2018-01-06 15:23:07 weixin_37136725 阅读数 10463

SparkSQL总体流程介绍

在阐述Join实现之前,我们首先简单介绍SparkSQL的总体流程,一般地,我们有两种方式使用SparkSQL,一种是直接写sql语句,这个需要有元数据库支持,例如Hive等,另一种是通过Dataset/DataFrame编写Spark应用程序。如下图所示,sql语句被语法解析(SQL AST)成查询计划,或者我们通过Dataset/DataFrame提供的APIs组织成查询计划,查询计划分为两大类:逻辑计划和物理计划,这个阶段通常叫做逻辑计划,经过语法分析(Analyzer)、一系列查询优化(Optimizer)后得到优化后的逻辑计划,最后被映射成物理计划,转换成RDD执行。

更多关于SparkSQL的解析与执行请参考文章【sql的解析与执行】。对于语法解析、语法分析以及查询优化,本文不做详细阐述,本文重点介绍Join的物理执行过程。

Join基本要素

如下图所示,Join大致包括三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也可以通过AND语句放在Join条件中。

Spark支持所有类型的Join,包括:

  • inner join
  • left outer join
  • right outer join
  • full outer join
  • left semi join
  • left anti join

下面分别阐述这几种Join的实现。

Join基本实现流程

总体上来说,Join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,然后根据该keyA去buildIter中查找所有满足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin得到join后的记录,最后根据过滤条件得到最终join的记录。

从上述计算过程中不难发现,对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

sort merge join实现

要让两条记录能join到一起,首先需要将具有相同key的记录在同一个分区,所以通常来说,需要做一次shuffle,map阶段根据join条件确定每条记录的key,基于该key做shuffle write,将可能join到一起的记录分到同一个分区中,这样在shuffle read阶段就可以将两个表中具有相同key的记录拉到同一个分区处理。前面我们也提到,对于buildIter一定要是查找性能较优的数据结构,通常我们能想到hash表,但是对于一张较大的表来说,不可能将所有记录全部放到hash表中,另外也可以对buildIter先排序,查找时按顺序查找,查找代价也是可以接受的,我们知道,spark shuffle阶段天然就支持排序,这个是非常好实现的,下面是sort merge join示意图。

在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条记录,都采用顺序查找的方式从buildIter查找对应的记录,由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始,整体上来说,查找性能还是较优的。

broadcast join实现

为了能具有相同key的记录分到同一个分区,我们通常是做shuffle,那么如果buildIter是一个非常小的表,那么其实就没有必要大动干戈做shuffle了,直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中,如下图所示。

从上图可以看到,不用做shuffle,可以直接在一个map中完成,通常这种join也称之为map join。那么问题来了,什么时候会用broadcast join实现呢?这个不用我们担心,spark sql自动帮我们完成,当buildIter的估计大小不超过参数spark.sql.autoBroadcastJoinThreshold设定的值(默认10M),那么就会自动采用broadcast join,否则采用sort merge join。

hash join实现

除了上面两种join实现方式外,spark还提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序,将来自buildIter的记录放到hash表中,以便查找,如下图所示。

不难发现,要将来自buildIter的记录放到hash表中,那么每个分区来自buildIter的记录不能太大,否则就存不下,默认情况下hash join的实现是关闭状态,如果要使用hash join,必须满足以下四个条件:

  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

所以说,使用hash join的条件其实是很苛刻的,在大多数实际场景中,即使能使用hash join,但是使用sort merge join也不会比hash join差很多,所以尽量使用hash

下面我们分别阐述不同Join方式的实现流程。

inner join

inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrmae时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。其基本实现流程如下图所示,在查找阶段,如果右表不存在满足join条件的记录,则跳过。

left outer join

left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。其基本实现流程如下图所示。

right outer join

right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrmae时,一般让大表在右边,小表在左边。其基本实现流程如下图所示。

full outer join

full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter,其基本实现流程如下图所示。

由于左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录;如果keyA<keyB,则说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow,紧接着,rowA更新到左表的下一条记录;如果keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB,紧接着,rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。

left semi join

left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null,其基本实现流程如下图所示。

left anti join

left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录,其基本实现流程如下图所示。

总结

Join是数据库查询中一个非常重要的语法特性,在数据库领域可以说是“得join者的天下”,SparkSQL作为一种分布式数据仓库系统,给我们提供了全面的join支持,并在内部实现上无声无息地做了很多优化,了解join的实现将有助于我们更深刻的了解我们的应用程序的运行轨迹。

2017-11-12 10:30:12 zhanglh046 阅读数 5410

一 设置shuffle的并行度

我们可以通过属性spark.sql.shuffle.partitions设置shuffle并行度

 

二 Hive数据仓库建设的时候,合理设置数据类型,比如你设置成INT的就不要设置成BIGINT,减少数据类型不必要的内存开销

 

三 SQL优化

 

四 并行的处理查询结果

对于Spark SQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要使用collect到driver再处理,使用foreach算子并行处理查询结果

 

五 缓存表

对于一条SQL语句中可能多次使用到的表,可以对其进行缓存,使用SQLContext.cacheTable(tableName)或者DataFrame.cache即可。Spark SQL会用内存 列存储的格式进行表的缓存。然后SparkSQL就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销。可以通过spark.sql.inMemoryColumnarStorage.batchSize这个参数,默认10000,配置列存储单位

 

六 广播JOIN表

spark.sql.autoBroadcastJoinThreshold,默认10485760(10M),在内存够用的情况下,提高其大小,可以将join中的较小的表广播出去,而不用进行网络数据传输