精华内容
下载资源
问答
  • sparkshuffle
    2022-04-29 16:41:18

    Shuffle简介

    Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。其在MapReduce中所处的工作阶段是map输出后到reduce接收前,具体可以分为map端和reduce端前后两个部分。

    在shuffle之前,也就是在map阶段,MapReduce会对要处理的数据进行分片(split)操作,为每一个分片分配一个MapTask任务。接下来map会对每一个分片中的每一行数据进行处理得到键值对(key,value)此时得到的键值对又叫做“中间结果”。此后便进入reduce阶段,由此可以看出Shuffle阶段的作用是处理“中间结果”。

    由于Shuffle涉及到了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响到了整个程序的运行效率。

    MapReduce Shuffle

    Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle。

    Map端的shuffle

    下图是MapReduce Shuffle的官方流程:

    因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,在写入的过程中进行分区(partition),也就是对于每个键值对来说,都增加了一个partition属性值,然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)。

    当写入的数据量达到预先设置的阙值后便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操作)。

    溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的本地目录中。当整个map任务完成溢出写后,会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。

    至此map端的工作已经全部结束,最终生成的文件也会存储在TaskTracker能够访问的位置。每个reduce task不间断的通过RPC从JobTracker那里获取map task是否完成的信息,如果得到的信息是map task已经完成,那么Shuffle的后半段开始启动。

     

    Reduce端的shuffle

    当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。Reduce端的shuffle主要包括三个阶段,copy、merge和reduce。

    每个reduce task负责处理一个分区的文件,以下是reduce task的处理流程:

    • reduce task从每个map task的结果文件中拉取对应分区的数据。因为数据在map阶段已经是分好区了,并且会有一个额外的索引文件记录每个分区的起始偏移量。所以reduce task取数的时候直接根据偏移量去拉取数据就ok。
    • reduce task从每个map task拉取分区数据的时候会进行再次合并,排序,按照自定义的reducer的逻辑代码去处理。
    • 最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。

    为什么要排序

    • key存在combine操作,排序之后相同的key放到一块显然方便做合并操作。
    • reduce task是按key去处理数据的。 如果没有排序那必须从所有数据中把当前相同key的所有value数据拿出来,然后进行reduce逻辑处理。显然每个key到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key对于的value集合。
    • reduce task按key去处理数据时,如果key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key那么就标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key处理完了,哪些key没有处理完。

    虽有千万种理由需要这么做,但是很耗资源,并且像排序其实我们有些业务并不需要排序。

    为什么要文件合并

    • 因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。
    • 任务去读取文件的数增多,打开的文件句柄数也会增多。
    • mapreduce是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。

    Spark的Shuffle

    Spark的Shuffle是在MapReduce Shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在Spark中Shuffle write相当于MapReduce 的map,Shuffle read相当于MapReduce 的reduce。

    Spark丰富了任务类型,有些任务之间数据流转不需要通过Shuffle,但是有些任务之间还是需要通过Shuffle来传递数据,比如宽依赖的group by key以及各种by key算子。宽依赖之间会划分stage,而Stage之间就是Shuffle,如下图中的stage0,stage1和stage3之间就会产生Shuffle。

    Stage的划分_程序姜的博客-CSDN博客_划分stage的依据

    在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。

    Spark Shuffle发展史

    Spark 0.8及以前 Hash Based Shuffle

    Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

    Spark 0.9 引入ExternalAppendOnlyMap

    Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

    Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

    Spark 1.4 引入Tungsten-Sort Based Shuffle

    Spark 1.6 Tungsten-sort并入Sort Based Shuffle

    Spark 2.0 Hash Based Shuffle退出历史舞台

    在Spark的版本的发展,ShuffleManager在不断迭代,变得越来越先进。

    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。

    SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    Hash Shuffle

    HashShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是合并的运行机制。合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量。Hash shuffle是不具有排序的Shuffle。

    普通机制的Hash Shuffle

    最开始使用的Hash Based Shuffle,每个Mapper会根据Reducer的数量创建对应的bucket,bucket的数量是M * R,M是map的数量,R是Reduce的数量。如下图所示:2个core 4个map task 3 个reduce task,会产生4*3=12个小文件。

     

    优化后的Hash Shuffle

    普通机制Hash Shuffle会产生大量的小文件(M * R),对文件系统的压力也很大,也不利于IO的吞吐量,后来做了优化(设置spark.shuffle.consolidateFiles=true开启,默认false),把在同一个core上的多个Mapper输出到同一个文件,这样文件数就变成core * R 个了。如下图所示:2个core 4个map task 3 个reduce task,会产生2*3=6个小文件。

    Hash shuffle合并机制的问题:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。进而引出了更优化的sort shuffle。在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。

    Sort Shuffle

    SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

    普通机制的Sort Shuffle

    这种机制和mapreduce差不多,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。

    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,由于一个task就只对应一个磁盘文件因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。

    bypass机制的Sort Shuffle

     

    bypass运行机制的触发条件如下:

    • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认值200。
    • 不是聚合类的shuffle算子(比如reduceByKey)。

    此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

    而该机制与普通SortShuffleManager运行机制的不同在于:

    第一,磁盘写机制不同;

    第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    Spark Shuffle总结

    Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。

    Shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。

    HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

    SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。

    SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    Spark与MapReduce Shuffle的异同

    • 从整体功能上看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)。
    • 从流程的上看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。以前 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey的操作。在Spark 1.2之后,sort-based变为默认的Shuffle实现。
    • 从流程实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map, spill, merge, shuffle, sort, reduce等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蕴含在 transformation中。

     

    关注微信公众号【飞哥大数据】,回复666 获取2022年100+公司面试真题,以及spark与flink面试题汇总 

    更多相关内容
  • Spark Shuffle

    千次阅读 2022-04-01 13:50:56
    Spark Shuffle Spark Shuffle是发生在宽依赖(Shuffle Dependency)的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。...

    Spark Shuffle

    Spark Shuffle是发生在宽依赖(Shuffle Dependency)的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。如何支持这些不同类型的计算,如何提高Shuffle的性能都是Shuffle机制设计的难点问题。

    从总体框架上来看,Spark Shuffle分为Shuffle Write和Shuffle Read两个部分,Shuffle Write解决上游Stage的输出数据的分区问题,Shuffle Read解决下游Stage从上游Stage获取数据、重新组织、并为后续操作提供数据的问题。

    Shuffle Write

    在Shuffle Write阶段,数据操作需要分区、聚合和排序三个功能,但是不同的数据操作所需要的功能不同,有些数据操作或许只需要一到两个功能。但是,Shuffle Write有一个总体的设计框架,即“map()输出->数据聚合(combine)->排序(sort)->分区”。
    在这里插入图片描述
    不同的数据操作算子有着不一样的实现,这里我讲一下不同情况下的Shuffle Write方式。

    1. 不需要聚合(combine)和排序(sort)

    这种方法只需要将数据分区即可。输出每条数据并通过hash取模(hashcode(key)%numPartitions)计算其分区id,然后按照分区id输入到不同的buffer中,每当buffer填满时就溢写到磁盘分区文件中。使用buffer是为了减少磁盘I/O次数,用缓冲提高效率。这种Shuffle Write方式叫做BypassMergeSortShuffleWriter。
    在这里插入图片描述
    这种方式的Shuffle Write优点就是速度快,不需要聚合和排序操作,直接按照分区输出,缺点就是资源消耗高,每个分区都需要一个buffer和分区文件,因此不适合过大的分区数。

    该模式适用的操作类型:map()端不需要聚合(combine)、Key不需要排序且分区个数较少(〈=spark.Shuffle.sort.bypassMergeThreshold,默认值为200)。例如,groupByKey(100),partitionBy(100),sortByKey(100)等。

    2. 不需要聚合(combine),但需要排序(sort)

    这种方式跟上一种方式比起来多了一个排序的功能,在计算出分区id后,会把数据放到一个Array中,在这个Array中,我们会让Array的Key变成分区id+Key的形式。在Spark Shuffle中,这个Array名字叫PartitionedPairBuffer。

    然后,我们按照分区id+Key来做排序,如果在接收数据过程中buffer满了,就会先扩容,如果还存放不下,那么就会将当前buffer排序后溢写到磁盘,清空buffer继续写。等待数据输出完后,再将Array和磁盘的数据做一个全局排序,得到最后一个大的排序的分区文件。这个Shuffle模式叫做SortShuffleWrite。
    在这里插入图片描述
    这个方式的Shuffle Write的优点就是可以按照分区id+Key排序,并且buffer有扩容和溢写的功能,最后会整合到一个分区文件中,减少了磁盘I/O。缺点就是排序本身提高了计算时延。

    该Shuffle模式适用的操作:map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。

    另外,我们可以看看刚刚所说的BypassMergeSortShuffleWriter的最大问题就是分区过多(>200)会导致buffer过大、建立和打开文件数过多。在这种情况下,我们可以将SortShuffleWrite中的“按照分区id+Key来做排序”改为“只按分区id排序”的实现方式,就可以支持第一种情况中分区数过大时的问题。例如groupByKey(300)、partitionBy(300)、sortByKey(300)。

    3. 需要聚合(combine),需要或者不需要按Key进行排序(sort)

    在这种情况下,Shuffle就走完了“map()输出->数据聚合(combine)->排序(sort)->分区”的流程。

    在数据聚合阶段,Spark Shuffle会创建一个Map的结构来聚合数据。Map的数据格式是<(PID, K), V>,每次来数据的时候会按照分区id+Key来给数据做聚合,每来一条新数据就以Map的旧数据去更新Map的值。例如聚合方式是sum(),那么每次从Map中取出旧值,与新数据相加,再put到Map中。

    数据聚合后,会通过Array将数据进行排序。如果需要按照Key进行排序,那么就按照分区id+Key来排序;如果不需要按照Key进行排序,那么只按照分区id来排序。

    如果Map放不下,则会先扩容为两倍大小,如果还存放不下,就把Map中的数据排序后溢写到磁盘上,并清空Map继续聚合,这个操作可以重复多次。当数据处理完后,会把Map数据和磁盘中的数据再次聚合(merge),最后得到一个聚合与排序后的分区文件。

    该Shuffle模式的优缺点:

    优点是只需要一个Map结构就可以支持map()端的combine功能,Map具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合,也适用于分区个数很大的情况。在聚合后使用Array排序,可以灵活支持不同的排序需求。

    缺点是在内存中进行聚合,内存消耗较大,需要额外的数组进行排序,而且如果有数据spill到磁盘上,还需要再次进行聚合。Spark在Shuffle Write中,使用一个经过特殊设计和优化的Map,命名为PartitionedAppendOnlyMap,可以同时支持聚合和排序操作,相当于Map和Array的合体。
    在这里插入图片描述
    该Shuffle模式适用的操作:适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。

    总结:

    Shuffle Write框架需要执行的3个步骤是“数据聚合→排序→分区”。

    • 如果应用中的数据操作不需要聚合,也不需要排序,而且分区个数很少,那么可以采用直接输出模式,即BypassMergeSortShuffleWriter。
    • 为了克服BypassMergeSortShuffleWriter打开文件过多、buffer分配过多的缺点,也为了支持需要按Key进行排序的操作,Spark提供了SortShuffleWriter,使用基于Array排序的方法,以分区id或分区id+Key进行排序,只输出单一的分区文件即可。
    • 最后,为了支持map()端combine操作,Spark提供了基于Map的SortShuffleWriter,将Array替换为类似HashMap的操作来支持聚合操作,在聚合后根据partitionId或分区id+Key对record进行排序,并输出分区文件。因为SortShuffleWriter按分区id进行了排序,所以被称为sort-based Shuffle Write。

    Shuffle Read

    在Shuffle Read中,也有一个总体框架:“跨节点数据获取->聚合->排序”。Reduce Task从各个Map Task端中获取属于该分区的数据,然后使用Map边获取数据边聚合。聚合完成后,放到Array根据Key进行排序,最后将结果输出或者传递给下一个操作。当然,如果不需要聚合或者排序的算子就可以省下这一两个功能。
    在这里插入图片描述

    1. 不需要聚合(combine)和排序(sort)

    这种情况只需要把各个Map Task获取的数据输出到buffer即可,是最简单的情况。

    该Shuffle模式的优缺点:优点是逻辑和实现简单,内存消耗很小。缺点是不支持聚合、排序等复杂功能。

    该Shuffle模式适用的操作:适合既不需要聚合也不需要排序的应用,如partitionBy()等。
    在这里插入图片描述

    2. 不需要聚合(combine),需要按Key排序

    这种情况下,把数据从Map Task端获取后,将buffer中的数据输出到一个Array中。这里仍然使用Shuffle Write的PartitionedPairBuffer进行排序,因此保留了分区id,即使一个Reduce Task中的分区都是相同的。

    当内存无法存下数据时,PartitionedPairBuffer就会尝试扩容,若内存仍不够,就会在排序后将数据溢写到磁盘中,当所有数据都接收到后,再将buffer中的数据和磁盘中的数据做merge sort。
    在这里插入图片描述
    该Shuffle模式的优缺点:优点是只需要一个Array结构就可以支持按照Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。

    该Shuffle模式适用的操作:适合reduce端不需要聚合,但需要按Key进行排序的操作,如sortByKey()、sortBy()等。

    3. 需要聚合(combine)不需要或者需要按Key进行排序(sort)

    这种情况下,获取数据后会建立一个Map来对数据做聚合(ExternalAppendOnlyMap)。聚合操作和Shuffle Write基本一致,用旧值和新数据来更新新值。

    做完聚合操作后,如果需要进行排序,那么就建立一个Array并排序,排序后将结果输出或者传递给下一步操作。

    如果Map放不下,那么会先扩容到两倍大小。如果还不够,那么会在排序后溢写到磁盘,数据都处理完后再将内存和磁盘的数据做一次聚合、排序,再将数据交给下一次操作。
    在这里插入图片描述
    该Shuffle模式的优缺点:

    优点是只需要一个Map和一个Array结构就可以支持reduce端的聚合和排序功能,Map 具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合。边获取数据边聚合,效率较高。

    缺点是需要在内存中进行聚合,内存消耗较大,如果有数据spill到磁盘上,还需要进行再次聚合。另外,经过HashMap聚合后的数据仍然需要拷贝到Array中进行排序,内存消耗较大。在实现中,Spark使用的HashMap是一个经过特殊优化的HashMap,命名为ExternalAppendOnlyMap,可以同时支持聚合和排序操作,相当于HashMap和Array的合体。

    该Shuffle模式适用的操作:适合reduce端需要聚合、不需要或需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等。

    总结:

    • Shuffle Read框架需要执行的3个步骤是“数据获取→聚合→排序输出”。如果应用中的数据操作不需要聚合,也不需要排序,那么获取数据后直接输出。
    • 对于需要按Key进行排序的操作,Spark 使用基于Array的方法来对Key进行排序。
    • 对于需要聚合的操作,Spark提供了基于HashMap的聚合方法,同时可以再次使用Array来支持按照Key进行排序。
    • 总体来讲,Shuffle Read框架使用的技术和数据结构与Shuffle Write过程类似,而且由于不需要分区,过程比Shuffle Write更为简单。

    在这里插入图片描述

    为高效聚合和排序所设计的数据结构

    为了提高Shuffle的聚合与排序的性能,Spark Shuffle特别设计了三种数据结构,如下图所示。这几种数据结构的基本思想是在内存中对record进行聚合和排序,如果存放不下,则进行扩容,如果还存放不下,就将数据排序后spill到磁盘上,最后将磁盘和内存中的数据进行聚合、排序,得到最终结果。
    在这里插入图片描述
    仔细观察Shuffle Write/Read过程,我们会发现Shuffle机制中使用的数据结构的两个特征:一是只需要支持record的插入和更新操作,不需要支持删除操作,这样我们可以对数据结构进行优化,减少内存消耗;二是只有内存放不下时才需要spill到磁盘上,因此数据结构设计以内存为主,磁盘为辅。Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现。因此,我们先介绍AppendOnlyMap的原理。

    AppendOnlyMap

    AppendOnlyMap实际上是一个只支持record添加和对Value进行更新的HashMap。与Java HashMap采用“数组+链表”实现不同,AppendOnlyMap只使用数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发生Hash值冲突,则使用二次地址探测法(Quadratic probing)来解决Hash值冲突。

    对于每个新来的〈K,V〉record,先使用Hash(K)计算其存放位置,如果存放位置为空,就把record存放到该位置。如果该位置已经被占用,就使用二次探测法来找下一个空闲位置。对于图6.12中新来的〈K6,V6〉record来说,第1次找到的位置Hash(K6)已被K2占用。按照二次探测法向后递增1个record位置,也就是Hash(K6)+1×2,发现位置已被K3占用,然后向后递增4个record位置(指数递增,Hash(K6)+2×2),发现位置没有被占用,放进去即可。在这里插入图片描述

    扩容:AppendOnlyMap使用数组来实现的问题是,如果插入的record太多,则很快会被填满。Spark的解决方案是,如果AppendOnlyMap的利用率达到70%,那么就扩张一倍,扩张意味着原来的Hash()失效,因此对所有Key进行rehash,重新排列每个Key的位置。

    排序:由于AppendOnlyMap采用了数组作为底层存储结构,可以支持快速排序等排序算法。实现方法,如图6.13所示,先将数组中所有的〈K,V〉record转移到数组的前端,用begin和end来标示起始位置,然后调用排序算法对[begin,end]中的record进行排序。对于需要按Key进行排序的操作,如sortByKey(),可以按照Key值进行排序;对于其他操作,只按照Key的Hash值进行排序即可。
    在这里插入图片描述

    ExternalAppendOnlyMap

    AppendOnlyMap的优点是能够将聚合和排序功能很好地结合在一起,缺点是只能使用内存,难以适用于内存空间不足的情况。为了解决这个问题,Spark基于AppendOnlyMap设计实现了基于内存+磁盘的ExternalAppendOnlyMap,用于Shuffle Read端大规模数据聚合。

    ExternalAppendOnlyMap的工作原理是,先持有一个AppendOnlyMap来不断接收和聚合新来的record,AppendOnlyMap快被装满时检查一下内存剩余空间是否可以扩展,可直接在内存中扩展,不可对AppendOnlyMap中的record进行排序,然后将record都spill到磁盘上。因为record不断到来,可能会多次填满AppendOnlyMap,所以这个spill过程可以出现多次,最终形成多个spill文件。等record都处理完,此时AppendOnlyMap中可能还留存一些聚合后的record,磁盘上也有多个spill文件。因为这些数据都经过了部分聚合,还需要进行全局聚合(merge)。因此,ExternalAppendOnlyMap的最后一步是将内存中AppendOnlyMap的数据与磁盘上spill文件中的数据进行全局聚合,得到最终结果。

    上述过程中涉及3个核心问题:(1)如何获知当前AppendOnlyMap的大小?因为AppendOnlyMap中不断添加和更新record,其大小是动态变化的,什么时候会超过内存界限是难以确定的。(2)如何设计spill的文件结构,使得可以支持高效的全局聚合?(3)怎样进行全局聚合?

    1.AppendOnlyMap的大小估计

    虽然我们知道AppendOnlyMap中持有的数组的长度和大小,但数组里面存放的是Key和Value的引用,并不是它们的实际对象(object)大小,而且Value会不断被更新,实际大小不断变化。因此,想准确得到AppendOnlyMap的大小比较困难。一种简单的解决方法是在每次插入record或对现有record的Value进行更新后,都扫描一下AppendOnlyMap中存放的record,计算每个record的实际对象大小并相加,但这样会非常耗时。而且一般AppendOnlyMap会插入几万甚至几百万个record,如果每个record进入AppendOnlyMap都计算一遍,则开销会很大。Spark设计了一个增量式的高效估算算法,在每个record插入或更新时根据历史统计值和当前变化量直接估算当前AppendOnlyMap的大小,算法的复杂度是O(1),开销很小。在record插入和聚合过程中会定期对当前AppendOnlyMap中的record进行抽样,然后精确计算这些record的总大小、总个数、更新个数及平均值等,并作为历史统计值。进行抽样是因为AppendOnlyMap中的record可能有上万个,难以对每个都精确计算。之后,每当有record插入或更新时,会根据历史统计值和历史平均的变化值,增量估算AppendOnlyMap的总大小,详见Spark源码中的SizeTracker.estimateSize()方法。抽样也会定期进行,更新统计值以获得更高的精度。

    2.Spill过程与排序

    当AppendOnlyMap达到内存限制时,会将record排序后写入磁盘中。排序是为了方便下一步全局聚合(聚合内存和磁盘上的record)时可以采用更高效的merge-sort(外部排序+聚合)。那么,问题是根据什么对record进行排序的?自然想到的是根据record的Key进行排序的,但是这就要求操作定义Key的排序方法,如sortByKey()等操作定义了按照Key进行的排序。大部分操作,如groupByKey(),并没有定义Key的排序方法,也不需要输出结果是按照Key进行排序的。在这种情况下,Spark采用按照Key的Hash值进行排序的方法,这样既可以进行merge-sort,又不要求操作定义Key排序的方法。然而,这种方法的问题是会出现Hash值冲突,也就是不同的Key具有相同的Hash值。为了解决这个问题,Spark在merge-sort的同时会比较Key的Hash值是否相等,以及Key的实际值是否相等。

    前面提到过,由于最终的spill文件和内存中的AppendOnlyMap都是经过部分聚合后的结果,其中可能存在相同Key的record,因此还需要一个全局聚合阶段将AppendOnlyMap中的record与spill文件中的record进行聚合,得到最终聚合后的结果。全局聚合的方法就是建立一个最小堆或最大堆,每次从各个spill文件中读取前几个具有相同Key(或者相同Key的Hash值)的record,然后与AppendOnlyMap中的record进行聚合,并输出聚合后的结果。在图6.14中,在全局聚合时,Spark分别从4个spill文件中提取第1个〈K,V〉record,与还留在AppendOnlyMap中的第1个record组成最小堆,然后不断从最小堆中提取具有相同Key的record进行聚合(merge)。然后,Spark继续读取spill文件及AppendOnlyMap中的record填充最小堆,直到所有record处理完成。由于每个spill文件中的record是经过排序的,按顺序读取和聚合可以保证能够对每个record得到全局聚合的结果。

    总结:ExternalAppendOnlyMap是一个高性能的HashMap,只支持数据插入和更新,但可以同时利用内存和磁盘对大规模数据进行聚合和排序,满足了Shuffle Read阶段数据聚合、排序的需求。
    在这里插入图片描述

    PartitionedAppendOnlyMap

    PartitionedAppendOnlyMap用于在Shuffle Write端对record进行聚合(combine)。PartitionedAppendOnlyMap的功能和实现与ExternalAppendOnlyMap的功能和实现基本一样,唯一区别是PartitionedAppendOnlyMap中的Key是“PartitionId+Key”,这样既可以根据partitionId进行排序(面向不需要按Key进行排序的操作),也可以根据partitionId+Key进行排序(面向需要按Key进行排序的操作),从而在Shuffle Write阶段可以进行聚合、排序和分区。

    PartitionedPairBuffer

    PartitionedPairBuffer本质上是一个基于内存+磁盘的Array,随着数据添加,不断地扩容,当到达内存限制时,就将Array中的数据按照partitionId或partitionId+Key进行排序,然后spill到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序,输出或者提供给下一个操作。

    参考:许利杰 方亚芬《大数据处理框架Apache Spark设计与实现》

    展开全文
  • Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章SparkShuffle过程分析:Map阶段处理流程了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个...
  • SparkShuffle.xmind

    2019-08-07 22:50:17
    SparkShuffle思维导图,xmind
  • Spark Shuffle 优化

    千次阅读 2022-04-30 16:18:15
    Spark Shuffle

    shuffle 过程

    Spark 的shuffle分为老版本的HashShuffle(现在已经弃用)和新版本的SortShuffle。Shuffle过程发生在宽依赖切分Stage的过程中,前一个Stage称作ShuffleMap Stage,后一个Stage称作Result Stage。

    1.HashShuffle 原理(未经优化)

     1. Map Task将数据写入buffer缓冲区,待缓冲区达到阈值时开始溢写文件,文件数量取决于(等于)Reduce Task的数量。Reduce Task的数量取决于(等于)上一个Stage的最后一个RDD的分区数。


    2. Reduce Task一边拉取对应分区的数据到buffer缓存中,一边进行处理,使用map方法对数据进行归并,直至所有的数据归并结束。


    3. 未经优化的Hash Shuffle每个map task都会为下游的每个reduce task创建一个磁盘文件。如果有50个map task,100个renduce task,那么就会产生5000个小文件,产生过多的小文件,极大的影响了shuffle write的性能。

    2.HashShuffle 原理(优化后)

    1. 设置spark.shuffle.consolidateFiles为true,开启优化机制(默认开启)


    2. 开始优化机制后,就不再是每个map task为下游的每个reduce task生成一个磁盘文件,而是一个executor为一个reduce task生成一个磁盘文件。


    3. consolidateFiles机制允许多个map task复用同一个磁盘文件,可以在一定程度上对map task的数据进行合并,从而大幅减小磁盘文件的数量,提升shuffle write性能。
     

    3.Sort Shuffle (普通运行机制)

             在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。接着,每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
            在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

            此时task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件(标识了下游各个task的数据在文件中的start offset与end offset)。最终再由下游的task根据索引文件读取相应的数据文件。

    1. 普通运行机制下,map task会先将数据写入到内存缓冲区中,当内存缓冲区中的数据达到一定阈值时,开始进行溢写。


    2. 溢写到磁盘之前,先按照数据的key进行排序,排序后分批次写入磁盘, 默认每批次1w条数据。


    3. 数据溢写到磁盘的过程中会产生多个临时磁盘文件,临时磁盘文件会进行merge归并,最终一个map task只会生成一个归并后的磁盘文件,同时还是生成一个对应的索引文件,记录数据的offset信息。
    4.reduce task拉取对应分区的数据进行处理。

    bypass 运行机制

    1.bypass运行机制触发需要满足两个条件:①不是聚合类型的shuffle算子;②map task的数量小于bypassMergeThreshold的值,默认是200。


    2.bypass运行机制,map task数据溢写到磁盘之前不会对数据进行排序。


    3.bypass运行机制,数据的溢写规则和未经优化的hashshuffle一样,每个map task都会为下游的每个reduce task生成一个文件,但是最后会进行数据的归并,合并为一个磁盘文件。


    4.bypass机制的好处就是不对数据进行排序,节省了这一部分的资源开销。

    Shuffle优化

    1.Map端优化

    1.1. Map 端聚合

            map-side 预聚合,就是在每个节点本地对相同的 key 进行一次聚合操作,类似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的 key,因为多条相同的 key 都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就 会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。

            RDD 的话建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。 因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相 同 key 进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个 节点之间分发和传输,性能相对来说比较差。

    SparkSQL 本身的 HashAggregte 就会实现本地预聚合+全局聚合。

    1.2. 读取小文件优化

            读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应 产生的 Task 元信息也会给 Spark Driver 的内存造成压力,带来单点问题。设置参数:

    spark.sql.files.maxPartitionBytes=128MB 默认 128m spark.files.openCostInBytes=4194304 默认 4m

    maxPartitionBytes:一个分区最大字节数

    openCostInBytes:打开一个文件的开销

    1.3. 增大 map 溢写时输出流 buffer

            a. map端Shuffle Write有一个缓冲区,初始阈值5m,超过会尝试增加到2*当前使用 内存。如果申请不到内存,则进行溢写。这个参数是 internal,指定无效(见下方源码)。 也就是说资源足够会自动扩容,所以不需要我们去设置。

            b. 溢写时使用输出流缓冲区默认 32k,这些缓冲区减少了磁盘搜索和系统调用次数, 适当提高可以提升溢写效率。

            c. Shuffle 文件涉及到序列化,是采取批的方式读写,默认按照每批次 1 万条去读写。 设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍 内部数据结构。这个参数是 internal,指定无效(见下方源码)。

    综合以上分析,我们可以调整的就是输出缓冲区的大小。
    

    2.Reduce 端优化

    2.1.增大 reduce 缓冲区,减少拉取次数

            Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导 致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最 大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失 败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。
    reduce 端拉取数据重试次数可以通过 spark.shuffle.io.maxRetries 参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致 作业执行失败,默认为 3.

    2.2. 调节 reduce 端拉取数据等待间隔

            Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导 致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加 大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。reduce 端拉取数据等待间隔可以通过 spark.shuffle.io.retryWait 参数进行设置,默认值 为 5s。

    2.3. 输出产生小文件优化

    2.4. 动态分区插入数据

            有 Shuffle 的情况下,上面的 Task 数量 就变成了 spark.sql.shuffle.partitions(默认值 200)。那么最差情况就会有 spark.sql.shuffle.partitions * 表分区数。

            当 spark.sql.shuffle.partitions 设置过大时,小文件问题就产生了;当设置过小时,任务的并行度就下降了,性能随之受到影响。

    最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同 一分区的记录都哈希到同一个分区中去,由一个 Spark 的 Task 进行写入,这样的话只会产 生 N 个文件, 但是这种情况下也容易出现数据倾斜的问题。

    3.Spark3.0 AQE

    Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自适应查询执行。AQE 是Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计 划,来完成对原始查询语句的运行时优化。

    3.1. 动态合并分区

    在 Spark 中运行查询处理非常大的数据时,shuffle 通常会对查询性能产生非常重要的 影响。shuffle 是非常昂贵的操作,因为它需要进行网络传输移动数据,以便下游进行计算。

    最好的分区取决于数据,但是每个查询的阶段之间的数据大小可能相差很大,这使得 该数字难以调整:

    a. 如果分区太少,则每个分区的数据量可能会很大,处理这些数据量非常大的分区, 可能需要将数据溢写到磁盘(例如,排序和聚合),降低了查询。

    b. 如果分区太多,则每个分区的数据量大小可能很小,读取大量小的网络数据块, 这也会导致 I/O 效率低而降低了查询速度。拥有大量的 task(一个分区一个 task)也会给 Spark 任务计划程序带来更多负担。

    为了解决这个问题,我们可以在任务开始时先设置较多的 shuffle 分区个数,然后在运 行时通过查看 shuffle 文件统计信息将相邻的小分区合并成更大的分区。

    例如,假设正在运行select max(i) from tbl group by j。输入tbl很小,在分组前只有2 个分区。那么任务刚初始化时,我们将分区数设置为 5,如果没有 AQE,Spark 将启动五个 任务来进行最终聚合,但是其中会有三个非常小的分区,为每个分区启动单独的任务这样 就很浪费。

     取而代之的是,AQE 将这三个小分区合并为一个,因此最终聚只需三个 task 而不是五 个

     

    3.2. 动态切换 Join 策略

    Spark 支持多种 join 策略,其中如果 join 的一张表可以很好的插入内存,那么 broadcast shah join 通常性能最高。因此,spark join 中,如果小表小于广播大小阀值(默认 10mb),Spark 将计划进行 broadcast hash join。但是,很多事情都会使这种大小估计出错 (例如,存在选择性很高的过滤器),或者 join 关系是一系列的运算符而不是简单的扫描表 操作。

            为了解决此问题,AQE 现在根据最准确的 join 大小运行时重新计划 join 策略。从下图 实例中可以看出,发现连接的右侧表比左侧表小的多,并且足够小可以进行广播,那么 AQE 会重新优化,将 sort merge join 转换成为 broadcast hash join。

    对于运行是的broadcast hash join,可以将shuffle优化成本地shuffle,优化掉stage减少 网络传输。Broadcast hash join 可以规避 shuffle 阶段,相当于本地 join。 

    3.3. 动态优化 Join 倾斜

    当数据在群集中的分区之间分布不均匀时,就会发生数据倾斜。严重的倾斜会大大降 低查询性能,尤其对于join。AQE skew join优化会从随机shuffle文件统计信息自动检测到 这种倾斜。然后它将倾斜分区拆分成较小的子分区。

    例如,下图 A join B,A 表中分区 A0 明细大于其他分区:

     因此,skew join 会将 A0 分区拆分成两个子分区,并且对应连接 B0 分区

    没有这种优化,会导致其中一个分区特别耗时拖慢整个 stage,有了这个优化之后每个 task 耗时都会大致相同,从而总体上获得更好的性能。

    Spark3.0 增加了以下参数。

    a. spark.sql.adaptive.skewJoin.enabled :是否开启倾斜 join 检测,如果开启了,那么会 将倾斜的分区数据拆成多个分区,默认是开启的,但是得打开 aqe。

    b. spark.sql.adaptive.skewJoin.skewedPartitionFactor :默认值5,此参数用来判断分区数据量是否数据倾斜,当任务中最大数据量分区对应的数据量大于的分区中位数乘以此参数, 并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数,那么此任务 是数据倾斜。

    c. spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes :默认值256mb,用于判 断是否数据倾斜。

    d. spark.sql.adaptive.advisoryPartitionSizeInBytes :此参数用来告诉 spark 进行拆分后推 荐分区大小是多少。

    如果同时开启了 spark.sql.adaptive.coalescePartitions.enabled 动态合并分区功能,那么 会先合并分区,再去判断倾斜,将动态合并分区打开后,重新执行:

     3.4. Spark3.0 DPP

    Spark3.0 支持动态分区裁剪 Dynamic Partition Pruning,简称 DPP,核心思路就是先将 join 一侧作为子查询计算出来,再将其所有分区用到 join 另一侧作为表过滤条件,从而实 现对分区的动态修剪。如下图所示

    触发条件: 

    a. 待裁剪的表 join 的时候,join 条件里必须有分区字段

    b. 如果是需要修剪左表,那么join必须是inner join ,left semi join或right join,反之

    亦然。但如果是 left out join,无论右边有没有这个分区,左边的值都存在,就不需要被裁剪.

    c. 另一张表需要存在至少一个过滤条件,比如 a join b on a.key=b.key and a.id<2 参数 spark.sql.optimizer.dynamicPartitionPruning.enabled 默认开启。

    展开全文
  • Spark Shuffle过程详解

    2021-04-15 00:14:31
    点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源‍‍你需要预习:《Spark的Cache和Checkpoint区别和联系拾遗》《Spark Job 逻辑执行图和数据依赖解析》《S...

    点击上方蓝色字体,选择“设为星标

    回复”资源“获取更多资源

    ‍你需要预习:

    《Spark的Cache和Checkpoint区别和联系拾遗

    《Spark Job 逻辑执行图和数据依赖解析

    《Spark Job 物理执行图详解

    上一章里讨论了 job 的物理执行图,也讨论了流入 RDD 中的 records 是怎么被 compute() 后流到后续 RDD 的,同时也分析了 task 是怎么产生 result,以及 result 怎么被收集后计算出最终结果的。然而,我们还没有讨论数据是怎么通过 ShuffleDependency 流向下一个 stage 的?

    对比 Hadoop MapReduce 和 Spark 的 Shuffle 过程

    如果熟悉 Hadoop MapReduce 中的 shuffle 过程,可能会按照 MapReduce 的思路去想象 Spark 的 shuffle 过程。然而,它们之间有一些区别和联系。

    从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。

    从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。

    从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。

    如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?

    Shuffle write

    由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

    shuffle write 的任务很简单,那么实现也很简单:将 shuffle write 的处理逻辑加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,该 stage 的 final RDD 每输出一个 record 就将其 partition 并持久化。图示如下:

    上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb ,默认是 32KB(Spark 1.1 版本以前是 100KB)。

    其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。

    ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalRDD 中对应 partition 的 records。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.partition(record.getKey()))决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。

    这样的实现很简单,但有几个问题:

    1. 产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。

    2. 缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了cores * R * 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。

    目前来看,第二个问题还没有好的方法解决,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图:

    可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

    Shuffle read

    先看一张包含 ShuffleDependency 的物理执行图,来自 reduceByKey:

    很自然地,要计算 ShuffleRDD 中的数据,必须先把 MapPartitionsRDD 中的数据 fetch 过来。那么问题就来了:

    • 在什么时候 fetch,parent stage 中的一个 ShuffleMapTask 执行完还是等全部 ShuffleMapTasks 执行完?

    • 边 fetch 边处理还是一次性 fetch 完再处理?

    • fetch 来的数据存放到哪里?

    • 怎么获得要 fetch 的数据的存放位置?

    解决问题:

    • 在什么时候 fetch?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。因为 fetch 来的 FileSegments 要先在内存做缓冲,所以一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过 spark.reducer.maxMbInFlight,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面一般包含多个 FileSegment,但如果某个 FileSegment 特别大的话,这一个就可以填满甚至超过 softBuffer 的界限。

    • 边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?答案是使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 \ record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value,并将 func 的结果重新 put(key) 到 HashMap 中去。这个 func 功能上相当于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差别,差别相当于下面两段程序的差别。

        // MapReduce
        reduce(K key, Iterable<V> values) { 
            result = process(key, values)
            return result    
        }
      
        // Spark
        reduce(K key, Iterable<V> values) {
            result = null 
            for (V value : values) 
                result  = func(result, value)
            return result
        }

      MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接sum(values)/values.length,而在 Spark 中 func 可以实现sum(values),但不好实现/values.length。更多的 func 将会在下面的章节细致分析。

    • fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。内存使用的是AppendOnlyMap ,类似 Java 的HashMap,内存+磁盘使用的是ExternalAppendOnlyMap,如果内存空间不足时,ExternalAppendOnlyMap可以将 \

       records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。
    • 怎么获得要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,我们强调 “一个 ShuffleMapStage 形成后,会将该 stage 最后一个 final RDD 注册到 MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。因此,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。

    至此,我们已经讨论了 shuffle write 和 shuffle read 设计的核心思想、算法及某些实现。接下来,我们深入一些细节来讨论。

    典型 transformation() 的 shuffle read

    1. reduceByKey(func)

    上面初步介绍了 reduceByKey() 是如何实现边 fetch 边 reduce() 的。需要注意的是虽然 Example(WordCount) 中给出了各个 RDD 的内容,但一个 partition 里面的 records 并不是同时存在的。比如在 ShuffledRDD 中,每 fetch 来一个 record 就立即进入了 func 进行处理。MapPartitionsRDD 中的数据是 func 在全部 records 上的处理结果。从 record 粒度上来看,reduce() 可以表示如下:

    可以看到,fetch 来的 records 被逐个 aggreagte 到 HashMap 中,等到所有 records 都进入 HashMap,就得到最后的处理结果。唯一要求是 func 必须是 commulative 的(参见上面的 Spark 的 reduce() 的代码)。

    ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操作。

    为了减少数据传输量,MapReduce 可以在 map 端先进行 combine(),其实在 Spark 也可以实现,只需要将上图 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也进行一次即可,比如 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。

    对比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():

    • map 端的区别:map() 没有区别。对于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上进行 combine()。

    • reduce 端区别:MapReduce 的 shuffle 阶段先 fetch 数据,数据量到达一定规模后 combine(),再将剩余数据 merge-sort 后 reduce(),reduce() 非常灵活。Spark 边 fetch 边 reduce()(在 HashMap 上执行 func),因此要求 func 符合 commulative 的特性。

    从内存利用上来对比:

    • map 端区别:MapReduce 需要开一个大型环形缓冲区来暂存和排序 map() 的部分输出结果,但 combine() 不需要额外空间(除非用户自己定义)。Spark 需要 HashMap 内存数据结构来进行 combine(),同时输出 records 到磁盘上时也需要一个小的 buffer(bucket)。

    • reduce 端区别:MapReduce 需要一部分内存空间来存储 shuffle 过来的数据,combine() 和 reduce() 不需要额外空间,因为它们的输入数据分段有序,只需归并一下就可以得到。在 Spark 中,fetch 时需要 softBuffer,处理数据时如果只使用内存,那么需要 HashMap 来持有处理后的结果。如果使用内存+磁盘,那么在 HashMap 存放一部分处理后的数据。

    2. groupByKey(numPartitions)

    与 reduceByKey() 流程一样,只是 func 变成 result = result ++ record.value,功能是将每个 key 对应的所有 values 链接在一起。result 来自 hashMap.get(record.key),计算后的 result 会再次被 put 到 hashMap 中。与 reduceByKey() 的区别就是 groupByKey() 没有 map 端的 combine()。对于 groupByKey() 来说 map 端的 combine() 只是减少了重复 Key 占用的空间,如果 key 重复率不高,没必要 combine(),否则,最好能够 combine()。

    3. distinct(numPartitions)

    与 reduceByKey() 流程一样,只是 func 变成 result = result == null? record.value : result,如果 HashMap 中没有该 record 就将其放入,否则舍弃。与 reduceByKey() 相同,在map 端存在 combine()。

    4. cogroup(otherRDD, numPartitions)

    CoGroupedRDD 可能有 0 个、1 个或者多个 ShuffleDependency。但并不是要为每一个 ShuffleDependency 建立一个 HashMap,而是所有的 Dependency 共用一个 HashMap。与 reduceByKey() 不同的是,HashMap 在 CoGroupedRDD 的 compute() 中建立,而不是在 mapPartitionsWithContext() 中建立。

    粗线表示的 task 首先 new 出一个 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的个数与参与 cogroup 的 RDD 个数相同。func 的逻辑是这样的:每当从 RDD a 中 shuffle 过来一个 \ record 就将其添加到 hashmap.get(Key) 对应的 Array 中的第一个 ArrayBuffer() 中,每当从 RDD b 中 shuffle 过来一个 record,就将其添加到对应的 Array 中的第二个 ArrayBuffer()。

    CoGroupedRDD => MappedValuesRDD 对应 mapValues() 操作,就是将 [ArrayBuffer(), ArrayBuffer()] 变成 [Iterable[V], Iterable[W]]。

    5. interp(otherRDD) 和 join(otherRDD, numPartitions)

     这两个操作中均使用了 cogroup,所以 shuffle 的处理方式与 cogroup 一样。

    6. sortByKey(ascending, numPartitions)

    sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑与 reduceByKey() 不太一样,没有使用 HashMap 和 func 来处理 fetch 过来的 records。

    sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的处理逻辑是:将 shuffle 过来的一个个 record 存放到一个 Array 里,然后按照 Key 来对 Array 中的 records 进行 sort。

    7. coalesce(numPartitions, shuffle = true)

    coalesce() 虽然有 ShuffleDependency,但不需要对 shuffle 过来的 records 进行 aggregate,所以没有建立 HashMap。每 shuffle 一个 record,就直接流向 CoalescedRDD,进而流向 MappedRDD 中。

    Shuffle read 中的 HashMap

    HashMap 是 Spark shuffle read 过程中频繁使用的、用于 aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。下面我们来分析一下两者特性及内存使用情况

    1. AppendOnlyMap

    AppendOnlyMap 的官方介绍是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是类似 HashMap,但没有remove(key)方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。如下图:

    当要 put(K, V) 时,先 hash(K) 找存放位置,如果存放位置已经被占用,就使用 Quadratic probing 探测方法来找下一个空闲位置。对于图中的 K6 来说,第三次查找找到 K4 后面的空闲位置,放进去即可。get(K6) 的时候类似,找三次找到 K6,取出紧挨着的 V6,与先来的 value 做 func,结果重新放到 V6 的位置。

    迭代 AppendOnlyMap 中的元素的时候,从前到后扫描输出。

    如果 Array 的利用率达到 70%,那么就扩张一倍,并对所有 key 进行 rehash 后,重新排列每个 key 的位置。

    AppendOnlyMap 还有一个 destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。实现方法很简单:先将所有 (K, V) pairs compact 到 Array 的前端,并使得每个 (K, V) 占一个位置(原来占两个),之后直接调用 Array.sort() 排序,不过这样做会破坏数组(key 的位置变化了)。

    2. ExternalAppendOnlyMap

    相比 AppendOnlyMap,ExternalAppendOnlyMap 的实现略复杂,但逻辑其实很简单,类似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 过程:

    ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 过程与原始的 AppendOnlyMap 一模一样。如果 AppendOnlyMap 快被装满时检查一下内存剩余空间是否可以够扩展,够就直接在内存中扩展,不够就 sort 一下 AppendOnlyMap,将其内部所有 records 都 spill 到磁盘上。图中 spill 了 4 次,每次 spill 完在磁盘上生成一个 spilledMap 文件,然后重新 new 出来一个 AppendOnlyMap。最后一个 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 来的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已经被处理完,因为每次 insert 的时候,新来的 record 只与 AppendOnlyMap 中的 records 进行 aggregate,并不是与所有的 records 进行 aggregate(一些 records 已经被 spill 到磁盘上了)。因此当需要 aggregate 的最终结果时,需要对 AppendOnlyMap 和所有的 spilledMaps 进行全局 merge-aggregate。

    全局 merge-aggregate 的流程也很简单:先将 AppendOnlyMap 中的 records 进行 sort,形成 sortedMap。然后利用 DestructiveSortedIterator 和 DiskMapIterator 分别从 sortedMap 和各个 spilledMap 读出一部分数据(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key),所以图中第一个 spilledMap 只读出前三个 records 进入 StreamBuffer。mergeHeap 顾名思义就是使用堆排序不断提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并将其一个个放入 mergeBuffers 中,放入的时候与已经存在于 mergeBuffers 中的 StreamBuffer 进行 merge-combine,第一个被放入 mergeBuffers 的 StreamBuffer 被称为 minBuffer,那么 minKey 就是 minBuffer 中第一个 record 的 key。当 merge-combine 的时候,与 minKey 相同的 records 被 aggregate 一起,然后输出。整个 merge-combine 在 mergeBuffers 中结束后,StreamBuffer 剩余的 records 随着 StreamBuffer 重新进入 mergeHeap。一旦某个 StreamBuffer 在 merge-combine 后变为空(里面的 records 都被输出了),那么会使用 DestructiveSortedIterator 或 DiskMapIterator 重新装填 hash(key) 相同的 records,然后再重新进入 mergeHeap。

    整个 insert-merge-aggregate 的过程有三点需要进一步探讨一下:

    • 内存剩余空间检测

      与 Hadoop MapReduce 规定 reducer 中 70% 的空间可用于 shuffle-sort 类似,Spark 也规定 executor 中 spark.shuffle.memoryFraction * spark.shuffle.safetyFraction 的空间(默认是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是这 24% 的空间不是完全用于一个 ExternalOnlyAppendMap 的,而是由在 executor 上同时运行的所有 reducer 共享的。为此,exectuor 专门持有一个 ShuffleMemroyMap: HashMap[threadId, occupiedMemory] 来监控每个 reducer 中 ExternalOnlyAppendMap 占用的内存量。每当 AppendOnlyMap 要扩展时,都会计算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的内存 + 扩展后的内存 是会否会大于内存限制,大于就会将 AppendOnlyMap spill 到磁盘。有一点需要注意的是前 1000 个 records 进入 AppendOnlyMap 的时候不会启动是否要 spill 的检查,需要扩展时就直接在内存中扩展。

    • AppendOnlyMap 大小估计

      为了获知 AppendOnlyMap 占用的内存空间,可以在每次扩展时都将 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但这样做非常耗时。所以 Spark 设计了粗略的估算算法,算法时间复杂度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小变化及一共 insert 的 records 的个数来估算大小,具体见 SizeTrackingAppendOnlyMap 和 SizeEstimator

    • Spill 过程

      与 shuffle write 一样,在 spill records 到磁盘上的时候,会建立一个 buffer 缓冲区,大小仍为 spark.shuffle.file.buffer.kb ,默认是 32KB。另外,由于 serializer 也会分配缓冲区用于序列化和反序列化,所以如果一次 serialize 的 records 过多的话缓冲区会变得很大。Spark 限制每次 serialize 的 records 个数为 spark.shuffle.spill.batchSize,默认是 10000。

    Discussion

    通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。

    这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。

    Spark的Cache和Checkpoint区别和联系拾遗

    Spark Job 逻辑执行图和数据依赖解析

    Spark Job 物理执行图详解

    欢迎点赞+收藏+转发朋友圈素质三连

    文章不错?点个【在看】吧! 

    展开全文
  • 带你揭开 Spark shuffle 机制迷雾!
  • 我们都知道,Shuffle 操作在 Spark 中是一种昂贵的操作。在 Facebook,单个 Job 的 Shuffle 就可能往磁盘中写入 300TB 的数据;而且 shuffle reads 也是一种低效的操作,这会大大延长作业的整体执行时间,并且消耗...
  • Spark Shuffle是比较消耗性能的环节,本文对涉及shuffle过程的部分参数进行讲解,方便在使用Spark编写任务时进行调优。 spark.shuffle.file.buffer 默认值:32 参数说明:该参数用于设置shuffle write task的...
  • Spark shuffle

    2020-03-25 23:52:24
    1、什么是shuffle Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD。也就是说原本同...
  • Spark Shuffle详解

    2021-01-28 21:42:12
    --Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题? https://blog.csdn.net/qq_34901049/article/details/103792271 Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类...
  • SparkShuffle配置调优1、Shuffle优化配置 -spark.shuffle.file.buffer2、Shuffle优化配置 -spark.reducer.maxSizeInFlight3、Shuffle优化配置 -spark.shuffle.io.maxRetries4、Shuffle优化配置 -spark.shuffle.io....
  • Spark Shuffle运行原理

    千次阅读 2020-09-07 12:48:19
    1.什么是spark shuffleShuffle中文意思就是“洗牌”,在SparkShuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个分区上去聚合和处理。 Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行...
  • SparkShuffle

    2021-08-13 08:43:27
    SparkShuffle 什么是shuffle shuffle是分布式计算中不可缺少的一部分,也是计算性能消耗最严重的部分, 在spark中有些算子会触发shuffle进行分区数据的重新规划 spark中的shuffl的演变过程 -spark最早的shuffle ...
  • Spark Shuffle 过程

    2022-07-02 11:17:19
    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,...
  • SparkSpark Shuffle 之SortShuffle

    千次阅读 2022-03-10 19:47:01
    SparkShuffle reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value。 问题: 聚合之前,每一个key对应的value...
  • spark shuffle对比hive shuffle

    千次阅读 2019-04-07 22:26:45
    Spark 和 Hadoop一直是大数据离线计算的必经之路,自己在工作中也经常用到,所以学习一下原理还是很有必要的,不然碰到问题很容易一脸懵逼,其中感觉shuffle是两者的核心之一,故整理下,方便以后回顾。 大数据的...
  • Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端...
  •   今天学习 Spark Shuffle。昨天文章提到了 Spark 划分 stage 时,分为了 ShuffleMapStage 和 ResultStage。没看过的可以看昨天的文章。 【SparkSpark 任务调度 在划分 stage 时: 前面的所有 stage 被称为 ...
  • Spark Shuffle FetchFailedException
  • Spark Shuffle及其调优

    2019-11-21 00:16:18
    Spark-ShuffleShuffle概述ShuffleHashShuffle机制HashShuffle概述未优化的HashShuffle机制优化的HashShuffle机制Sort-Based ShuffleSort-Based Shuffle概述 Shuffle概述   在MapReduce和Spark中都有Shuffle。对于...
  • 今天的大数据开发分享,我们就来具体讲一讲Spark ShuffleShuffle概念解读 Shuffle最早出现于MapReduce框架中,负责连接Map阶段的输出与Reduce阶段的输入。Shuffle阶段涉及磁盘IO、网络传输、内存使用等多种资源的...
  • Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。为什么MapReduce计算...
  • 六、Spark Shuffle的配置选项(配置调优) 一、sparkshuffle调优 主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等 二、spark.shuffle.file.buffer 参数说明:该...
  • spark Shuffle Write和Read

    2020-10-08 14:35:07
    sparkshuffle主要部分就是shuffleWrite 和 shuffleReader. 大致流程 spark通过宽依赖划分stage,如果是宽依赖就需要进行shuffle操作,上游stage的shufflemaptask进行shuffleWrite,上游的write操作做的最重要的...
  • mr shufflespark shuffle的区别

    千次阅读 2020-07-22 00:47:27
    对比mr和sparkShuffle 过程有着诸多类似,例如,Shuffle 过程中,提供数据的一端被称作 Map 端,Map 端每个生成数据的任务称为 Mapper,对应的,接收数据的一端被称作 Reduce 端,Reduce 端每个拉取数据的任务称为...
  • Spark Shuffle调优指南

    2020-04-30 17:04:30
    文章目录Shuffle对性能消耗的原理详解Spark Shuffle过程中影响性能的操作:Spark 压缩算法的比较Shuffle调优指南系统架构无法避免Shuffle序列化底层释放能力JVM层Spark层面 Shuffle对性能消耗的原理详解 Spark ...
  • 目录shuffle为什么要有shuffleshuffle分类Shuffle WriteShuffle Readshuffle可能会面临的问题HashShuffle优化解决问题reduce分区数决定因素SortShuffle shuffle 为什么要有shuffle shuffle:为了让相同的key进入同一...
  • 用来更好的理解spark shuffle中的点点滴滴 分析 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带: override def registerShuffle[K, V, C]( shuffleId: Int, dependency: ShuffleDependency[K, V, ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 36,711
精华内容 14,684
关键字:

sparkshuffle