精华内容
下载资源
问答
  • sparkshuffle原理
    2021-04-08 18:46:55

    Shuffle详解

    以Shuffle为边界,Spark将一个Job划分为不同的Stage。Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步

    Spark 的 Stage 分为两种

    • ResultStage。负责返回计算结果
    • ShuffleMapStage。其他的均为ShuffleMapStage

    如果按照 map 端和 reduce 端来分析的话:

    • ShuffleMapStage可以即是map端任务,又是reduce端任务
    • ResultStage只能充当reduce端任务

    Spark Shuffle的流程简单抽象为以下几步:

    • Shuffle Write

    • Map side combine (if needed)

    • Write to local output file

    • Shuffle Read

    • Block fetch

    • Reduce side combine

    • Sort (if needed)

    • Shuffle涉及到了本地磁盘(非hdfs)的读写和网络的传输类的磁盘IO以及序列化等耗时操作。

    Spark的Shuffle经历了Hash、Sort、Tungsten-Sort(堆外内存)三阶段发展历程:

    • Spark 0.8及以前 Hash Based Shuffle
    • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
    • Spark 0.9 引入ExternalAppendOnlyMap
    • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
    • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
    • Spark 1.4 引入Tungsten-Sort Based Shuffle
    • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
    • Spark 2.0 Hash Based Shuffle退出历史舞台

    Shuffle的发展历程简述

    1、Hash Base Shuffle V1

    • Shuffle Map Task过程按照 Hash 的方式重组 Partition 的数据,不进行排序。每个Map Task需要为每个下游的Reduce Task创建一个单独的文件

    • Shuffle过程中会生成海量的小文件。同时打开过多文件、伴随大量的随机磁盘 I/O 操作与大量的内存开销

      • 存在问题:

      • 生成大量文件,占用文件描述符,同时引入 DiskObjectWriter 带来的 Writer Handler 的缓存也非常消耗内存

      • 如果在 Reduce Task 时需要合并操作的话,会把数据放在一个 HashMap 中进行合并,若数据量较大,很容易引发 OOM

    2、Hash Bash Shuffle V2--File Consolidation

     - 针对第一问题,spark引入了File Consolidation机制。一个 Executor 上所有的 Map Task 生成的分区文件只有一份,即将所有的 Map Task 相同的分区文件合并,这样每个 Executor 上最多只生成 N 个分区文件
     - 允许不同的task复用同一批磁盘文件,有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。一定程度上解决了Hash V1中的问题,但不彻底。
     - Hash Shuffle 规避了排序,提高了性能;
    

    3、sort Base Shuffles

    • Sort Base Shuffle大大减少了shuffle过程中产生的文件数,提高Shuffle的效率;

    • 每个 Task 不会为后续的每个 Task 创建单独的文件,而是将所有对结果写入同一个文件。该文件中的记录首先是按照Partition Id 排序,每个 Partition 内部再按照 Key 进行排序,Map Task 运行期间会顺序写每个 Partition 的数据,同时生成一个索引文件记录每个 Partition 的大小和偏移量

    • 在 Reduce 阶段,Reduce Task 拉取数据做 Combine 时不再采用 HashMap,而是采用ExternalAppendOnlyMap,该数据结构在做 Combine 时,如果内存不足,会刷写磁盘,避免大数据情况下的 OOM

    • Sort Shuffle 解决了 Hash Shuffle 的所有弊端,但是因为需要其 Shuffle 过程需要对记录进行排序,所以在性能上有所损失。

    • Tungsten-Sort Based Shuffle / Unsafe Shuffle

      • 它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上 Sort 而不是在 Java 对象上,这样一方面可以减少内存的使用和 GC 的开销,另一方面避免 Shuffle 过程中频繁的序列化以及反序列化。在排序过程中,它提
        供 cache-efficient sorter,使用一个 8 bytes 的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。

      • 限制:

      • Shuffle 阶段不能有 aggregate 操作

      • 分区数不能超过一定大小(2^24-1,这是可编码的最大 Parition Id),

      • 像 reduceByKey 这类有 aggregate 操作的算子是不能使用Tungsten-Sort Based Shuffle,它会退化采用 Sort Shuffle

    SortShuffleV2

    • 从 Spark1.6.0 开始,把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle
    • 从Spark2.0 开始,Spark 把 Hash Shuffle 移除, Spark2.x 中只有一种 Shuffle,即为 Sort Shuffle

    Shuffle Writer

    在DAG阶段以shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;

    • ShuffleWriter(抽象类),有3个具体的实现:

      • SortShuffleWriter。sortShulleWriter 需要在 Map 排序
      • UnsafeShuffleWriter。使用 Java Unsafe 直接操作内存,避免Java对象多余的开销和GC 延迟,效率高
      • BypassMergeSortShuffleWriter。和Hash Shuffle的实现基本相同,区别在于map task输出汇总一个文件,同时还会产生一个index file
    • ShuffleWriter 有各自的应用场景。分别如下:

      • 不满足以下条件使用 SortShuffleWriter
      • 没有map端聚合,RDD的partitions分区数小于16,777,216,且 Serializer支持 relocation,使用UnsafeShuffleWriter
      • 没有map端聚合操作 且 RDD的partition分区数小于200个,使用 BypassMergerSortShuffleWriter
    • bypass运行机制

    • shuffle map task数量 <= spark.shuffle.sort.bypassMergeThreshold (缺省200)

    • 不是聚合类的shuffle算子

    • Bypass机制 Writer 流程如下:

      • 每个Map Task为每个下游 reduce task 创建一个临时磁盘文件,并将数据按key进行hash然后根据hash值写入内存缓冲,缓冲写满之后再溢写到磁盘文件

      • 最后将所有临时磁盘文件都合并成一个磁盘文件,并创建索引文件

      • Bypass方式的Shuffle Writer机制与Hash Shuffle是类似的,在shuffle过程中会创建很多磁盘文件,最后多了一个磁盘文件合并的过程。Shuffle Read的性能会更好

      • Bypass方式与普通的Sort Shuffle方式的不同在于

      • 磁盘写机制不同

      • 根据key求hash,减少了数据排序操作,提高了性能

    • Shuffle Writer 流程

      • 数据先写入一个内存数据结构中:不同的shuffle算子,可能选用不同的数据结构(reduceByKey会选用Map结构;join类的shuffle算子,选用Array数据结构)
      • 检查是否达到内存阈值。
      • 数据排序:在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认是10000条
      • 数据写入缓冲区:写入磁盘文件是通过Java的 BufferedOutputStream 实现的
      • 重复写多个临时文件:一个 Task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,会产生多个临时文件
      • 临时文件合并:将所有的临时磁盘文件进行合并,这就是merge过程。
      • 写索引文件:标识了下游各个 Task 的数据在文件中的 start offset 与 end offset

    Shuffle MapOutputTracker

    • MapOutputTracker负责管理Writer和Reader的沟通.

    • Shuffle Writer会将中间数据保存到Block里面,然后将数据的位置发送给MapOutputTracker

    • Shuffle Reader通过向 MapOutputTracker 获取中间数据的位置之后,才能读取到数据。

    • Shuffle Reader 需要提供 shuffleId、mapId、reduceId 才能确定一个中间数据

      • shuffleId,表示此次shuffle的唯一id
      • mapId,表示map端 rdd 的分区索引,表示由哪个父分区产生的数据
      • reduceId,表示reduce端的分区索引,表示属于子分区的那部分数据
    • MapOutputTracker在executor和driver端都存在:

      • MapOutputTrackerMaster 和 MapOutputTrackerMasterEndpoint(负责通信) 存在于driver
      • MapOutputTrackerWorker 存在于 executor 端
      • MapOutputTrackerMaster 负责管理所有 shuffleMapTask 的输出数据,每个 shuffleMapTask 执行完后会把执行结果(MapStatus对象)注册到MapOutputTrackerMaster
      • MapOutputTrackerMaster 会处理 executor 发送的 GetMapOutputStatuses 请求,并返回serializedMapStatus 给 executor 端
      • MapOutputTrackerWorker 负责为 reduce 任务提供 shuffleMapTask 的输出数据信息(MapStatus对象)
      • 如果MapOutputTrackerWorker在本地没有找到请求的 shuffle 的 mapStatus,则会向MapOutputTrackerMasterEndpoint 发送 GetMapOutputStatuses 请求获取对应的 mapStatus

    Shuffle Reader

    • 下游stage做reduce task,每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read

    • Map Task 执行完毕后会将文件位置、计算状态等信息封装到 MapStatus 对象中,再由本进程中的MapOutPutTrackerWorker 对象将其发送给Driver进程的MapOutPutTrackerMaster对象

    • Reduce Task开始执行之前会先让本进程中的 MapOutputTrackerWorker 向 Driver 进程中的MapOutputTrackerMaster 发动请求,获取磁盘文件位置等信息

    • 当所有的Map Task执行完毕后,Driver进程中的 MapOutputTrackerMaster 就掌握了所有的Shuffle文件的信息。此时MapOutPutTrackerMaster会告诉MapOutPutTrackerWorker磁盘小文件的位置信息

    • 完成之前的操作之后,由 BlockTransforService 去 Executor 所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M

    Hadoop Shuffle 与 Spark Shuffle 的区别

    共同点:

    • 二者从功能上看是相似的;从High Level来看,没有本质区别,实现(细节)上有区别

    区别:

    • Hadoop中有一个Map完成,Reduce便可以去fetch数据了,不必等到所有Map任务完成;而Spark的必须等到父stage完成,也就是父stage的 map 操作全部完成才能去fetch数据。这是因为spark必须等到父stage执行完,才能执行子stage,主要是为了迎合stage规则
    • Hadoop的Shuffle是sort-base的,那么不管是Map的输出,还是Reduce的输出,都是partition内有序的,而spark不要求这一点
    • Hadoop的Reduce要等到fetch完全部数据,才将数据传入reduce函数进行聚合,而 Spark是一边fetch一边聚合

    Shuffle优化

    Spark作业的性能主要就是消耗了shuffle过程,因为该环节包含了众多低效的IO操作:磁盘IO、序列化、网络数据传输等;如果要让作业的性能更上一层楼,就有必要对 shuffle 过程进行调优

    开发过程中对 Shuffle 的优化:

    • 减少Shuffle过程中的数据量
    • 避免Shuffle

    Shuffle 的优化主要是参数优化

    • 优化一:调节 map 端缓冲区大小

      • spark.shuffle.file.buffer 默认值为32K。调节map端缓冲的大小,避免频繁的磁盘IO操作,进而提升任务整体性能
    • 优化二:调节 reduce 端拉取数据缓冲区大小

      • spark.reducer.maxSizeInFlight 默认值为48M,设置shuffle read阶段buffer缓冲区大小,这个buffer缓冲决定了每次能够拉取多少数据
      • 在内存资源充足的情况下,可适当增加参数的大小(如96m),减少拉取数据的次数及网络传输次数,进而提升性能
    • 优化三:调节 reduce 端拉取数据重试次数及等待间隔

      • spark.shuffle.io.maxRetries,默认值3。最大重试次数
      • spark.shuffle.io.retryWait,默认值5s。每次重试拉取数据的等待间隔
      • 一般调高最大重试次数,不调整时间间隔
    • 优化四:调节 Sort Shuffle 排序操作阈值

      • spark.shuffle.sort.bypassMergeThreshold,默认值为200
      • 当使用SortShuffleManager时,如果的确不需要排序操作,建议将这个参数调大
    • 优化五:调节 Shuffle 内存大小

      • Spark给 Shuffle 阶段分配了专门的内存区域,这部分内存称为执行内存
      • 如果内存充足,而且很少使用持久化操作,建议调高这个比例,给 shuffle 聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘
      • 合理调节该参数可以将性能提升10%左右
      • 尽量减少shuffle次数
    // 两次shuffle
    rdd.map(...).repartition(1000).reduceByKey(_ + _, 3000)
    // 一次shuffle
    rdd.map(...).repartition(3000).reduceByKey(_ + _)
    
    • 必要时主动shuffle,通常用于改变并行度,提高后续分布式运行速度
      rdd.repartiton(largerNumPartition).map(…)…

    • 使用treeReduce & treeAggregate替换reduce & aggregate。数据量较大时,reduce & aggregate一次性聚合,shuffle量太大,而treeReduce & treeAggregate是分批聚合,更为保险。


    思维导图

     

    更多相关内容
  • Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。为什么MapReduce计算...
  • Spark Shuffle 原理

    2021-06-23 19:23:38
    Spark Shuffle 原理 文章目录Spark Shuffle 原理Shuffle 中的两个 stageShuffle 中的任务数reduce 拉取数据的过程HashShuffle未经优化的 HashShuffleManager优化的 ...

    Spark Shuffle 原理

    Shuffle 中的两个 stage

    在 stage 划分时,最后一个 stage 为 ResultStage,其他的则都为 ShuffleMapStage;

    ResultStage : 代表着由一个 action 算子的触发,并代表 Spark 作业的结束;

    ShuffleMapStage : 代表着一个引起 Shuffle 的算子的执行,为下游的 stage 进行 shuffle write;

    Shuffle 中的任务数

    • MapTask : 最开始的 MapTask 如果是读的 hdfs 上的数据,那么分区的数量由 split 的数量决定;
    • ReduceTask : reduce 端的分区默认取 spark.default.parallelism 的值; 如果没有设置,那么在没有发生重分区的情况下,reduce 端的分区数等于 map 端最有一个 RDD 的分区数量; 如果发生重分区,以重分区数量为准;

    reduce 拉取数据的过程

    map task 为下游 stage 进行 shuffle write,reduce task 进行 shuffle read; 那么 reduce task 就需要知道在哪些地方拉取 map task 产生的数据:

    1. map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到 MapStatus 对象中,然后由本进程中的 MapOutPutTrackerWorker 对象将 MapStatus 对象发送给Driver进程的 MapOutPutTrackerMaster 对象;

    2. 在reduce task开始执行之前会先让本进程中的 MapOutputTrackerWorker 向Driver进程中的 MapoutPutTrakcerMaster 发动请求,请求磁盘小文件位置信息;

    3. 当所有的Map task执行完毕后,Driver进程中的 MapOutPutTrackerMaster 就掌握了所有的磁盘小文件的位置信息。此时 MapOutPutTrackerMaster 会告诉 MapOutPutTrackerWorker 磁盘小文件的位置信息;

    4. 完成之前的操作之后,由 BlockTransforService 去Executor所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的执行内存中)。

    HashShuffle

    未经优化的 HashShuffleManager

    shuffle write 为下游的 stage 进行数据的准备和划分; 每个 task 将 key 按照 hash 进行数据的划分,将相同 key 的数据写入同一个磁盘文件,而每一个磁盘文件只属于下游的一个 task。

    而在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    使用未经优化的 HashShuffleManager 会产生大量的磁盘文件主要体现在: 下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件

    所以总共生成的文件数为:当前task的数量 * 下个 stage 的 task 数量

    未优化的HashShuffleManager工作原理如图所示:

    优化的 HashShuffleManager

    为了优化 HashShuffleManager,可以通过设置 spark.shuffle.consolidateFiles,默认为false,将其设置为 true 打开;

    开启了之后,和未经优化的 HashShuffleManager 的主要区别为 : 下一个 stage 的 task 有多少个,当前 stage 的每个CPU Core 就要创建多少份磁盘文件;

    每个Executor创建的磁盘文件的数量的计算公式为 : CPU core的数量 * 下一个stage的task数量

    SortShuffle

    SortShuffleManager

    SortShuffleManager 的工作模式为,先将数据写入内存数据结构; 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。

    每写入一条会判断是否达到阈值,达到阈值了之后会进行溢写磁盘;

    在溢写磁盘之前会进行对 key 的排序,写入到内存缓冲区,然后按照每批次默认 10000 条的数量进行多次溢写磁盘; 每次溢写会产生一个临时文件;

    而最后结束的时候,会将所有的临时文件进行 merge,并产生一个索引文件,标识了下游各个task的数据在文件中的start offset与end offset;

    所以每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量

    BypassSortShuffleManager

    bypass运行机制的触发条件如下:

    • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
    • 不是聚合类的shuffle算子

    此时,每个 task 会为下游的每一个 task 产生一份临时文件,按照 key 进行 hash 将 相同 key 放入同一个文件; 在写磁盘之前,会写到内存的缓冲区,缓冲区满了之后
    才会进行数据的溢写;

    上面的步骤和 未经优化的 HashShuffleManager 相同,而 bypass 机制下,后续会进行临时文件的合并,并生成一份索引文件;

    和 SortShuffleManager的不同是

    1. 写入磁盘的机制不同
      • bypass 是内存缓冲区满了才溢写
      • sort 是按照批次 10000 条进行溢写
    2. 不需要排序

    每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量

    展开全文
  • Spark Shuffle原理详解

    2021-05-18 15:40:33
    目录(1)Shuffle概述(2)Hash Shuffle机制(2.1)Hash Shuffle概述(2.2)没有优化之前的Hash Shuffle机制(2.3)优化后的Hash Shuffle机制(3)Sort Shuffle机制(4)Spark Shuffle调优 (1)Shuffle概述 Shuffle...

    (1)Shuffle概述

    Shuffle 就是对数据进行重组,是把一组无规则的数据尽量转换成一组具有一定规则的数 据。由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。

    在 MapReduce 框架,Shuffle 是连接 Map 和 Reduce 之间的桥梁,Map 阶段通过 shuffle 读取 数据并输出到对应的 Reduce。而 Reduce 阶段负责从 Map 端拉取数据并进行计算。在整个 shuffle 过程中,往往伴随着大量的磁盘和网络 I/O。所以 shuffle 性能的高低也直接决定 了整个程序的性能高低。下图为 Hadoop Shuffle 过程。

    在这里插入图片描述

    在这里插入图片描述

    Spark 也有自己的 shuffle 实现过程。在 DAG 调度的过程中,Stage 阶段的划分是根据是否 有 shuffle 过程,也就是存在 ShuffleDependency 宽依赖的时候,需要进行 shuffle。 并且在划分 Stage 并构建 ShuffleDependency 的时候进 行 shuffle 注册,获取后续数据读取所需要的 ShuffleHandle, 最终每一个 job 提交后都会 生成一个 ResultStage 和若干个 ShuffleMapStage,其中 ResultStage 表示生成作业的最终 结果所在的 Stage。ResultStage 与 ShuffleMapStage 中的 task 分别对应着 ResultTask 与 ShuffleMapTask。一个作业,除了最终的 ResultStage 外,其他若干 ShuffleMapStage 中各 个 ShuffleMapTask 都需要将最终的数据根据相应的 Partitioner 对数据进行分组,然后持 久化分区的数据。

    在这里插入图片描述

    (2)Hash Shuffle机制

    (2.1)Hash Shuffle概述

    在 spark-1.6 版本之前,一直使用 HashShuffle,在 spark-1.6 版本之后使用 Sort Shuffle,因为 HashShuffle 存在的不足所以就替换了 HashShuffle。

    我们知道,Spark 的运行主要分为 2 部分:一部分是驱动程序,其核心是 SparkContext。另 一部分是 Worker 节点上 Task,它是运行实际任务的。程序运行的时候,Driver 和 Executor 进程相互交互,Driver 会分配 Task 到 Executor,也就是 Driver 跟 Executor 会进行网络 传输。另外,当前 Task 要抓取其他上游的 Task 的数据结果,所以这个过程中就不断的产 生网络结果。其中下一个 Stage 向上一个 Stage 要数据这个过程,我们就称之为 Shuffle。

    (2.2)没有优化之前的Hash Shuffle机制

    在这里插入图片描述
    HashShuffle没有优化之前的细节过程:

    在 HashShuffle 没有优化之前,每一个 ShufflleMapTask 会为每一个 ReduceTask 创建一个bucket 缓存,并且会为每一个 bucket 创建一个文件。这个 bucket 存放的数据就是经过 Partitioner 操作(默认是 HashPartitioner)之后找到对应的 bucket 然后放进去,最后将数 据刷新 bucket 缓存的数据到磁盘上,即对应的 block file。

    然 后 ShuffleMapTask 将 输 出 作 为 MapStatus 发 送 到 DAGScheduler 的 MapOutputTrackerMaster,每一个 MapStatus 包含了每一个 ResultTask 要拉取的数据的位 置和大小。

    接下来 ResultTask 去利用 BlockStoreShuffleFetcher 向 MapOutputTrackerMaster 获取 MapStatus,看哪一份数据是属于自己的,然后底层通过 BlockManager 将数据拉取过来。

    拉取过来的数据会组成一个内部的 ShuffleRDD,优先放入内存,内存不够用则放入磁盘, 然后 ResulTask 开始进行聚合,最后生成我们希望获取的那个 MapPartitionRDD。

    自己的话总结:

    每一个上游ShufflleMapTask 根据下游 ReduceTask数量,产生对应多个的bucket内存,这个bucket存放的数据是经过Partition操作(默认是Hashpartition)之后找到对应的 bucket 然后放进去,bucket内存大小默认是32k,最后将bucket缓存的数据溢写到磁盘,即为对应的block file。接下来Reduce Task底层通过 BlockManager 将数据拉取过来。拉取过来的数据会组成一个内部的 ShuffleRDD,优先放入内存,内存不够用则放入磁盘。

    没有优化的Hash Shuffle的缺点

    如上图所示:在这里有 1 个 worker, 2 个 executor, 每一个 executor 运行 2 个 ShuffleMapTask, 有三个 ReduceTask, 计算方式为:executor 数量每个 executor 的 ShuffleMapTask 数量ReduceTask 数量。所以总共就有 2 * 2 * 3=12 个 bucket 以及对应 12 个 block file(分区文件)。

    • 如果数据量较大,将会生成 M*R 个小文件,比如 ShuffleMapTask 有 100 个,ResultTask 有 100 个,这就会产生 100 * 100=10000 个小文件
    • bucket 缓存很重要,需要将 ShuffleMapTask 所有数据都写入 bucket,然后再刷到磁盘。 那么如果 Map 端数据过多,这就很容易造成内存溢出。尽管后面有优化,bucket 写入的数 据达到刷新到磁盘的阀值之后,就会将数据一点一点的刷新到磁盘,但是这样磁盘 I/O 就多 了。

    (2.3)优化后的Hash Shuffle机制

    在这里插入图片描述
    HashShuffle优化之后的细节过程:
    每一个 Executor 进程根据核数,决定 Task 的并发数量,比如 executor 核数是 2,那就可 以并发运行两个 task,如果是一个则只能运行一个 task。

    假设 executor 核数是 1,ShuffleMapTask 数量是 M,那么它依然会根据 ResultTask 的数量 R, 创建 R 个 bucket 缓存,然后对 key 进行 hash,数据进入不同的 bucket 中,每一个 bucket 对应着一个 block file,用于刷新 bucket 缓存里的数据。

    然后下一个 task 运行的时候,就不会再创建新的 bucket 和 block file,而是复用之前的 task 已经创建好的 bucket 和 block file。即所谓同一个 Executor 进程里所有 Task 都会把 相同的 key 放入相同的 bucket 缓冲区中。

    这样的话, 生成文件的数量就是(本地 worker 的所有 executor 对应的 cores 的总数 *ResultTask 数量)如上图所示,即 2 * 3 = 6 个文件,每一个 Executor 的 shuffleMapTask 数量 100,ReduceTask 数量即为 100。

    接下来举例比较一下,未优化的 HashShuffle 的文件数是 2 * 100 * 100 =20000,优化之后的 数量是 2*100 = 200 文件,相当于少了 100 倍。

    自己的话总结:

    每一个 Executor 进程根据核数,决定 Task 的并发数量,如果executor 核数是1,则只能运行一个task。ShuffleMapTask 会根据 ResultTask 的数量,产生对应多个的bucket内存,然后对 key 进行 hash分区,数据进入不同的 bucket 中,每一个 bucket 对应着一个 block file,用于刷新 bucket缓存里的数据。然后下一个 task 运行的时候,就不会再创建新的 bucket 和 block file,而是复用之前的 task 已经创建好的 bucket 和 block file。即所谓同一个 Executor 进程里所有 Task 都会把 相同的 key放入相同的 bucket 缓冲区中。

    优化过的Hash Shuffle的缺点
    如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧 过大,也会产生很多小文件。

    (3)Sort Shuffle机制

    为了缓解 Shuffle 过程产生文件数过多和 Writer 缓存开销过大的问题,spark 引入了类似 于 hadoop Map-Reduce 的 shuffle 机制。该机制每一个 ShuffleMapTask 不会为后续的任务 创建单独的文件,而是会将所有的 Task 结果写入同一个文件,并且对应生成一个索引文件。 以前的数据是放在内存缓存中,等到缓存读取完数据后再刷到磁盘,现在为了减少内存的使 用,在内存不够用的时候,可以将输出溢写到磁盘。结束的时候,再将这些不同的文件联合 内存(缓存)的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另 一方面减少 Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。

    在这里插入图片描述

    Sort Shuffle原理: 普通的SortShuffle
    在这里插入图片描述

    在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。接着,每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    此时task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件(标识了下游各个task的数据在文件中的start offset与end offset)。最终再由下游的task根据索引文件读取相应的数据文件。

    Sort Shuffle原理: bypass运行机制
    在这里插入图片描述

    此时上游stage的task会为每个下游stage的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    bypass机制与普通SortShuffleManager运行机制的不同在于:

    a、磁盘写机制不同;
    b、不会进行排序。

    也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    自己的话总结:
    普通的SortShuffle机制

    在普通模式下,数据会先写入一个内存数据结构中,如果是由聚合操作的shuffle算子用map数据结构,如果是join算子就用Array数据结构。在写入的过程中如果达到了临界值,就会将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。在溢写磁盘之前,会先根据key对内存数据结构中的数据进行排序,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件。在task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件,最终再由下游的task根据索引文件读取相应的数据文件。

    bypass运行机制

    bypass的就是不排序,还是用hash去为key分磁盘文件,分完之后再合并,形成一个索引文件和一个合并后的key hash文件。省掉了排序的性能。

    Sort Shuffle 有几种不同的策略:

    • BypassMergeSortShuffleWriter(Bypass 机制)
    • SortShuffleWriter(普通机制)
    • UnsafeShuffleWriter

    对于 BypassMergeSortShuffleWriter,使用这个模式的特点为:

    • 主要用于处理不需要排序和聚合的 Shuffle 操作,所以数据是直接写入文件,数据量较大 的时候,网络 I/O 和内存负担较重。
    • 主要适合处理 Reducer 任务数量比较少的情况。
    • 将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量。但是这种方式 需要并发打开多个文件,对内存消耗比较大。

    因为 BypassMergeSortShuffleWriter 这种方式比 SortShuffleWriter 更快,所以如果在 Reducer 数 量 不 大 , 又 不 需 要 在 map 端 聚 合 和 排 序 , 而 且 Reducer 的 数 目 小 于 spark.shuffle.sort.bypassMergeThreshold 指定的阀值(默认 200)时,就是用的是这种 方式(即启用条件)。

    对于 SortShuffleWriter(普通机制),使用这个模式的特点为:

    • 比较适合数据量很大的场景或者集群规模很大。
    • 引入了外部排序器,可以支持在 Map 端进行本地聚合或者不聚合。
    • 如果外部排序器 enable 了 spill 功能,如果内存不够,可以先将输出溢写到本地磁盘, 最后将内存结果和本地磁盘的溢写文件进行合并。

    另外,这个 Sort-Based Shuffle 跟 Executor 核数没有关系,即跟并发度没有关系,它是每 一个 ShuffleMapTask 都会产生一个 data 文件和 index 文件, 所谓合并也只是将该 ShuffleMapTask 的各个 partition 对应的分区文件合并到 data 文件而已。所以这个就需要 和 Hash-BasedShuffle 的 consolidation 机制区别开来。

    (4)Spark Shuffle调优

    • 调节map端缓冲区大小

      • map端缓冲的默认配置是32KB
      • val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
    • 调节reduce端拉取数据缓冲区大小

      • 如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能
      • reduce端数据拉取缓冲区大小默认48MB
      • val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
    • 调节reduce端拉取数据重试次数

      • reduce端拉取数据重试次数默认为3
      • val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")
    • 调节reduce端拉取数据等待间隔

      • reduce端拉取数据等待间隔默认为5s
      • val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
    • 调节SortShuffle排序操作阈值

      • SortShuffleManager排序操作阈值默认为200
      • val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")

    以上内容仅供参考学习,如有侵权请联系我删除!
    如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
    您的鼓励就是博主最大的动力!

    展开全文
  •   今天学习 Spark Shuffle。昨天文章提到了 Spark 划分 stage 时,分为了 ShuffleMapStage 和 ResultStage。没看过的可以看昨天的文章。 【SparkSpark 任务调度 在划分 stage 时: 前面的所有 stage 被称为 ...

      今天学习 Spark Shuffle。昨天文章提到了 Spark 划分 stage 时,分为了 ShuffleMapStage 和 ResultStage。没看过的可以看昨天的文章。

    【Spark】Spark 任务调度

    在这里插入图片描述

    在划分 stage 时:

    • 前面的所有 stage 被称为 ShuffleMapStage。ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。
    • 最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,ResultStage 对应代码中的 action 算子,将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。

    下面讲 Spark 的两种 Shuffle。

    1.HashShuffle

    1.1 未优化的 HashShuffle

    假设每个 Executor 只有 1 个 CPU core,无论这个 Executor上分配多少个 Task 线程,同一时间都只能执行一个 Task 线程。例如 3 个 Reducer,具体过程如下:

    • 在 Task 中进行 Hash 计算,分区器计算分区(hash 值 % num_reduce,这里是 3),得到 3 个不同的分区(3 类数据);
    • 想把不同的数据汇聚然后计算出最终的结果,Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合;
    • 总文件数:每 1 个 Task 输出 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出了 4 x 3 = 12 个本地小文件。

    在这里插入图片描述

    1.2 优化后的 HashShuffle

    优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。还是上面的例子,过程如下:

    • 在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer 里;
    • 然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,所以这里是每个 Core 只产出了 3 份本地文件;
    • 这里有 2 个 Core 4个 Mapper Tasks,所以总共输出是 2 x 3 = 6 个本地小文件。

    在这里插入图片描述

    2.SortShuffle 解析

    2.1 普通 SortShuffle

    在该模式下,数据会先写入一个数据结构:

    • reduceByKey 写入 Map,一边通过 Map 局部聚合,一边写入内存;
    • Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

    之后开始溢写磁盘,具体步骤:

    • 先根据 key 进行排序;
    • 排序过后的数据,会分批写入到磁盘文件中。默认数据会以每批 10000 条写入到磁盘文件。
    • 写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。
    • 最后在每个Task中,将所有的临时文件合并,这就是 merge 过程,此过程将所有临时文件读取出来,一次写入到最终文件。

    最终一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个 Task 的数据在文件中的索引(start offset和end offset)。

    在这里插入图片描述

    2.2 bypass SortShuffle

    bypass 运行机制的触发条件如下:

    • shuffle reduce task数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值,默认为 200;
    • 不是聚合类的shuffle算子(比如reduceByKey)。

    执行过程:

    • Task 为每个 reduce 端的 Task 都创建一个临时磁盘文件;
    • 将数据按 key 进行 hash,然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中;
    • 写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的;
    • 最后,将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    与 HashShuffler 对比:

    • 该过程的磁盘写机制跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建大量磁盘文件;
    • bypass SortShuffle 在最后会做一个磁盘文件的合并,因此只生成少量的最终磁盘文件,因此相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

    与普通的 ShortShuffler 对比:

    • 不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    在这里插入图片描述
    欢迎关注。
    在这里插入图片描述

    展开全文
  • Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端...
  • sparkshuffle原理、shuffle操作问题解决和参数调优.doc
  • Spark Shuffle分为Write和Read两个过程。 在Spark中负责shuffle过程的执行、计算、处理的组件主要是 ShuffleManager,其是一个trait,负责管理本地以及远程的block数据的shuffle操作。 所有方法如下图所示: ...
  • Spark Shuffle运行原理

    千次阅读 2020-09-07 12:48:19
    1.什么是spark shuffleShuffle中文意思就是“洗牌”,在SparkShuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个分区上去聚合和处理。 Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行...
  • 1 shuffle原理  1.1 mapreduce的shuffle原理  1.1.1 map task端操作  1.1.2 reduce task端操作  1.2 spark现在的SortShuffleManager  2 Shuffle操作问题解决  2.1 数据倾斜原理  2.2 数据倾斜问题...
  • MapReduce Shuffle原理Spark Shuffle原理

    千次阅读 2016-11-26 20:15:03
    MapReduce的Shuffle过程介绍   Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有...
  • Spark shuffle一、Spark Shuffle二、Spark 1.6.x三、Sort ShuffleSpark 2.3.x,Spark 1.6.x)1.普通运行机制2..bypass优化机制四、Spark Shuffle总结细节解释五、Spark shuffle调优 一、Spark Shuffle 在DAG调度的...
  • Spark Shuffle原理解析

    2017-10-11 14:11:00
    Spark Shuffle原理解析 一:到底什么是Shuffle? Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。 二:Shuffle可能面临的问题?运行...
  • 目录一、基本介绍1.1 Lineage1.2 窄依赖1.3 宽依赖二、Spark Shuffle原理2.1 ShuffleWriter2.1.1 BypassMergeSortShuffleWriter与SortShuffleWriter的区别2.2 Spark Shuffle2.3 Shuffle相关参数三、源码参考资料 ...
  • 详解 sparkshuffle原理

    千次阅读 2021-03-31 07:40:58
    sparkshuffle计算模型与hadoop的shuffle模型原理相似,其计算模型都是来自于MapReduce计算模型,将计算分成了2个阶段,map和reduce阶段。 一 多维度理解shuffle: 二 shuffle过程的内存分配 三 shuffle过程中的...
  • 剖析 Spark Shuffle 原理

    2020-09-22 19:17:15
    剖析 Spark Shuffle 原理,主要从两个层面讲解 Shuffle,主要分为: 逻辑层面 物理层面 逻辑层面主要从 RDD 的血统机制出发,从 DAG 的角度来介绍 Shuffle,另外也会介绍Spark 容错机制,而物理层面是从执行角度来...
  • sparkshuffle原理分析 1 、概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。 在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据...
  • ShuffleManager里有四个接口,register,reader,writer和stop。 核心接口则是reader和writer,当前版本reader接口只有1个实现,... 输出的分区数小于spark.shuffle.sort.bypassMergeThreshold,默认是200 sh..
  • 1 Shuffle原理 1.1 MapReduce的shuffle原理 1.1.1 map task端操作 1.1.2 reduce task端操作 1.2 Spark现在的SortShuffleManager 1.2.1 SortShuffleManager运行原理 1.2.2 普通运行机制 1.2.3 bypass机制 2 ...
  • 通过文章“Spark Scheduler内部原理剖析”我们知道,Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的...
  • 1 shuffle原理  1.1 mapreduce的shuffle原理  1.1.1 map task端操作  1.1.2 reduce task端操作  1.2 spark现在的SortShuffleManager 2 Shuffle操作问题解决  2.1 数据倾斜原理 2.2 数据倾斜问题发现与...
  • 逻辑层面主要从 RDD 的血统机制出发,从 DAG 的角度来讲解 Shuffle,另外也会讲解 Spark 容错机制,而物理层面是从执行角度来剖析 Shuffle 是如何发生的。 RDD 血统与 Spark 容错 在 DAG 中,最初的 RDD 被称为...
  • 通过文章“Spark Scheduler内部原理剖析”我们知道,Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的...
  • 什么时候需要 shuffle writer 假如我们有个 spark job 依赖关系如下 我们抽象出来其中的rdd和依赖关系: E <-------n------, C <--n---D---n-----F--s---, A <-------s------ B <--n----`-- G ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,207
精华内容 4,482
关键字:

sparkshuffle原理