精华内容
下载资源
问答
  • spark.sql.shuffle.partitions则是对sparks SQL专用的设置 方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read t...

    spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。
    spark.sql.shuffle.partitions则是对sparks SQL专用的设置
    在这里插入图片描述

    方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

      方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。
    
      方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
    
      方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
    
      方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
    
    展开全文
  • 2.spark.sql.shuffle.partitions对sparksql中的joins和aggregations有效,但其他的无效(对这种情况下,上述的两种配置都无效,我们应该怎么办呢?看第三点) 3.我们可以使用repartition算子对dataframe进行重分区。...

    在这里插入图片描述
    stack overflow链接
    在这里插入图片描述

    总结:

    • 1.spark.default.parallelism只对RDD有效,对sparksql(DataFrame、DataSet)无效
    • 2.spark.sql.shuffle.partitions对sparksql中的joins和aggregations有效,但其他的无效(对这种情况下,上述的两种配置都无效,我们应该怎么办呢?看第三点)
    • 3.我们可以使用repartition算子对dataframe进行重分区。
    展开全文
  • 谈谈spark.sql.shuffle.partitionsspark.default.parallelism 的区别及spark并行度的理解spark.sql.shuffle.partitionsspark.default.parallelism 的区别spark并行度的理解如何设置spark.sql.shuffle....

    spark.sql.shuffle.partitions和 spark.default.parallelism 的区别

    首先两者最直观的区别:
    spark.default.parallelism只有在处理RDD时有效.
    spark.sql.shuffle.partitions则是只对SparkSQL有效.
    看一下官网给出的两者定义:
    在这里插入图片描述
    以我的四级水准乱翻译一下:
     spark.sql.shuffle.partitions: 设置的是 RDD1做shuffle处理后生成的结果RDD2的分区数.
      默认值: 200
     spark.default.parallelism: 设置的是 RDD1做shuffle处理/并行处理(窄依赖算子)后生成的结果RDD2的分区数
      默认值:
       对于分布式的shuffle算子, 默认值使用了结果RDD2所依赖的所有父RDD中分区数最大的, 作为自己的分区数.
       对于并行处理算子(窄依赖的), 有父依赖的, 结果RDD分区数=父RDD分区数, 没有父依赖的看集群配置:
        Local mode:给定的core个数
        Mesos fine grained mode: 8
        Others: max(RDD分区数为总core数, 2)

    spark并行度的理解

     并行度其实就是指的是spark作业中, 各个stage的taskset中的task的数量, 代表了spark作业中各个阶段的并行度, 而taskset中的task数量 = task任务的父RDD中分区数

    如何设置spark.sql.shuffle.partitions和spark.default.parallelism的值

    官网建议: 设置为当前spark job的总core数量的2~3倍. 理由如下:
     背景: spark作业是 1 core 1 task的
     假设我们给当前Spark job 设置总Core数为 100, 那么依据1 core 1 task, 当前spark集群中最多并行运行100task任务, 那么通过设置上述两个参数为100, 使得我们结果RDD的分区数为100, 一个分区 1task 1core, 完美! 但是实际生产中会有这样的情况, 100个task中有些task的处理速度快, 有些处理慢, 假设有20个task很快就处理完毕了, 此时就会出现 我们集群中有20个core处理闲置状态, 不符合spark官网所说的最大化压榨集群能力.
     而如果我们设置上述参数值为199, 此时的现象: 虽然集群能并行处理199个task, 奈何总core只有100, 所以会出现有99个task处于等待处理的情况. 处理较快的那20task闲置下来的20个core就可以接着运行99个中的20个task, 这样就最大化spark集群的计算能力

    展开全文
  • 当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个...spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参...

    当不跟随父对象partition数目的shuffle过程发生后,结果的partition会发生改变,这两个参数就是控制这类shuffle过程后,返回对象的partition的

    经过实测,得到结论:

    spark.sql.shuffle.partitions 作用于dataframe(val df2=df1.shuffle算子(如df1.orderBy()),的df2的partition就是这个参数的值)

    spark.default.parallelism 作用于rdd(val rdd2=rdd1.shuffle算子(如rdd1.reduceByKey()),的rdd2的partition就是这个参数的值)

    如何查看操作是否有shuffle?善用rdd的toDebugString函数,详见Spark中的shuffle算子

    df也可以先df.rdd.toDebugString查看是否有shuffle发生

    修改方法:

    代码中设定:

    sqlContext.setConf("spark.sql.shuffle.partitions", "500")
    sqlContext.setConf("spark.default.parallelism", "500")
    

    提交任务时设定:

    ./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500

    官方说明和默认值:

    spark.default.parallelismFor distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
    • Local mode: number of cores on the local machine
    • Mesos fine grained mode: 8
    • Others: total number of cores on all executor nodes or 2, whichever is larger
    Default number of partitions in RDDs returned by transformations like joinreduceByKey, and parallelize when not set by user.
    spark.sql.shuffle.partitions200(default)Configures the number of partitions to use when shuffling data for joins or aggregations.

     

    跟随父对象partition数目的shuffle?比如df的join,df1.join(df2) 返回partition数目根据df1定




    参考资料:

    https://spark.apache.org/docs/2.1.0/configuration.html

    https://spark.apache.org/docs/latest/sql-performance-tuning.html

    https://www.jianshu.com/p/7442deb21ae0

    https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa 

    展开全文
  • 遇到一个常见sql使用场景 insert overwrite table xxx(date='${hiveconf:date}') select * from table1 union all select * from table2 观察发现任务耗时慢,大概10多分钟,其中主要耗时花在合并小...
  • 在关于spark任务并行度的设置中,有两个参数我们会经常遇到,spark.sql.shuffle.partitionsspark.default.parallelism, 那么这两个参数到底有什么区别的? 首先,让我们来看下它们的定义 Property Name ...
  • 见:https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa
  • Spark SQL 中org.apache.spark.sql.functions归纳 注意,这里使用的是scala 2.12.12,spark版本是最新的3.0.1版本 1. Sort functions /** * Returns a sort expression based on ascending order of the column. *...
  • 先看我们的spark-sql版本: [hadoop@666 ~]$ spark-sql --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.1 /_/ Using Scala version ...
  • 今天尝试了一个新的算子 repartitionAndSortWithinPartitions , 遇到了一个问题。 具体异常报错如下: ... at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMe...
  • http://spark.apache.org/docs/2.4.4/sql-performance-tuning.html 今天使用spark对一组大数据进行合并作join操作,一直都报下面的错: Exception in thread “broadcast-exchange-0” java.lang....
  • spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations. spark.default.parallelism is the default number of...
  • 今天使用spark对一组大数据进行合并作join操作,一直都报下面的错: Exception in thread “broadcast-exchange-0” java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all ...
  • 报错提示报错1: FetchFailed(BlockManagerId(846, xxx.hadoop.com, 7337, None), shuffleId=262, mapId=96, reduceId=122, message=org.apache.spark.shuffle.FetchFailedException: Failed to conn...
  • 因为这样,RDD 转化为 DataFrame 去执行SQL , 在做 JOIN 和 Shuffle 的时候会自动使用 spark.sql.shuffle.partitions 的分区数。 spark.sql.shuffle.partitions 200 用于配置 join 或aggregate混洗(shuffle...
  • spark-SQL跑任务报错 错误信息如下 19/10/17 18:06:50 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_e122_1568025476000_38356_01_000022 on host: node02. Exit ...
  • org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 13 原因 shuffle分为shuffle write和shuffle read两部分。 shuffle write的分区数由上一阶段的RDD分区数控制...
  • Spark性能调优-Shuffle调优及故障排除篇

    千次阅读 多人点赞 2021-03-26 11:42:52
    Spark调优之Shuffle调优 本节开始先讲解Shuffle核心概念;然后针对HashShuffle、SortShuffle进行调优;接下来对map端、reduce端调优;再针对Spark中的数据倾斜问题进行剖析及调优;最后是Spark运行过程中的故障排除...
  • SparkShuffle参数调优解析

    千次阅读 2020-07-07 10:13:04
    在分布式系统中,数据分布在不同的节点上,每个节点计算...下面是spark2.2.0版本的shuffle的属性表,http://spark.apache.org/docs/2.2.0/configuration.html 一、Shuffle 参数 Property Name Default Meaning
  • Spark Shuffle参数调优的原理与建议

    千次阅读 2020-04-30 17:07:19
    文章目录Shuffle对性能消耗的原理详解Spark Shuffle过程中影响性能的操作:Spark 压缩算法的比较如何调优Spark配置参数的源码详解(Spark2.3)spark.shuffle.managerspark.reducer.maxReqsInFlight与spark.reducer....
  • spark_sql 参数调优

    2019-07-26 10:58:17
    spark Sql 参数调优 目录 前言 异常调优 spark.sql.hive.convertMetastoreParquet spark.sql.files.ignoreMissingFiles && spark.sql.files.ignoreCorruptFiles spark.sql.hive.verifyPartitionPath...
  • Spark调优 | Spark SQL参数调优

    万次阅读 2019-07-26 09:45:29
    Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。 本文讲解最近关于在参与hive往spark迁移过程中...
  • [2021-09-30 08:22:01,451] {ssh.py:141} INFO - 21/09/30 16:22:01 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.AnalysisException: org.apache....
  • Spark SQL 自适应执行优化引擎

    千次阅读 2020-02-22 17:13:21
    在本篇文章中,笔者将给大家带来 Spark SQL 中关于自适应执行引擎(Spark Adaptive Execution)的内容。在之前的文章中,笔者介绍过 Flink SQL,目前 ...
  • 1.1 Spark Shuffle 原理 Spark Shuffle 一般用于将上游 Stage 中的数据按 Key 分区,保证来自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 进入相同的 Reducer (表示下游 Stage 的 Task)。一般用于 group ...
  • 首先,还是抛出官网文档吧 参考2.0版本 http://spark.apache.org/docs/2.0.2/tuning.html#level-of-parallelism

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,213
精华内容 1,685
关键字:

spark.sql.shuffle.partitions