精华内容
下载资源
问答
  • spark的shuffle过程
    2020-10-28 16:01:36

    Spark Shuffle过程

    https://www.xuebuyuan.com/3228633.html

    https://zhuanlan.zhihu.com/p/55954840

    https://www.cnblogs.com/itboys/p/9226479.html

    更多相关内容
  • Spark Shuffle 过程

    2022-07-02 11:17:19
    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,...

    前言

    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,就有可能让 Job 执行效率上更好。

    在 Spark 1.2 以前,默认的 Shuffle 计算引擎是 HashShuffleManager。该 ShuffleManager 会产生大量的中间磁盘文件,进而由大量的磁盘 IO 操作影响了性能。

    到 Spark 1.2 以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager。 SortShuffleManager 相较于 HashShuffleManager 来说进行了一定的改进。主要就在于,每个 Task 在进行 Shuffle 操作时,会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并 (merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 Stage 的 Shuffle Read Task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    下面是对 HashShuffleManager 和 SortShuffleManager 的分析与原理阐述。

    Spark Shuffle

    HashShuffleManager

    未被优化的HashShuffleManager

    未经优化的HashShuffleManager

    注:假设每个Executor 只有 1 个 CPU core,无论 Executor 上分配多少个 Task 线程,同一时间都只能执行一个 Task 线程。

    1. 首先 Shuffle Write 阶段,主要就是在一个 Stage 结束计算之后,为了下一个 Stage 可以执行 Shuffle 类的算子(比如 reduceBy Key ),而将每个 Task 处理的数据按 Key 进行分类。就是对相同的 Key 执行 Hash算法,从而将相同 Key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 Stage 的一个 Task 。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去;
    2. 每个执行 Shuffle Write 的 Task ,要为下一个 Stage 创建多少个磁盘文件呢?下一个 Stage 的 Task 有多少个,当前 Stage 的每个 Task 就要创建多少份磁盘文件。比如下一个 Stage 总共有 100 个 Task ,那么当前 Stage 的每个 Task 都要创建 100 份磁盘文件。如果当前 Stage 有 50 个 Task ,总共有 10 个 Executor,每个 Executor 执行 5 个 Task ,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建5000 个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的;
    3. Shuffle Read阶段,通常是一个 Stage 刚开始运行的时候。 Stage 中每一个 Task 需要将上一个 Stage 的计算结果( Key 值相同),从各个节点上通过网络都拉取到自己所在的节点上;然后进行 Key 的聚合或连接等操作。由于 Shuffle Write 的过程中, Task 给下游 Stage 的每个 Task 都创建了一个磁盘文件,因此 Shuffle Read 的过程中,每个 Task 只要从上游 Stage 的所有 Task 所在节点上,拉取属于自己的那一个磁盘文件即可;
    4. Shuffle Read 拉取过程是一边拉取一边进行聚合的。每个 Shuffle Read Task 都会有一个自己的 Buffer 缓冲,每次都只能拉取与 Buffer 缓冲相同大小的数据;然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 Buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

    优化的HashShuffleManager

    优化后的HashShuffleManager

    注:设置 spark.shuffle.consolidateFiles为 true 即可开启优化机制(默认为 false)

    1. 开启 consolidate 机制之后,在 Shuffle Write 过程中,Task 就不是为下游 Stage 的每个 Task 创建一个磁盘文件了。此时会出现 ShuffleFileGroup 的概念,每个 ShuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 Stage 的 Task 数量是相同的。一个 Executor 上有多少个 CPU core,就可以并行执行多少个 Task 。而第一批并行执行的每个 Task 都会创建一个 ShuffleFileGroup ,并将数据写入对应的磁盘文件内;
    2. 当 Executor 的 CPU core 执行完一批 Task ,接着执行下一批 Task 时,下一批 Task 就会复用之前已有的 ShuffleFileGroup ,包括其中的磁盘文件。也就是说,此时 Task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 Task 复用同一批磁盘文件,这样就可以有效将多个 Task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 Shuffle Write的性能;
    3. 假设第二个 Stage 有100个 Task ,第一个 Stage 有 50 个 Task ,总共还是有 10 个 Executor,每个 Executor 执行5个 Task 。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生500个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:CPU Core 的数量 * 下一个 Stage 的 Task 数量。也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

    SortShuffleManager

    普通运行机制

    普通运行机制

    1. 首先数据会先写入一个内存数据结构中,此时根据不同的 Shuffle 算子,可能选用不同的数据结构。如果是 reduceBy Key 这种聚合类的 Shuffle 算 子,那么会选用 Map数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 Shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之 后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构;
    2. 在溢写到磁盘文件之前,会先根据 Key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数 据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的 形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再 一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能;
    3. Task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文 件。最后会将之前所有的临时磁盘文件都进行合并,这就是 Merge 过程,此时会将之前所有临时磁盘 文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 Task 就只对应一个磁盘 文件,也就意味着该 Task 为下游 Stage 的 Task 准备的数据都在这一个文件中,因此还会单独写一份索引 文件,其中标识了下游各个 Task 的数据在文件中的 Start offset 与 End offset;
    4. SortShuffleManager 由于有一个磁盘文件 Merge 的过程,因此大大减少了文件数量。比如第一个 Stage 有 50 个 Task ,总共有 10 个 Executor ,每个 Executor 执行 5 个 Task ,而第二个 Stage 有 100 个 Task 。由于每 个 Task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁 盘文件。

    bypass 运行机制

    在这里插入图片描述

    1. Task 会为每个下游 Task 都创建一个临时磁盘文件,并将数据按 Key 进行 Hash 然后根据 Key 的 Hash 值,将 Key 写入对应的磁盘文件之中。写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件;
    2. 该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,Shuffle Read的性能会更好;
    3. 而该机制与普通 SortShuffleManager 运行机制的不同在于:磁盘写机制不同、不会进行排序。启用该机制的最大好处在于,Shuffle Write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    SortShuffleManager 源码说明

    执行 runTask
    org.apache.spark.scheduler.Task
    
    final def run(
          taskAttemptId: Long,
          attemptNumber: Int,
          metricsSystem: MetricsSystem): T = {
        SparkEnv.get.blockManager.registerTask(taskAttemptId)
            
        ................. 中间省略部分代码 ....................
    
        try {
    
          /**
           * @Author: Small_Ran
           * @Date: 2022/6/20
           * @Description: 开始运行 Task(ResultTask、ShuffleMapTask)
           */
          runTask(context)
        } catch {
          case e: Throwable =>
            // Catch all errors; run task failure callbacks, and rethrow the exception.
            try {
              context.markTaskFailed(e)
            } catch {
              case t: Throwable =>
                e.addSuppressed(t)
            }
            context.markTaskCompleted(Some(e))
            throw e
        } finally {
            
          	................. 中间省略部分代码 ....................
        }
      }
    
    org.apache.spark.scheduler.ResultTask
    
    override def runTask(context: TaskContext): U = {
        // Deserialize the RDD and the func using the broadcast variables.
        val threadMXBean = ManagementFactory.getThreadMXBean
        val deserializeStartTime = System.currentTimeMillis()
        val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        val ser = SparkEnv.get.closureSerializer.newInstance()
    
        /**
         * @Author: Small_Ran
         * @Date: 2022/6/20
         * @Description:  ResultTask 是 JOb 的最后一个环节;主要的作用就是输出结果
         * 包括:
         *      1、collect 收集结果到 Driver
         *      2、foreach 打印输出
         *      3、saveAsTextFile 写出到外部系统 HDFS、MySQL、Hive
         */
        val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
        } else 0L
    
        func(context, rdd.iterator(partition, context))
      }
    

    注:接下来主要说明 ShuffleMapTask 的实现方式

    选择 Shuffle 写入器
    org.apache.spark.scheduler.ShuffleMapTask.runTask(context: TaskContext)
    
    //      这里的 shuffleManager 就是SparkContext初始化中默认值 manager = org.apache.spark.shuffle.sort.SortShuffleManager
          val manager = SparkEnv.get.shuffleManager
    //      获取 Shuffle 选择器(SerializedShuffleHandle、BypassMergeSortShuffleHandle、BaseShuffleHandle)
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    
    org.apache.spark.shuffle.sort.SortShuffleManager
    
    override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    
    //    根据条件判断选择不同的 ShuffleManager,handle值是通过上面的 registerShuffle 方法进行判断
        val env = SparkEnv.get
        handle match {
    
    //      不需要进行 Map 阶段的预聚合、Partition 数量小于16777216
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
    //      不需要进行 Map 阶段的预聚合、Partition 数量小于 200,返回 BypassMergeSortShuffleHandle 对象
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
    //        其他情况,通用情况
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }
    
    ShuffleManager 选择判断
    org.apache.spark.shuffle.sort.SortShuffleManager
    
    override def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    //    对 BypassMergeSortShuffleHandle 进行的判断
        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
          // need map-side aggregation, then write numPartitions files directly and just concatenate
          // them at the end. This avoids doing serialization and deserialization twice to merge
          // together the spilled files, which would happen with the normal code path. The downside is
          // having multiple files open at a time and thus more memory allocated to buffers.
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    //    对 SerializedShuffleHandle 进行的判断
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // Otherwise, buffer map outputs in a deserialized form:
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
      }
    
    org.apache.spark.shuffle.sort.SortShuffleWriter
    
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // We cannot bypass sorting if we need to do map-side aggregation.
    //    首先使用 bypass 运行机制的前提是,在map阶段是否存在预聚合操作(Combine)
        if (dep.mapSideCombine) {
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          false
        } else {
    //      这里就是判断是否使用 bypass 运行机制,如果任务数量(分区数量) <= 200(参数:spark.shuffle.sort.bypassMergeThreshold)则使用 bypass 运行机制。
          val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
          dep.partitioner.numPartitions <= bypassMergeThreshold
        }
      }
    
    org.apache.spark.shuffle.sort.SortShuffleManager
    
    def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
        val shufId = dependency.shuffleId
        val numPartitions = dependency.partitioner.numPartitions
    //    判断支不支持序列化器
        if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
          log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
            s"${dependency.serializer.getClass.getName}, does not support object relocation")
          false
    //      判断是否有聚合操作
        } else if (dependency.aggregator.isDefined) {
          log.debug(
            s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
          false
    //      判断分区数是否大于16777216
        } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
          log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
            s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
          false
        } else {
          log.debug(s"Can use serialized shuffle for shuffle $shufId")
          true
        }
      }
    
    ShuffleManager 读写数据
    org.apache.spark.scheduler.ShuffleMapTask
    
    /**
           * @Author: Small_Ran
           * @Date: 2022/6/20
           * @Description:  rdd 计算的核心方法就是 iterator 方法
           * SortShuffleWriter 的 write 方法可以分为几个步骤:
           *    1、将上游 RDD 计算出的数据(通过调用 rdd.iterator方法)写入内存缓冲区,在写的过程中如果超过内存阈值就会溢写磁盘文件(可能会写多个文件);
           *    2、最后将溢写的文件和内存中剩余的数据一起进行归并排序后写入到磁盘中形成一个大的数据文件(按照 Key 分区排序);
           *    3、在归并排序后写的过程中,每写一个分区就会手动刷写一遍,并记录下这个分区数据在文件中的位移;
           *    4、最后写完一个 Task 的数据后,磁盘上会有两个文件:数据文件和记录每个 reduce 端 partition 数据位移的索引文件;
           *
           */
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    
    org.apache.spark.shuffle.sort.SortShuffleWriter
    
    override def write(records: Iterator[Product2[K, V]]): Unit = {
    
    //    获取一个排序器,根据是否需要 map 端聚合传递不同的参数
        sorter = if (dep.mapSideCombine) {
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          new ExternalSorter[K, V, C](
            context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
          // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
          // care whether the keys get sorted in each partition; that will be done on the reduce side
          // if the operation being run is sortByKey.
          new ExternalSorter[K, V, V](
            context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
    
    //    将数据插入排序器中,这个过程或溢写出多个磁盘文件
        sorter.insertAll(records)
    
        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
    //    根据 ShuffleID 和分区 ID 获取一个磁盘文件名,MapId 就是 ShuffleMap 端 RDD 的 PartitionID
    //    将多个溢写的磁盘文件和内存中的排序数据进行归并排序并写到一个文件中,同时返回每个reduce端分区的数据在这个文件中的位移
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    
    //    将索引写入一个索引文件,并将数据文件的文件名由临时文件名改成正式的文件名;为输出文件名加一个uuid后缀
        val tmp = Utils.tempFileWith(output)
        try {
          val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    
    //      这一步将溢写到的磁盘的文件和内存中的数据进行归并排序,并溢写到一个文件中,这一步写的文件是临时文件名
          val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    
    //      写入索引文件,使用 File.renameTo 方法将临时索引和临时数据文件重命名为正常的文件名
          shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    
    //      返回一个状态对象,包含 shuffle 服务 Id 和各个分区数据在文件中的位移
          mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
          if (tmp.exists() && !tmp.delete()) {
            logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
          }
        }
      }
    
    org.apache.spark.util.collection.ExternalSorter
    
    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
        // TODO: stop combining if we find that the reduction factor isn't high
        val shouldCombine = aggregator.isDefined
    
    
        if (shouldCombine) {
          // Combine values in-memory first using our AppendOnlyMap
          val mergeValue = aggregator.get.mergeValue
          val createCombiner = aggregator.get.createCombiner
          var kv: Product2[K, V] = null
          val update = (hadValue: Boolean, oldValue: C) => {
            if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
          }
          while (records.hasNext) {
            addElementsRead()
            kv = records.next()
    
    //        向内存缓冲中插入一条数据,map的键为(partitionId, key)
            map.changeValue((getPartition(kv._1), kv._1), update)
    
    //        每写入一条数据就检查一遍内存。如果缓冲超过阈值,就会溢写到磁盘生成一个文件
            maybeSpillCollection(usingMap = true)
          }
        } else {
          // Stick values into our buffer
          while (records.hasNext) {
            addElementsRead()
            val kv = records.next()
            buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
            maybeSpillCollection(usingMap = false)
          }
        }
      }
    
    展开全文
  • Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章SparkShuffle过程分析:Map阶段处理流程了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个...
  • Spark Shuffle过程详解

    2021-04-15 00:14:31
    点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源‍‍你需要预习:《Spark的Cache和Checkpoint区别和联系拾遗》《Spark Job 逻辑执行图和数据依赖解析》《S...

    点击上方蓝色字体,选择“设为星标

    回复”资源“获取更多资源

    ‍你需要预习:

    《Spark的Cache和Checkpoint区别和联系拾遗

    《Spark Job 逻辑执行图和数据依赖解析

    《Spark Job 物理执行图详解

    上一章里讨论了 job 的物理执行图,也讨论了流入 RDD 中的 records 是怎么被 compute() 后流到后续 RDD 的,同时也分析了 task 是怎么产生 result,以及 result 怎么被收集后计算出最终结果的。然而,我们还没有讨论数据是怎么通过 ShuffleDependency 流向下一个 stage 的?

    对比 Hadoop MapReduce 和 Spark 的 Shuffle 过程

    如果熟悉 Hadoop MapReduce 中的 shuffle 过程,可能会按照 MapReduce 的思路去想象 Spark 的 shuffle 过程。然而,它们之间有一些区别和联系。

    从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。

    从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。

    从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。

    如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?

    Shuffle write

    由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

    shuffle write 的任务很简单,那么实现也很简单:将 shuffle write 的处理逻辑加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,该 stage 的 final RDD 每输出一个 record 就将其 partition 并持久化。图示如下:

    上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb ,默认是 32KB(Spark 1.1 版本以前是 100KB)。

    其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。

    ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalRDD 中对应 partition 的 records。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.partition(record.getKey()))决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。

    这样的实现很简单,但有几个问题:

    1. 产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。

    2. 缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了cores * R * 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。

    目前来看,第二个问题还没有好的方法解决,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图:

    可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

    Shuffle read

    先看一张包含 ShuffleDependency 的物理执行图,来自 reduceByKey:

    很自然地,要计算 ShuffleRDD 中的数据,必须先把 MapPartitionsRDD 中的数据 fetch 过来。那么问题就来了:

    • 在什么时候 fetch,parent stage 中的一个 ShuffleMapTask 执行完还是等全部 ShuffleMapTasks 执行完?

    • 边 fetch 边处理还是一次性 fetch 完再处理?

    • fetch 来的数据存放到哪里?

    • 怎么获得要 fetch 的数据的存放位置?

    解决问题:

    • 在什么时候 fetch?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。因为 fetch 来的 FileSegments 要先在内存做缓冲,所以一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过 spark.reducer.maxMbInFlight,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面一般包含多个 FileSegment,但如果某个 FileSegment 特别大的话,这一个就可以填满甚至超过 softBuffer 的界限。

    • 边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?答案是使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 \ record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value,并将 func 的结果重新 put(key) 到 HashMap 中去。这个 func 功能上相当于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差别,差别相当于下面两段程序的差别。

        // MapReduce
        reduce(K key, Iterable<V> values) { 
            result = process(key, values)
            return result    
        }
      
        // Spark
        reduce(K key, Iterable<V> values) {
            result = null 
            for (V value : values) 
                result  = func(result, value)
            return result
        }

      MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接sum(values)/values.length,而在 Spark 中 func 可以实现sum(values),但不好实现/values.length。更多的 func 将会在下面的章节细致分析。

    • fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。内存使用的是AppendOnlyMap ,类似 Java 的HashMap,内存+磁盘使用的是ExternalAppendOnlyMap,如果内存空间不足时,ExternalAppendOnlyMap可以将 \

       records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。
    • 怎么获得要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,我们强调 “一个 ShuffleMapStage 形成后,会将该 stage 最后一个 final RDD 注册到 MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。因此,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。

    至此,我们已经讨论了 shuffle write 和 shuffle read 设计的核心思想、算法及某些实现。接下来,我们深入一些细节来讨论。

    典型 transformation() 的 shuffle read

    1. reduceByKey(func)

    上面初步介绍了 reduceByKey() 是如何实现边 fetch 边 reduce() 的。需要注意的是虽然 Example(WordCount) 中给出了各个 RDD 的内容,但一个 partition 里面的 records 并不是同时存在的。比如在 ShuffledRDD 中,每 fetch 来一个 record 就立即进入了 func 进行处理。MapPartitionsRDD 中的数据是 func 在全部 records 上的处理结果。从 record 粒度上来看,reduce() 可以表示如下:

    可以看到,fetch 来的 records 被逐个 aggreagte 到 HashMap 中,等到所有 records 都进入 HashMap,就得到最后的处理结果。唯一要求是 func 必须是 commulative 的(参见上面的 Spark 的 reduce() 的代码)。

    ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操作。

    为了减少数据传输量,MapReduce 可以在 map 端先进行 combine(),其实在 Spark 也可以实现,只需要将上图 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也进行一次即可,比如 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。

    对比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():

    • map 端的区别:map() 没有区别。对于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上进行 combine()。

    • reduce 端区别:MapReduce 的 shuffle 阶段先 fetch 数据,数据量到达一定规模后 combine(),再将剩余数据 merge-sort 后 reduce(),reduce() 非常灵活。Spark 边 fetch 边 reduce()(在 HashMap 上执行 func),因此要求 func 符合 commulative 的特性。

    从内存利用上来对比:

    • map 端区别:MapReduce 需要开一个大型环形缓冲区来暂存和排序 map() 的部分输出结果,但 combine() 不需要额外空间(除非用户自己定义)。Spark 需要 HashMap 内存数据结构来进行 combine(),同时输出 records 到磁盘上时也需要一个小的 buffer(bucket)。

    • reduce 端区别:MapReduce 需要一部分内存空间来存储 shuffle 过来的数据,combine() 和 reduce() 不需要额外空间,因为它们的输入数据分段有序,只需归并一下就可以得到。在 Spark 中,fetch 时需要 softBuffer,处理数据时如果只使用内存,那么需要 HashMap 来持有处理后的结果。如果使用内存+磁盘,那么在 HashMap 存放一部分处理后的数据。

    2. groupByKey(numPartitions)

    与 reduceByKey() 流程一样,只是 func 变成 result = result ++ record.value,功能是将每个 key 对应的所有 values 链接在一起。result 来自 hashMap.get(record.key),计算后的 result 会再次被 put 到 hashMap 中。与 reduceByKey() 的区别就是 groupByKey() 没有 map 端的 combine()。对于 groupByKey() 来说 map 端的 combine() 只是减少了重复 Key 占用的空间,如果 key 重复率不高,没必要 combine(),否则,最好能够 combine()。

    3. distinct(numPartitions)

    与 reduceByKey() 流程一样,只是 func 变成 result = result == null? record.value : result,如果 HashMap 中没有该 record 就将其放入,否则舍弃。与 reduceByKey() 相同,在map 端存在 combine()。

    4. cogroup(otherRDD, numPartitions)

    CoGroupedRDD 可能有 0 个、1 个或者多个 ShuffleDependency。但并不是要为每一个 ShuffleDependency 建立一个 HashMap,而是所有的 Dependency 共用一个 HashMap。与 reduceByKey() 不同的是,HashMap 在 CoGroupedRDD 的 compute() 中建立,而不是在 mapPartitionsWithContext() 中建立。

    粗线表示的 task 首先 new 出一个 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的个数与参与 cogroup 的 RDD 个数相同。func 的逻辑是这样的:每当从 RDD a 中 shuffle 过来一个 \ record 就将其添加到 hashmap.get(Key) 对应的 Array 中的第一个 ArrayBuffer() 中,每当从 RDD b 中 shuffle 过来一个 record,就将其添加到对应的 Array 中的第二个 ArrayBuffer()。

    CoGroupedRDD => MappedValuesRDD 对应 mapValues() 操作,就是将 [ArrayBuffer(), ArrayBuffer()] 变成 [Iterable[V], Iterable[W]]。

    5. interp(otherRDD) 和 join(otherRDD, numPartitions)

     这两个操作中均使用了 cogroup,所以 shuffle 的处理方式与 cogroup 一样。

    6. sortByKey(ascending, numPartitions)

    sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑与 reduceByKey() 不太一样,没有使用 HashMap 和 func 来处理 fetch 过来的 records。

    sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑是:将 shuffle 过来的一个个 record 存放到一个 Array 里,然后按照 Key 来对 Array 中的 records 进行 sort。

    7. coalesce(numPartitions, shuffle = true)

    coalesce() 虽然有 ShuffleDependency,但不需要对 shuffle 过来的 records 进行 aggregate,所以没有建立 HashMap。每 shuffle 一个 record,就直接流向 CoalescedRDD,进而流向 MappedRDD 中。

    Shuffle read 中的 HashMap

    HashMap 是 Spark shuffle read 过程中频繁使用的、用于 aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。下面我们来分析一下两者特性及内存使用情况

    1. AppendOnlyMap

    AppendOnlyMap 的官方介绍是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是类似 HashMap,但没有remove(key)方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。如下图:

    当要 put(K, V) 时,先 hash(K) 找存放位置,如果存放位置已经被占用,就使用 Quadratic probing 探测方法来找下一个空闲位置。对于图中的 K6 来说,第三次查找找到 K4 后面的空闲位置,放进去即可。get(K6) 的时候类似,找三次找到 K6,取出紧挨着的 V6,与先来的 value 做 func,结果重新放到 V6 的位置。

    迭代 AppendOnlyMap 中的元素的时候,从前到后扫描输出。

    如果 Array 的利用率达到 70%,那么就扩张一倍,并对所有 key 进行 rehash 后,重新排列每个 key 的位置。

    AppendOnlyMap 还有一个 destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。实现方法很简单:先将所有 (K, V) pairs compact 到 Array 的前端,并使得每个 (K, V) 占一个位置(原来占两个),之后直接调用 Array.sort() 排序,不过这样做会破坏数组(key 的位置变化了)。

    2. ExternalAppendOnlyMap

    相比 AppendOnlyMap,ExternalAppendOnlyMap 的实现略复杂,但逻辑其实很简单,类似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 过程:

    ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 过程与原始的 AppendOnlyMap 一模一样。如果 AppendOnlyMap 快被装满时检查一下内存剩余空间是否可以够扩展,够就直接在内存中扩展,不够就 sort 一下 AppendOnlyMap,将其内部所有 records 都 spill 到磁盘上。图中 spill 了 4 次,每次 spill 完在磁盘上生成一个 spilledMap 文件,然后重新 new 出来一个 AppendOnlyMap。最后一个 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 来的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已经被处理完,因为每次 insert 的时候,新来的 record 只与 AppendOnlyMap 中的 records 进行 aggregate,并不是与所有的 records 进行 aggregate(一些 records 已经被 spill 到磁盘上了)。因此当需要 aggregate 的最终结果时,需要对 AppendOnlyMap 和所有的 spilledMaps 进行全局 merge-aggregate。

    全局 merge-aggregate 的流程也很简单:先将 AppendOnlyMap 中的 records 进行 sort,形成 sortedMap。然后利用 DestructiveSortedIterator 和 DiskMapIterator 分别从 sortedMap 和各个 spilledMap 读出一部分数据(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key),所以图中第一个 spilledMap 只读出前三个 records 进入 StreamBuffer。mergeHeap 顾名思义就是使用堆排序不断提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并将其一个个放入 mergeBuffers 中,放入的时候与已经存在于 mergeBuffers 中的 StreamBuffer 进行 merge-combine,第一个被放入 mergeBuffers 的 StreamBuffer 被称为 minBuffer,那么 minKey 就是 minBuffer 中第一个 record 的 key。当 merge-combine 的时候,与 minKey 相同的 records 被 aggregate 一起,然后输出。整个 merge-combine 在 mergeBuffers 中结束后,StreamBuffer 剩余的 records 随着 StreamBuffer 重新进入 mergeHeap。一旦某个 StreamBuffer 在 merge-combine 后变为空(里面的 records 都被输出了),那么会使用 DestructiveSortedIterator 或 DiskMapIterator 重新装填 hash(key) 相同的 records,然后再重新进入 mergeHeap。

    整个 insert-merge-aggregate 的过程有三点需要进一步探讨一下:

    • 内存剩余空间检测

      与 Hadoop MapReduce 规定 reducer 中 70% 的空间可用于 shuffle-sort 类似,Spark 也规定 executor 中 spark.shuffle.memoryFraction * spark.shuffle.safetyFraction 的空间(默认是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是这 24% 的空间不是完全用于一个 ExternalOnlyAppendMap 的,而是由在 executor 上同时运行的所有 reducer 共享的。为此,exectuor 专门持有一个 ShuffleMemroyMap: HashMap[threadId, occupiedMemory] 来监控每个 reducer 中 ExternalOnlyAppendMap 占用的内存量。每当 AppendOnlyMap 要扩展时,都会计算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的内存 + 扩展后的内存 是会否会大于内存限制,大于就会将 AppendOnlyMap spill 到磁盘。有一点需要注意的是前 1000 个 records 进入 AppendOnlyMap 的时候不会启动是否要 spill 的检查,需要扩展时就直接在内存中扩展。

    • AppendOnlyMap 大小估计

      为了获知 AppendOnlyMap 占用的内存空间,可以在每次扩展时都将 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但这样做非常耗时。所以 Spark 设计了粗略的估算算法,算法时间复杂度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小变化及一共 insert 的 records 的个数来估算大小,具体见 SizeTrackingAppendOnlyMap 和 SizeEstimator

    • Spill 过程

      与 shuffle write 一样,在 spill records 到磁盘上的时候,会建立一个 buffer 缓冲区,大小仍为 spark.shuffle.file.buffer.kb ,默认是 32KB。另外,由于 serializer 也会分配缓冲区用于序列化和反序列化,所以如果一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为 spark.shuffle.spill.batchSize,默认是 10000。

    Discussion

    通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。

    这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。

    Spark的Cache和Checkpoint区别和联系拾遗

    Spark Job 逻辑执行图和数据依赖解析

    Spark Job 物理执行图详解

    欢迎点赞+收藏+转发朋友圈素质三连

    文章不错?点个【在看】吧! 

    展开全文
  • SparkShuffle过程和MR-shuffle过程
  • spark shuffle过程

    千次阅读 2018-01-19 11:23:01
    1. SparkShuffle1. SparkShuffle概念reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value。问题:聚合之前,...
  • shuffle过程:由ShuffleManager负责,计算引擎HashShuffleManager(Spark 1.2)—>SortShuffleManager spark根据shuffle类算子进行stage的划分,当执行某个shuffle类算子(reduceByKey、join)时,算子之前的...
  • Sparkshuffle过程

    2021-07-06 14:45:22
    Shuffle过程发生在宽依赖切分Stage的过程中,前一个Stage称作ShuffleMap Stage,后一个Stage称作Result Stage。 HashShuffle原理 未经优化的HashShuffle 1.Map Task将数据写入buffer缓冲区,待缓冲区达到阈值时开始...
  • SparkShuffle过程详解

    千次阅读 2021-08-05 17:11:17
    由于分布式计算中,每个阶段的各个计算节点只处理任务的一部分数据,若下一个阶段需要依赖前面阶段的所有计算结果时,则需要对前面阶段的所有计算结果进行重新整合和分类,这就需要经历shuffle过程。 在spark中,RDD...
  • Spark中的shuffle过程与Mapreduce的Shuffle过程很多概念都很类似。在spakr中,如果发生了宽依赖,前一个stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面, 并且把数据位置元信息上报...
  • 一: 理解shuffle 二: shuffle write 2.1 shuffle write的目标 2.2 shuffle write的位置 2.3 桶(bucket) 2.4 默认分区算法 2.5 bucket数量太多的解决方案 三: shuffle read 3.1 在什么时候fetch 3.2 边...
  • Most recent failure reason: **org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 18** at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2....
  • 关于spark shuffle过程的理解

    千次阅读 2018-07-03 22:26:20
    shuffle过程:由ShuffleManager负责,计算引擎HashShuffleManager(Spark 1.2)—&gt;SortShuffleManagerspark根据shuffle类算子进行stage的划分,当执行某个shuffle类算子(reduceByKey、join)时,算子之前的...
  • Shuffle描述着数据从map task输出到reduce task输入的这段过程shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。因为...
  • spark rdd 在shuffle过程中,涉及数据重组和重新分区,主要是根据key值,把相同的key值分配到同一个区。需要注意的是,因为分区的数量是有限的,所以会有不同的key分到相同的分区,这个过程主要是用hash算法实现。 ...
  • Spark 在DAG调度阶段会将一个 Job 划分为多个 Stage,上游 Stage 做 map 工作,下游 Stage 做 reduce 工作,其本质上还是 MapReduce 计算框架。Shuffle 是连接 map 和 reduce 之间的桥梁,它将 map 的输出对应到 ...
  • Sparkshuffle过程详解

    千次阅读 2018-11-29 11:47:57
    sparkshuffle有几种方式: 什么是shuffle Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。 前一个stage的ShuffleMapTask进行shuffle write,把数据存储在...
  • spark shuffle对比hive shuffle

    千次阅读 2019-04-07 22:26:45
    Spark 和 Hadoop一直是大数据离线计算的必经之路,自己在工作中也经常用到,所以学习一下原理还是很有必要的,不然碰到问题很容易一脸懵逼,其中感觉shuffle是两者的核心之一,故整理下,方便以后回顾。 大数据的...
  • MapReduce的Shuffle和SparkShuffle过程对比MapReduce MapReduce MapReduce计算模型分为map和reduce两个重要阶段,map是映射,负责数据的过滤分发。reduce是规约,负责数据的计算归并,map将数据传递给reduce,...
  • spark1.1以前只有hashshuffle,1.1版本引入了sortshuffle,1.2版本以后默认方式改为sort方式,2.0版本以后移除了hashshuffle。 HashShuffle 执行原理: Map阶段的shuffle是为了下一个stage的task拉取数据作的。 ...
  • SparkShuffle总结分析

    2021-01-07 10:14:33
    在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的
  • 目录shuffle为什么要有shuffleshuffle分类Shuffle WriteShuffle Readshuffle可能会面临的问题HashShuffle优化解决问题reduce分区数决定因素SortShuffle shuffle 为什么要有shuffle shuffle:为了让相同的key进入同一...
  • Spark shuffle 过程详解

    2019-10-24 10:24:17
    Shuffle 过程分为map 端的write 和 reducer 端的read 两阶段 Shuffle write 端发展史从 hashShuffleManager(默认spark1.2之前) 和 到 sortShuffleManger HashShuffleManager 分为普通shuffle 和 consilodate机制...
  • Spark Shuffle 优化

    千次阅读 2022-04-30 16:18:15
    Spark Shuffle
  • Spark Shuffle Shuffle Write Shuffle Fetch and Aggregator  在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序...
  • Spark Shuffle

    千次阅读 2022-04-01 13:50:56
    Spark Shuffle Spark Shuffle是发生在宽依赖(Shuffle Dependency)的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,517
精华内容 8,606
关键字:

spark的shuffle过程

友情链接: GABPNET.rar