精华内容
下载资源
问答
  • spark中shuffle
    2021-03-07 16:28:46

    Spark 1.2以后默认用SortShuffleManager

    不同点:

    Mapreduce ShuffleSpark Shuffle
    map端大文件的索引文件没有
    map端输出的文件是否有序有序如果启用byPass机制则不会排序,反之会排序
    reduce端读取文件直接读取map输出的大文件先读取索引文件

    启用byPass机制的触发条件:

    • shuffle read task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认是200)
    • (或者)不是聚合类的shuffle算子(比如repartition、join;reduceByKey是聚合类算子)

    相同点:

    1 都是先写到内存,达到阈值时溢出多个小文件

    2 有多少个map会产生多少个大文件;上游stage有多个少task,也会产生多少个大文件;

    网上经常说的spark基于内存计算,意思是可以把反复用到的数据cache到内存中,不再反复从磁盘加载到内存,所以快。spark的shuffle,中间文件也是会落磁盘的!

    mr不会把多次用到的同一份数据cache到内存中,是多次从磁盘加载到内存。

    更多相关内容
  • Spark中shuffle类算子

    千次阅读 2019-01-09 13:53:52
    shuffle算子 3.1 去重: def distinct() def distinct(numPartitions: Int) 3.2 聚合 def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner...

    shuffle算子

     3.1 去重:
    

    def distinct()

    def distinct(numPartitions: Int)

      3.2 聚合
    

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

    def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]

    def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]

    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]

    def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]

    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

      3.3 排序
    

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

    def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

      3.4 重分区
    

    def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

      3.5集合或者表操作
    

    def intersection(other: RDD[T]): RDD[T]

    def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

    def intersection(other: RDD[T], numPartitions: Int): RDD[T]

    def subtract(other: RDD[T], numPartitions: Int): RDD[T]

    def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

    def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

    def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

    def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

    def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    展开全文
  • 文章目录前言spark中shuffle机制1 shuffleReader读取数据2 shuffleWriter写数据2.1 shuffle具体写操作3 shuffle的分类3.1 HashShuffle3.1.1 未优化的HashShuffle3.1.2优化的HashShuffle3.2 SortShuffle3.2.1 普通...

    前言

    在执行Task过程中,我们知道有的算子会造成数据的打乱重组,即在这个过程中我们需要将数据落盘并且下一阶段会将数据读取,我们把整个过程叫做shuffle

    就像我们之前学的hadoop中MapReduce差不多,也会有个shuffle阶段,还记得我们当时MapReduce的shuffle是怎么操作的吗?

    如果不记得的,可以看一下我之前写的一篇关于MapReduce中shuffle的文章

    【hadoop中MR的shuffle阶段源码分析】

    好了废话不多说了,我们今天就来谈谈spark底层关于shuffle是怎么操作的以及它与hadoop中MapReduce的shuffle有什么区别

    先赞后看,养成好习惯!
    在这里插入图片描述

    spark中shuffle机制

    在划分stage时,最后一个stage称为finalStage(其实就是一个变量名,我们在源码中看到过),它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。

    ShuffleMapStage的结束伴随着shuffle文件的写磁盘,并且当有多个ShuffleMapStage时肯定也会有读取磁盘数据的操作。

    ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。

    spark中shuffle也借鉴MapReduce的shuffle,有两个阶段map和reduce,注意这只是逻辑思路中的,和我们的map和reduce算子没有一点关系

    1 shuffleReader读取数据

    因为我们的Task都是发送到Executor端进行执行计算,所以我们先进入CoarseGrainedExecutorBackend类中的receive方法中

        case LaunchTask(data) =>
          if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
          } else {
            val taskDesc = TaskDescription.decode(data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
            taskResources(taskDesc.taskId) = taskDesc.resources
            executor.launchTask(this, taskDesc)
          }
    

    如果executor不为空则,进入launchTask方法

      def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
        val tr = new TaskRunner(context, taskDescription)
        runningTasks.put(taskDescription.taskId, tr)
        threadPool.execute(tr)
      }
    
      class TaskRunner(
          execBackend: ExecutorBackend,
          private val taskDescription: TaskDescription)
        extends Runnable
    

    我们可以看到将我们的Task任务封装为TaskRunner对象,又通过一个线程池来执行我们的任务线程

    所以我们大概应该知道,我们的Task任务肯定会有一个run方法,因为封装为TaskRunner的类继承了Runnable

    我们又知道了,我们在ShuffleMapStage阶段中的创建ShuffleMapTask,在ResultStage阶段创建了ResultTask

    ShuffleMapStage阶段就是rdd产生shuffle操作,所以划分的,所以ResultStage生成的Task肯定会读取数据

    我们进入ResultTask中

    我们发现没有run方法,只有runTask方法,按道理是不可能的,所以我们进入它的父类看看,果然有run方法

     try {
          runTask(context)
        } catch {
          case e: Throwable =>
            // Catch all errors; run task failure callbacks, and rethrow the exception.
            try {
              context.markTaskFailed(e)
            } catch {
              case t: Throwable =>
                e.addSuppressed(t)
            }
            context.markTaskCompleted(Some(e))
            throw e
        } 
    

    我们发现该run方法是final修饰的,子类是不能重写的,但是在里面调用了runTask方法,这就是模板方法设计模式

    我们进入runTask方法

        func(context, rdd.iterator(partition, context))
    
      final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        if (storageLevel != StorageLevel.NONE) {
            //如果存储等级不为空,则调用getOrCompute方法
          getOrCompute(split, context)
        } else {
          computeOrReadCheckpoint(split, context)
        }
      }
    

    进入getOrCompute方法

    computeOrReadCheckpoint(partition, context)
    
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
      {
        if (isCheckpointedAndMaterialized) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)
        }
      }
    

    我们进入compute方法

    发现方法是抽象的,我们要找其实现类,因为进行了shuffle所以肯定是ShuffleRDD,所以进入ShuffleRDD的

    compute方法

      override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
        val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
        val metrics = context.taskMetrics().createTempShuffleReadMetrics()
          //spark的环境获取的shuffle管理器获取了reader并调用read方法
        SparkEnv.get.shuffleManager.getReader(
          dep.shuffleHandle, split.index, split.index + 1, context, metrics)
          .read()
          .asInstanceOf[Iterator[(K, C)]]
      }
    

    我们可以看到这个shuffle管理器获取reader对象之后调用了read方法

    2 shuffleWriter写数据

    ShuffleMapStage的结束伴随着shuffle文件的写磁盘,所以最后ShuffleMapTask肯定会有写数据的操作

    我们进入ShuffleMapTask的run方法

    和读取数据一样,是一个模板方法涉及模式,所以我们进入runTask方法

    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
    

    进入write方法

        var writer: ShuffleWriter[Any, Any] = null
        try {
            //创建shuffle管理器
          val manager = SparkEnv.get.shuffleManager
            //获取writer对象
          writer = manager.getWriter[Any, Any](
            dep.shuffleHandle,
            mapId,
            context,
            createMetricsReporter(context))
            //调用write方法
          writer.write(
            rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    

    创建shuffle管理器然后获取Writer对象调用write方法,这就开始写操作了

    我们应该可以知道ShuffleMapStage阶段之前也可以有ShuffleMapStage阶段所以,并不会只有write方法,肯定应该也有read方法,再仔细看write方法里面

     writer.write(
            rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    

    和上面ResultTask的何其相似,都是调用的rdd的iterator方法,我们可以知道,在写数据之前会先进行读取操作,当然如果没有就说明是自开始阶段,也就是数据来源是内存或者外部存储

    2.1 shuffle具体写操作

    我们进入write方法

     //创建mapOutputWriter对象,内部是对数据的规划以及分区的规划
    val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
          dep.shuffleId, mapId, dep.partitioner.numPartitions)
        
    //排序器对已经分好区的数据进行写操作
    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
        
    //提交所有的分区
    val partitionLengths = mapOutputWriter.commitAllPartitions()
    

    我们看最后提交的方法commitAllPartitions

        blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
    

    接下来方法我们重点来看

    def writeIndexFileAndCommit(
          shuffleId: Int,
          mapId: Long,
          lengths: Array[Long],
          dataTmp: File): Unit = {
        
        //创建索引文件
        val indexFile = getIndexFile(shuffleId, mapId)
       
        //创建索引文件的临时文件
        val indexTmp = Utils.tempFileWith(indexFile)
        try {
            
            //创建数据文件
          val dataFile = getDataFile(shuffleId, mapId)
          synchronized {
              
              //检查索引文件和数据文件
              //如果索引文件第一个元素不为0L则返回null或者数据文件的长度等于所有块大小就返回null
            val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
            if (existingLengths != null) {
                  System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
    
                    //如果索引临时文件不为空且存在就删除
                  if (dataTmp != null && dataTmp.exists()) {
                    dataTmp.delete()
                  }
    		else{
                    //如果索引文件存在就删除
                  if (indexFile.exists()) {
                    indexFile.delete()
                  }
    
                    //如果数据文件存在就删除
                  if (dataFile.exists()) {
                    dataFile.delete()
                  }
    
                    //将索引临时文件改名为索引文件
                  if (!indexTmp.renameTo(indexFile)) {
                    throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
                  }
                  if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
                    throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
                  }
            }
          }
    

    我们仔细看完整个写入磁盘的流程,发现就是创建一个索引文件,索引临时文件和数据文件,然后就是对索引文件和数据文件的删除以及将索引临时文件改名为索引文件

    看到这个流程是不是觉得有点熟悉,我们曾经在哪个框架看到过,没错,就是kafka,kafka也会生成一个索引文件和数据文件,用来保证数据的大吞吐量,所以这也是为什么说spark比hadoop要快那么多的原因了,不仅在别的地方进行了优化,对于shuffle落盘也进行了优化,保证数据的快速读取

    3 shuffle的分类

    3.1 HashShuffle

    3.1.1 未优化的HashShuffle

    在这里插入图片描述

    以Task为单位,我们可以看到每个Task都必须生成相同Reducer个数的文件,索引当Task比较多时,我们需要生成很多个文件,我们要知道大数据对多个小文件是非常拒绝的,所以我们需要进行优化

    3.1.2优化的HashShuffle

    在这里插入图片描述

    现在我们是以CPU为单位,每个核只需要生成相同reducer 个数的文件就行,不管一个核中有多少个Task,但是我们仔细思考其实还是有很大缺陷的,当我们集群核数比较多,并且Reducer也比较多的时候文件其实还是会很多,而文件过多会进行过多的磁盘IO以及网络IO,非常消耗性能

    3.2 SortShuffle

    3.2.1 普通SortShuffle

    在这里插入图片描述

    正如我们之前看到那样,我们spark现在使用的就是SortShuffle,只会生成两个文件,一个索引文件,一个数据文件,通过索引文件来快速查找在数据文件中位置,但是我们要考虑到一些特殊情况,当我们数据量不是非常大的时候,做排序操作其实是很消耗性能的,当数据量不是很大时,我们能不能不用sort,而用另一种方式代替呢?

    3.2.2 bypass SortShuffle

    在这里插入图片描述

    结合了hashshuffle的一些特性,即当数据量不是很大时候,不需要对数据进行排序,对数据进行hash分区存储即可,和hashMap一样,当数据量不是很大时,其实hash的性能会更好一点

    3.2.3 SortShuffle的管理

    说了那么多,我们来实际看看spark底层是怎么控制和调用到底使用哪个SortShuffle呢?

    下面进入大家最喜欢的源码环节

    在上面shuffleWriter写数据阶段时,我们已经分析到具体怎么写操作的了,我们直接进入ShuffleMapTask类的runTask方法中

        dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
    
    def write(
          rdd: RDD[_],
          dep: ShuffleDependency[_, _, _],
          mapId: Long,
          context: TaskContext,
          partition: Partition): MapStatus = {
        var writer: ShuffleWriter[Any, Any] = null
        try {
          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](
            dep.shuffleHandle,
            mapId,
            context,
            createMetricsReporter(context))
            //我们知道这里就是调用写操作了
          writer.write(
            rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
    

    我们进入write方法,发现

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t6W6unJI-1596263606563)(C:\Users\15907\Desktop\流程或源码分析\imgs\QQ截图20200801114134.png)]

    有三个实现类,此时就是需要考虑到底使用哪个实现类的write方法了,所以我们回退到之前方法,应该是ShuffleManager创建writer的时候应该就已经判断好了

    我们回退到之前的方法

    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
            dep.shuffleHandle,
            mapId,
            context,
            createMetricsReporter(context))
    
    override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Long,
          context: TaskContext,
          metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
        val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
          handle.shuffleId, _ => new OpenHashSet[Long](16))
        mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
        val env = SparkEnv.get
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf,
              metrics,
              shuffleExecutorComponents)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              bypassMergeSortHandle,
              mapId,
              env.conf,
              metrics,
              shuffleExecutorComponents)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(
              shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
        }
      }
    

    我们发现了是通过传递的handle参数的类型进行的模式匹配,并进行相应的操作,匹配到就创建相应的SortShuffleWriter对象,后面再调用相对应实现类的write方法

    我们往回找最早之前传递handle参数,并给handle参数赋值的方法

    writer = manager.getWriter[Any, Any](
            dep.shuffleHandle,
            mapId,
            context,
            createMetricsReporter(context))
    
    val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, this)
    
     override def registerShuffle[K, V, C](
          shuffleId: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          new SerializedShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          new BaseShuffleHandle(shuffleId, dependency)
        }
      }
    

    我们来分别分析这几种情况

    1)第一种情况

        if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        }
    
    private[spark] val SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD =
        ConfigBuilder("spark.shuffle.sort.bypassMergeThreshold")
          .doc("In the sort-based shuffle manager, avoid merge-sorting data if there is no " +
            "map-side aggregation and there are at most this many reduce partitions")
          .version("1.1.1")
          .intConf
          .createWithDefault(200) 
    
    
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // We cannot bypass sorting if we need to do map-side aggregation.
          //如果需要进行map端的聚合,则无法绕过排序,返回false
        if (dep.mapSideCombine) {
          false
        } else {
          val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
            //分区数小于等于200
          dep.partitioner.numPartitions <= bypassMergeThreshold
        }
      }
    

    2)第二种情况

    else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          new SerializedShuffleHandle[K, V](
            shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        }
    
      def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
        val shufId = dependency.shuffleId
        val numPartitions = dependency.partitioner.numPartitions
          
          //  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
        if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
          false
            
          //如果需要进行map端的聚合,则无法绕过排序,返回false
        } else if (dependency.mapSideCombine) {
          false
            
          //  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215
          //  val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
          //        PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
        } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
          false
            
          //其他情况下都为true
        } else {
          true
        }
      }
    

    3)第三种情况

     else {
          // Otherwise, buffer map outputs in a deserialized form:
          new BaseShuffleHandle(shuffleId, dependency)
        }
    

    上面情况不满足的情况下,就是BaseShuffleHandle,也就是SortShuffleWriter

    shuffleWriter划分总结:

    1)当需要进行map端聚合的情况下,返回false,当map端不需要进行聚合时并且分区数小于等于200(默认值)返回true,即创建BypassMergeSortShuffleWriter

    2) ①当序列化器不支持序列化对象的重定向(它是直接操作序列化后的字节数组,所以需要知道哪个部分是哪个对象),返回false

    ​ ②当map端需要进行聚合时,返回false

    ​ ③当总分区数大于16777215+1时,返回false

    ​ ④其他情况都不满足时,返回true ,即创建UnsafeShuffleWriter

    3)当上面两种情况都不满足的情况下,创建SortShuffleWriter

    自此,spark一系列的源码分析就到此结束了,小伙伴如果有什么不懂的地方欢迎评论区留言,如果spark中还有什么地方想了解的欢迎私信我,如果觉得写的不错,点个赞再走

    展开全文
  • Spark中Shuffle

    2022-06-17 17:57:20
    但是也 必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜, shuffle调优只能在整个Spark的性能调优占到一小部分而已。 在 Spark 的源码,负责 shuffle 过程的执行、计算...

       一、Spark  Shuffle 概述

          大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网 络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也 必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜, shuffle调优只能在整个Spark的性能调优中占到一小部分而已。   

         在 Spark 的源码中,负责 shuffle 过程的执行、计算和处理的组件主要就是 ShuffleManager,也即 shuffle 管理器。
         在 Spark 1.2 以前,默认的shuffle计算引擎是 HashShuffleManager。该 ShuffleManager 而 HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘 IO操作影响了性能。
         因此在Spark 1.2以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager。 SortShuffleManager 相较于 HashShuffleManager 来说,有了一定的改进。主要就在于,每个 Task 在进行 shuffle 操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并 (merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个stage的 shuffle read task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    二、ShuffleManager

    1.未经优化的HashShuffleManager

    每个 Executor 只有 1 个CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一 个 task 线程。

    我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个 stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所 谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文 件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲 填满之后,才会溢写到磁盘文件中去。

    那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的 task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task, 那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个 Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有

    Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量 是极其惊人的。

    接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的 每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所 在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个 task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节 点上,拉取属于自己的那一个磁盘文件即可。

    shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓 冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合 完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数 据到拉取完,并得到最终的结果。

    未经优化的HashShuffleManager

    2. 优化后的HashShuffleManager

    这里说的优化,是指我们可以设置一个参数, spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来 说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

    开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘 文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件 的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个 task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件 内。

    当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不 会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效 将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

    假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执 行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件, 所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数 量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建 100个磁盘文件,所有Executor只会创建1000个磁盘文件。

    优化后的HashShuffleManager

    3.SortShuffleManager的普通运行机制

    SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当 shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用bypass机制

    在该模式下,数据会先写入一个内存数据结构中,此 时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算 子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle 算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会 判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据 溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数 据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式 分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。 BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一 次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文 件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文 件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文 件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文 件,其中标识了下游各个task的数据在文件中的start offset与end offset。

    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage 有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每 个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁 盘文件。

    SortShuffleManager的普通运行机制

    4.SortShuffleManager-bypass机制

    bypass SortShuffleManager的原理。bypass运行机制的触发条件如下: 

    shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    不是聚合类的shuffle算子(比如reduceByKey)。

    此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash 值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢 写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文 件。

    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的 磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经 优化的HashShuffleManager来说,shuffle read的性能会更好。

    而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行 排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也 就节省掉了这部分的性能开销。

    SortShuffleManager-bypass机制

    三、shuffle相关参数调优

    1. spark.shuffle.file.buffer
    默认值:32k
    参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数 据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k), 从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在 实践中发现,合理调节该参数,性能会有1%~5%的提升。

    2. spark.reducer.maxSizeInFlight
    默认值:48m
    参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能 够拉取多少数据。 
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发 现,合理调节该参数,性能会有1%~5%的提升。
     

    3. spark.shuffle.io.maxRetries
    默认值:3
    参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络 异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数 之内拉取还是没有成功,就可能会导致作业执行失败。 
    调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60 次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于 针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
     

    4. spark.shuffle.io.retryWait
    默认值:5s 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。 
    调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

    5. spark.shuffle.memoryFraction
    默认值:0.2
    参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默 认是20%。 
    调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调 高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写 磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
     

    6. spark.shuffle.manager
    默认值:sort
    参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort 和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的 版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的 堆外内存管理机制,内存使用效率更高。 
    调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排 序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行 排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避 免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前 发现了一些相应的bug。
     

    7. spark.shuffle.sort.bypassMergeThreshold
    默认值:200
    参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个 阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成 一个文件,并会创建单独的索引文件。 
    调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调 大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

    8. spark.shuffle.consolidateFiles
    默认值:false 
    参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启 consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情 况下,这种方法可以极大地减少磁盘IO开销,提升性能。 
    调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以 尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启 consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高 出10%~30%。
     

    文中部分图片和文字来源于网上。

    展开全文
  • Spark中shuffle过程由ShuffleRDD触发。 shuffle的数据包含读取和写入两种操作,在一个SparkContext,所有的shuffleRDD具有一个递增的shuffleID来唯一标识Shuffle数据对应的RDD及Partition。 Spark集群由...
  • Sparkshuffle调优

    2018-12-19 15:17:25
    spark.shuffle.blockTransferService netty shuffle过程,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6会去除掉 ...
  • hadoop阶段有存在一个环形缓冲区,当缓冲区达到阈值(默认80%)的时候,会将数据溢出到磁盘,并最终形成一个输出文件,而Spark虽然存在溢出,但不必须存在这个溢出过程,文件都是在并发在写,最后不需要合并成一个...
  • spark中shuffle算子汇总

    千次阅读 2019-08-26 23:31:07
    版本信息 spark version 2.3.3 jdk 1.8 idea 2019 MacBook Pro
  • Sparkshuffle过程

    2021-07-06 14:45:22
    Sparkshuffle分为老版本的HashShuffle(现在已经弃用)和新版本的SortShuffleShuffle过程发生在宽依赖切分Stage的过程,前一个Stage称作ShuffleMap Stage,后一个Stage称作Result Stage。 HashShuffle原理 ...
  • SparkSpark Shuffle 之SortShuffle

    千次阅读 2022-03-10 19:47:01
    SparkShuffle reduceByKey会将上一个RDD的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value。 问题: 聚合之前,每一个key对应的value...
  • Sparkshuffle

    2021-10-26 20:34:07
    1. Sparkshuffle阶段发生在阶段划分时,也就是宽依赖算子时。 宽依赖算子不一定发生shuffle。 2. Sparkshuffle分两个阶段,一个使Shuffle Write阶段,一个使Shuffle read阶段。 3. Shuffle Write阶段会选择分区...
  • SparkShuffle调优

    2022-02-21 20:01:21
    spark.sql.shuffle.partitions 参数说明:SQL 语句的 group by、join、distinct、partition by 都会触发 shuffle,当在 SparkSQL 的 Job 产生 Shuffle 时,默认的分区数 spark.sql.shuffle.partitions = 200...
  • SparkShuffle详解

    2021-03-20 16:47:18
    Spark作业性能主要消耗在Shuffle环境,因为其中包含大量磁盘IO、序列化、网络数据传输等操作,如果想提升作业性能,有必要对Shuffle过程进行调优。但也要注意,影响Spark作业性能因素主要还是代码开发、资源参数以及...
  • SparkShuffle总结分析

    2021-01-07 10:14:33
    一、shuffle原理分析 1.1 shuffle概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。 在MapReduce框架,...1.2 Spark中shuffle 介绍 在DAG调度的过程,Stage 阶段的
  • SparkShuffle概念reduceByKey会将上一个RDD的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是&lt;key,value&gt;对的形式,这样每一个key对应一个聚合起来的value。问题:聚合...
  • Spark Shuffle过程详解

    2021-04-15 00:14:31
    点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源‍‍你需要预习:《Spark的Cache和Checkpoint区别和联系拾遗》《Spark Job 逻辑执行图和数据依赖解析》《S...
  • SparkShuffle是怎么回事

    千次阅读 2020-03-22 23:15:44
    Shuffle的中文含义是混洗,官方定义是:一种让数据重新分布以使得某些数据被放在同一分区...此篇文章从shuffle的含义开始讲起,按照spark中shuffle的几不同运行机制进行了解析,并最终附上了一些shuffle调优的建议。
  • 1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是...Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据aggregate 好以后进行 reduce() (Spark 里可能是后续
  • Spark Shuffle 过程

    2022-07-02 11:17:19
    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程进行调参优化,...
  • Spark-Shuffle机制详解

    2020-04-12 21:09:02
    Shuffle机制详解 什么是Shuffleshuffle中文翻译为洗牌,需要shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。... 补充:是ShuffleManager的getWriter来...
  • Spark Shuffle详解

    2021-01-28 21:42:12
    --Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题? https://blog.csdn.net/qq_34901049/article/details/103792271 Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类...
  • Spark Shuffle算子汇总一、去重二、聚合三、排序四、重分区五、集合或者表操作 我们在写spark时候,shuffle算子我们格外小心,因为shuffle有时候会造成数据倾斜问题,那么我们在编写代码时,要十分清楚哪些是shuffle...
  • spark1.1以前只有hashshuffle,1.1版本引入了sortshuffle,1.2版本以后默认方式改为sort方式,2.0版本以后移除了hashshuffle。 HashShuffle 执行原理: Map阶段的shuffle是为了下一个stage的task拉取数据作的。 ...
  • Spark学习(五)Spark Shuffle及内存分配

    千次阅读 2018-11-08 22:49:33
    文章目录一、什么是Spark Shuffle?...1、Shuffle中文意思就是“洗牌”,在Spark中Shuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。 2、在Spark中,什么情况下会发生...
  • spark中shuffle的过程------不看你后悔

    千次阅读 2014-10-29 09:37:02
    Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。 这篇文章主要是沿着下面几个问题来开展: 1、shuffle...
  • Spark】【Shuffle

    2022-01-08 16:22:38
    Shuffle 的核心要点 ...ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘,而ResultStage的开始伴随着从磁盘读取shuffle文件 ResultStage 基本上对应代码的 action 算子,即将一个函数应用在
  • Spark中shuffle过程与Mapreduce的Shuffle过程很多概念都很类似。在spakr,如果发生了宽依赖,前一个stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面, 并且把数据位置元信息上报...
  • Shuffle简介Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduceShuffle更像是洗牌的逆过程,指的是将m...
  • Spark RDD 的 shuffle 和 分区 分区的作用 RDD 使用分区来分布式并行处理数据, 并且要做到尽量少的在不同的 Executor 之间使用网络交换数据, 所以当使用 RDD 读取数据的时候, 会尽量的在物理上靠近数据源, 比如说在...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 36,651
精华内容 14,660
关键字:

spark中shuffle