shuffle 订阅
曳步舞,又称鬼步舞。它起源于澳大利亚墨尔本。它属于一种力量型舞蹈,是一种拖着脚走的舞步,动作快速有力,音乐强悍有震撼力(主要是电子舞曲,多为Trance, hardstyle,house, hardcore ),舞蹈充满动感活力,极具现场渲染力。曳步舞有很多种风格,各风格的性质不一样,曳步舞的主要风格有:硬派、MAS、AUS、BASS、MARK、自由风等(因国籍与地域不同,很多相同风格的叫法略有不同)。 展开全文
曳步舞,又称鬼步舞。它起源于澳大利亚墨尔本。它属于一种力量型舞蹈,是一种拖着脚走的舞步,动作快速有力,音乐强悍有震撼力(主要是电子舞曲,多为Trance, hardstyle,house, hardcore ),舞蹈充满动感活力,极具现场渲染力。曳步舞有很多种风格,各风格的性质不一样,曳步舞的主要风格有:硬派、MAS、AUS、BASS、MARK、自由风等(因国籍与地域不同,很多相同风格的叫法略有不同)。
信息
起源地
澳大利亚墨尔本
基本步法
奔跑,侧滑,花式
起源时间
20世纪80年代
类似舞种
Jumpstyle Free step、Cwalk
中文名
曳(yè)步舞
主要风格
硬派、MAS、AUS、MARK、BASS
外文名
Shuffle
曳步舞舞蹈历史
曳步舞由于发源地是澳大利亚墨尔本,于是也被叫做:墨尔本曳步舞(MelbourneShuffle)。是20世纪80年代起源于锐舞派对的一种舞步,发源于澳大利亚墨尔本地下派对,基本动作是一个快速切换,流畅完成的两步动作,舞蹈风格适合各种类型的电子音乐,例如:House , Trance ,Techno ,Hard Dance , Hardstyle。MelbourneShuffle这个带前缀地区名词的名称,早期是为了区分Shuffle这个词的多元性,才用Melbourne标前缀词,区分出舞蹈。但是已经没有多少国家地区的舞者在用了。shuffling大多为欧美地区的舞者在用,但是被其他发展好的国家地区不太认同。比如俄罗斯,澳大利亚,马来西亚,就连新兴的韩国和日本有比赛的国家都大多用Shuffle。动作简洁,快速有力,节奏感强,主要表现通过双脚动作快速切换,通过滑行,踢腿,踩踏,转身等动作完成的一种即兴表演,自由度很高,个性十足的舞步。当前,中国国内流行的“广场舞鬼步舞”与曳步舞Shuffle存在本质区别。
收起全文
精华内容
下载资源
问答
  • shuffle

    2021-03-29 21:54:51
    sklearn里的shuffle   这个是目前我接触到最好用的shuffle,因为它既可以如2.1一样,打乱一个矩阵,也可以同时打乱两个变量组成的特征与标签,而且随机种子也集成了,总之就是省事。 from sklearn.utils import ...

    sklearn里的shuffle
      这个是目前我接触到最好用的shuffle,因为它既可以如2.1一样,打乱一个矩阵,也可以同时打乱两个变量组成的特征与标签,而且随机种子也集成了,总之就是省事。

    from sklearn.utils import shuffle
    X,Y = shuffle(X,Y, random_state=1337)

    展开全文
  • Flink Sort-Shuffle 实现简介

    千次阅读 2021-11-12 00:39:21
    公众号更名公告 「Flink 中文社区」更名为「Apache Flink」感谢你们的关注摘要:本文介绍 Sort-Shuffle 如何帮助 Flink 在应对大规模批数据处理任务时更加游刃...

    公众号更名公告

    「Flink 中文社区」更名为「Apache Flink

    感谢你们的关注

    摘要:本文介绍 Sort-Shuffle 如何帮助 Flink 在应对大规模批数据处理任务时更加游刃有余。主要内容包括:

    1. 数据 Shuffle 简介

    2. 引入 Sort-Shuffle 的意义

    3. Flink Sort-Shuffle 实现

    4. 测试结果

    5. 调优参数

    6. 未来展望

    Tips:FFA 峰会以及 Hackathon 比赛重磅开启,点击「阅读原文」即可报名~

    ad234d9629380d33168a2e0a8c84d111.png GitHub 地址 a417e59e4affaab2734e755bf91edb73.png

    欢迎大家关注 Flink ~

    4495c17e4ded516efd8bba793834d118.png

    Flink 作为批流一体的大数据计算引擎,大规模批数据处理也是 Flink 数据处理能力的重要组成部分。随着 Flink 的版本迭代,其批数据处理能力也在不断增强,sort-shuffle 的引入,使得 Flink 在应对大规模批数据处理任务时更加游刃有余。

    一、数据 Shuffle 简介


    数据 shuffle 是批数据处理作业的一个重要阶段,在这一阶段中,上游处理节点的输出数据会被持久化到外部存储中,之后下游的计算节点会读取这些数据并进行处理。这些持久化的数据不仅仅是一种计算节点间的数据交换形式,还在错误恢复中发挥着重要作用。

    目前,有两种批数据 shuffle 模型被现有的大规模分布式计算系统采用,分别是基于 hash 的方式以及基于 sort 的方式:

    1. 基于 hash 方式的核心思路是将发送给下游不同并发消费任务的数据写到单独的文件中,这样文件本身就成了一个自然的区分不同数据分区的边界;

    2. 基于 sort 方式的核心思路是先将所有分区的数据写在一起,然后通过 sort 来区分不同数据分区的边界。

    我们在 Flink 1.12 版本将基于 sort 的批处理 shuffle 实现引入了 Flink 并在后续进行了持续的性能与稳定性优化;到 Flink 1.13 版本,sort-shuffle 已经实现生产可用。

    二、引入 Sort-Shuffle 的意义


    我们之所以要在 Flink 中引入 sort-shuffle 的实现,一个重要的原因是 Flink 原本的基于 hash 的实现对大规模批作业不可用。这个也是被现有的其他大规模分布式计算系统所证明的:

    1. 稳定性方面:对于高并发批作业,基于 hash 的实现会产生大量的文件,并且会对这些文件进行并发读写,这会消耗很多资源并对文件系统会产生较大的压力。文件系统需要维护大量的文件元数据,会产生文件句柄以及 inode 耗尽等不稳定风险。

    2. 性能方面:对于高并发批作业,并发读写大量的文件意味着大量的随机 IO,并且每次 IO 实际读写的数据量可能是非常少的,这对于 IO 性能是一个巨大的挑战,在机械硬盘上,这使得数据 shuffle 很容易成为批处理作业的性能瓶颈。

    通过引入基于 sort 的批数据 shuffle 实现,并发读写的文件数量可以大大降低,有利于实现更好的数据顺序读写,从而能够提高 Flink 大规模批处理作业的稳定性与性能。除此之外,新的 sort-shuffle 实现还可以减小内存缓冲区的消耗。对于基于 hash 的实现,每个数据分区都需要一块读写缓冲区,内存缓冲区消耗和并发成正比。而基于 sort 的实现则可以做到内存缓冲区消耗和作业并发解耦(尽管更大的内存可能会带来更高的性能)。

    更为重要的一点是我们实现了新的存储结构与读写 IO 优化,这使得 Flink 的批数据 shuffle 相比于其他的大规模分布式数据处理系统更具优势。下面的章节会更为详细的介绍 Flink 的 sort-shuffle 实现以及所取得的结果。

    三、Flink Sort-Shuffle 实现


    和其他分布式系统的批数据 sort-shuffle 实现类似,Flink 的整个 shuffle 过程分为几个重要的阶段,包括写数据到内存缓冲区、对内存缓冲区进行排序、将排好序的数据写出到文件以及从文件中读取 shuffle 数据并发送给下游。但是,与其他系统相比,Flink 的实现有一些根本性的不同,包括多段数据存储格式、省掉数据合并流程以及数据读取 IO 调度等。这些都使得 Flink 的实现有着更优秀的表现。

    1. 设计目标

    在 Flink sort-shuffle 的整个实现过程中,我们把下面这些点作为主要的设计目标加以考量:

    ■ 1.1 减少文件数量


    正如上面所讨论的,基于 hash 的实现会产生大量的文件,而减少文件的数量有利于提高稳定性和性能。Sort-Spill-Merge 的方式被分布式计算系统广泛采纳以达到这一目标,首先将数据写入内存缓冲区,当内存缓冲区填满后对数据进行排序,排序后的数据被写出到一个文件中,这样总的文件数量是:(总数据量 / 内存缓冲区大小),从而文件数量被减少。当所有数据写出完成后,将产生的文件合并成一个文件,从而进一步减少文件数量并增大每个数据分区的大小(有利于顺序读取)。

    相比于其他系统的实现,Flink 的实现有一个重要的不同,即 Flink 始终向同一个文件中不断追加数据,而不会写多个文件再进行合并,这样的好处始终只有一个文件,文件数量实现了最小化。

    ■ 1.2 打开更少的文件

    同时打开的文件过多会消耗更多的资源,同时容易导致文件句柄不够用的问题,导致稳定性变差。因此,打开更少的文件有利于提升系统的稳定性。对于数据写出,如上所述,通过始终向同一个文件中追加数据,每个并发任务始终只打开一个文件。对于数据读取,虽然每个文件都需要被大量下游的并发任务读取,Flink 依然通过只打开文件一次,并在这些并发读取任务间共享文件句柄实现了每个文件只打开一次的目标。

    ■ 1.3 最大化顺序读写


    文件的顺序读写对文件的 IO 性能至关重要。通过减少 shuffle 文件数量,我们已经在一定程度上减少了随机文件 IO。除此之外,Flink 的批数据 sort-shuffle 还实现了更多 IO 优化来最大化文件的顺序读写。在数据写阶段,通过将要写出的数据缓冲区聚合成更大的批并通过 wtitev 系统调用写出从而实现了更好的顺序写。在数据读取阶段,通过引入读取 IO 调度,总是按照文件的偏移顺序服务数据读取请求从而最大限度的实现的文件的顺序读。实验表明这些优化极大的提升了批数据 shuffle 的性能。

    ■ 1.4 减少读写 IO 放大


    传统的 sort-spill-merge 方式通过将生成的多个文件合并成一个更大的文件从增大读取数据块的大小。这种实现方案虽然带来了好处,但也有一些不足,最终要的一点便是读写 IO 放大,对于计算节点间的数据 shuffle 而言,在不发生错误的情况下,本身只需要写入和读取数据一次,但是数据合并使得相同的数据被读写多次,从而导致 IO 总量变多,并且存储空间的消耗也会变大。

    Flink 的实现通过不断向同一个文件中追加数据以及独特的存储结构规避了文件和并的过程,虽然单个数据块的大小小于和并后的大小,但由于规避了文件合并的开销再结合 Flink 独有的 IO 调度,最终可以实现比 sort-spill-merge 方案更高的性能。

    ■ 1.5 减少内存缓冲区消耗

    类似于其他分布式计算系统中 sort-shuffle 的实现,Flink 利用一块固定大小的内存缓冲区进行数据的缓存与排序。这块内存缓冲区的大小是与并发无关的,从而使得上游 shuffle 数据写所需要的内存缓冲区大小与并发解耦。结合另一个内存管理方面的优化 FLINK-16428 可以同时实现下游 shuffle 数据读取的内存缓冲区消耗并发无关化,从而可以减少大规模批作业的内存缓冲区消耗。(注:FLINK-16428 同时适用于批作业与流作业)

    2. 实现细节

    ■ 2.1 内存数据排序

    在 shuffle 数据的 sort-spill 阶段,每条数据被首先序列化并写入到排序缓冲区中,当缓冲区被填满后,会对缓冲区中的所有二进制数据按照数据分区的顺序进行排序。此后,排好序的数据会按照数据分区的顺序被写出到文件中。虽然,目前并没有对数据本身进行排序,但是排序缓冲区的接口足够的泛化,可以实现后续潜在的更为复杂的排序要求。排序缓冲区的接口定义如下:

    
     
    public interface SortBuffer {
    
    
       */** Appends data of the specified channel to this SortBuffer. \*/*
       boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;
    
    
       */** Copies data in this SortBuffer to the target MemorySegment. \*/*
       BufferWithChannel copyIntoSegment(MemorySegment target);
    
    
       long numRecords();
    
    
       long numBytes();
    
    
       boolean hasRemaining();
    
    
       void finish();
    
    
       boolean isFinished();
    
    
       void release();
    
    
       boolean isReleased();
     }

    在排序算法上,我们选择了复杂度较低的 bucket-sort。具体而言,每条序列化后的数据前面都会被插入一个 16 字节的元数据。包括 4 字节的长度、4 字节的数据类型以及 8 字节的指向同一数据分区中下一条数据的指针。结构如下图所示:

    e9aed1ad02f774fcfd00606d5e5b093b.png

    当从缓冲区中读取数据时,只需要按照每个数据分区的链式索引结构就可以读取到属于这个数据分区的所有数据,并且这些数据保持了数据写入时的顺序。这样按照数据分区的顺序读取所有的数据就可以达到按照数据分区排序的目标。

    ■ 2.2 文件存储结构

    如前所述,每个并行任务产生的 shuffle 数据会被写到一个物理文件中。每个物理文件包含多个数据区块(data region),每个数据区块由数据缓冲区的一次 sort-spill 生成。在每个数据区块中,所有属于不同数据分区(data partition,由下游计算节点不同并行任务消费)的数据按照数据分区的序号顺序进行排序聚合。下图展示了 shuffle 数据文件的详细结构。其中(R1,R2,R3)是 3 个不同的数据区块,分别对应 3 次数据的 sort-spill 写出。每个数据块中有 3 个不同的数据分区,分别将由(C1,C2,C3)3 个不同的并行消费任务进行读取。也就是说数据 B1.1,B2.1 及 B3.1 将由 C1 处理,数据 B1.2,B2.2 及 B3.2 将由 C2 处理,而数据 B1.3,B2.3 及 B3.3 将由 C3 处理。

    edd950a58fb7cffd66c648f45b92f6e2.png

    类似于其他的分布式处理系统实现,在 Flink 中,每个数据文件还对应一个索引文件。索引文件用来在读取时为每个消费者索引属于它的数据(data partition)。索引文件包含和数据文件相同的 data region,在每个 data region 中有与 data partition 相同数量的索引项,每个索引项包含两个部分,分别对应到数据文件的偏移量以及数据的长度。作为一个优化。Flink 为每个索引文件缓存最多 4M 的索引数据。数据文件与索引文件的对应关系如下:

    d5fe5e8954ec369c69cdbcf7039cd06a.png

    ■ 2.3 读取 IO 调度

    为了进一步提高文件 IO 性能,基于上面的存储结构,Flink 进一步引入了 IO 调度机制,类似于磁盘调度的电梯算法,Flink 的 IO 调度总是按照 IO 请求的文件偏移顺序进行调度。更具体来说,如果数据文件有 n 个 data region,每个 data region 有 m 个 data partition,同时有 m 个下游计算任务读取这一数据文件,那么下面的伪代码展示了 Flink 的 IO 调度算法的工作流程:

    
     
    *// let data_regions as the data region list indexed from 0 to n - 1*
     *// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1*
     for (data_region in data_regions) {
       data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
       if (data_reader == null)
         break;
       reading_buffers = request_reading_buffers();
       if (reading_buffers.isEmpty())
         break;
       read_data(data_region, data_reader, reading_buffers);
     }

    ■ 2.4 数据广播优化

    数据广播是指发送相同的数据给下游计算节点的所有并行任务,一个常见的应用场景是 broadcast-join。Flink 的 sort-shuffle 实现对这一过程进行了优化,使得在包括内存排序缓冲区和 shuffle 文件中,广播数据只保存一份,这可以大大提升数据广播的性能。更具体来说,当写入一条广播数据到排序缓冲区时,这条数据只会被序列化并且拷贝一次,同样在将数据写出到 shuffle 文件时,也只会写一份数据。在索引文件中,对于不同 data partition 的数据索引项,他们均指向数据文件中的同一块数据。下图展示了数据广播优化的所有细节:

    cb85006690cd448ec706b81d54a8cc39.png

    ■ 2.5 数据压缩


    数据压缩是一个简单而有效的优化手段,测试结果显示数据压缩可以提高 TPC-DS 总体性能超过 30%。类似于 Flink 的基于 hash 的批处理 shuffle 实现,数据压缩是以网络缓冲区(network buffer)为单位进行的,数据压缩不跨 data partition,也就是说发给不同下游并行任务的数据分开压缩,压缩发生在数据排序后写出前,下游消费任务在收到数据后进行解压。下图展示了数据压缩的整个流程:

    cd02d7cc85adbc1b05206fd311d7b762.png

    四、测试结果


    1. 稳定性

    新的 sort-shuffle 的实现极大的提高 Flink 运行批处理作业的稳定性。除了解决了潜在的文件句柄以及 inode 耗尽的不稳定问题外,还解决了一些 Flink 原有 hash-shuffle 存在的已知问题,如 FLINK-21201(创建过多文件导致主线程阻塞),FLINK-19925(在网络 netty 线程中执行 IO 操作导致网络稳定性受到影响)等。

    2. 性能

    我们在 1000 规模的并发下运行了 TPC-DS 10T 数据规模的测试,结果表明,相比于 Flink 原本的批数据 shuffle 实现,新的数据 shuffle 实现可以实现 2-6 倍的性能提升,如果排除计算时间,只统计数据 shuffle 时间可以是先最高 10 倍的性能提升。下表展示了性能提升的详细数据:

    8f7f4135867c6bc1322e769a47c994c1.png

    在我们的测试集群上,每块机械硬盘的数据读取以及写入带宽可以达到 160MB/s:

    e6a01fbd640816c92bcb61bab33e447f.png

    注:我们的测试环境配置如下,由于我们有较大的内存,所以一些 shuffle 数据量小的作业实际数据 shuffle 仅为读写内存,因此上面的表格仅列出了一些 shuffle 数据量大,性能提升明显的查询:

    2259147bf9b6bc44bf38be22ac42496f.png


    五、调优参数

    在 Flink 中,sort-shuffle 默认是不开启的,想要开启需要调小这个参数的配置:taskmanager.network.sort-shuffle.min-parallelism。这个参数的含义是如果数据分区的个数(一个计算任务并发需要发送数据给几个下游计算节点)低于这个值,则走 hash-shuffle 的实现,如果高于这个值则启用 sort-shuffle。实际应用时,在机械硬盘上,可以配置为 1,即使用 sort-shuffle。

    Flink 没有默认开启数据压缩,对于批处理作业,大部分场景下是建议开启的,除非数据压缩率低。开启的参数为 taskmanager.network.blocking-shuffle.compression.enabled

    对于 shuffle 数据写和数据读,都需要占用内存缓冲区。其中,数据写缓冲区的大小由 taskmanager.network.sort-shuffle.min-buffers 控制,数据读缓冲区由 taskmanager.network.sort-shuffle.min-buffers 控制。数据写缓冲区从网络内存中切分出来,如果要增大数据写缓冲区可能还需要增大网络内存总大小,以避免出现网络内存不足的错误。数据读缓冲区从框架的 off-heap 内存中切分出来,如果要增大数据读缓冲区,可能还需要增大框架的 off-heap 内存,以避免出现 direct 内存 OOM 错误。一般而言更大的内存缓冲区可以带来更好的性能,对于大规模批作业,几百兆的数据写缓冲区与读缓冲区是足够的。

    六、未来展望


    还有一些后续的优化工作,包括但不限于:

    1. 网络连接复用,这可以提高网络的建立的性能与稳定性,相关 Jira 包括 FLINK-22643 以及 FLINK-15455;

    2. 多磁盘负载均衡,这有利于解决负载不均的问题,相关 Jira 包括 FLINK-21790 以及 FLINK-21789;

    3. 实现远程数据 shuffle 服务,这有利于进一步提升批数据 shuffle 的性能与稳定性;

    4. 允许用户选择磁盘类型,这可以提高易用性,用户可以根据作业的优先级选择使用 HDD 或者 SSD。

    英文原文链接:

    https://flink.apache.org/2021/10/26/sort-shuffle-part1.html

    https://flink.apache.org/2021/10/26/sort-shuffle-part2.html


    12 月 4-5 日,Flink Forward Asia 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴;

    另有首届 Flink Forward Asia Hackathon 正式启动,10W 奖金等你来!

    点击文末「阅读原文」即可免费报名~

    bd9029a224b0bf88cb9eda6023035c59.png

    ▼ 关注「ApacheFlink」视频号,遇见更多大咖 ▼

    更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

    02a946504548361267bb65f3b191fb5e.png

     976401d369c6a1a68bb63cf5e1ecb5c8.gif  戳我,报名 FFA 2021 大会!

    展开全文
  • 在 MapReduce 框架中, Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shu...

    在 MapReduce 框架中, Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现 Shuffle 。

    Spark Shuffle

    Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。先介绍下它们的发展历程,有助于我们更好的理解 Shuffle:

    在 Spark 1.1 之前, Spark 中只实现了一种 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基于 Sort 的 Shuffle 实现方式,并且 Spark 1.2 版本之后,默认的实现方式从基于 Hash 的 Shuffle 修改为基于 Sort 的 Shuffle 实现方式,即使用的 ShuffleManager 从默认的 hash 修改为 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己经不再使用

    Spark 之所以一开始就提供基于 Hash 的 Shuffle 实现机制,其主要目的之一就是为了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是将 sort 作为固定步骤,有许多并不需要排序的任务,MapReduce 也会对其进行排序,造成了许多不必要的开销。

    在基于 Hash 的 Shuffle 实现方式中,每个 Mapper 阶段的 Task 会为每个 Reduce 阶段的 Task 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中, M 表示 Mapper 阶段的 Task 个数, R 表示 Reduce 阶段的 Task 个数) 伴随大量的随机磁盘 I/O 操作与大量的内存开销。

    为了缓解上述问题,在 Spark 0.8.1 版本中为基于 Hash 的 Shuffle 实现引入了 Shuffle Consolidate 机制(即文件合并机制),将 Mapper 端生成的中间文件进行合并的处理机制。通过配置属性 spark.shuffie.consolidateFiles=true,减少中间生成的文件数量。通过文件合并,可以将中间文件的生成方式修改为每个执行单位为每个 Reduce 阶段的 Task 生成一个文件。

    执行单位对应为:每个 Mapper 端的 Cores 数/每个 Task 分配的 Cores 数(默认为 1) 。最终可以将文件个数从 M*R 修改为 E*C/T*R,其中, E 表示 Executors 个数, C 表示可用 Cores 个数, T 表示 Task 分配的 Cores 数。

    Spark1.1 版本引入了 Sort Shuffle:

    基于 Hash 的 Shuffle 的实现方式中,生成的中间结果文件的个数都会依赖于 Reduce 阶段的 Task 个数,即 Reduce 端的并行度,因此文件数仍然不可控,无法真正解决问题。为了更好地解决问题,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 实现方式,并且在 Spark 1.2 版本之后,默认的实现方式也从基于 Hash 的 Shuffle,修改为基于 Sort 的 Shuffle 实现方式,即使用的 ShuffleManager 从默认的 hash 修改为 sort。

    在基于 Sort 的 Shuffle 中,每个 Mapper 阶段的 Task 不会为每 Reduce 阶段的 Task 生成一个单独的文件,而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件, Reduce 阶段的各个 Task 可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘 I/0 与内存的开销。最终生成的文件个数减少到 2*M ,其中 M 表示 Mapper 阶段的 Task 个数,每个 Mapper 阶段的 Task 分别生成两个文件(1 个数据文件、 1 个索引文件),最终的文件个数为 M 个数据文件与 M 个索引文件。因此,最终文件个数是 2*M 个。

    从 Spark 1.4 版本开始,在 Shuffle 过程中也引入了基于 Tungsten-Sort 的 Shuffie 实现方式,通 Tungsten 项目所做的优化,可以极大提高 Spark 在数据处理上的性能。(Tungsten 翻译为中文是钨丝)

    注:在一些特定的应用场景下,采用基于 Hash 实现 Shuffle 机制的性能会超过基于 Sort 的 Shuffle 实现机制。

    一张图了解下 Spark Shuffle 的迭代历史:

    Spark Shuffle 迭代历史

    为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle?

    我们可以从 Spark 最根本要优化和迫切要解决的问题中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 时产生大量的文件。当数据量越来越多时,产生的文件量是不可控的,这严重制约了 Spark 的性能及扩展能力,所以 Spark 必须要解决这个问题,减少 Mapper 端 ShuffleWriter 产生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。

    但使用 Sorted-Based Shuffle 就完美了吗,答案是否定的,Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须先进行排序,所以导致它排序的速度有点慢。好在出现了 Tungsten-Sort Shuffle ,它对排序算法进行了改进,优化了排序的速度。Tungsten-Sort Shuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序需要的是 Sorted-Based Shuffle,还是 Tungsten-Sort Shuffle。

    下面详细剖析每个 Shuffle 的底层执行原理:

    一、Hash Shuffle 解析

    以下的讨论都假设每个 Executor 有 1 个 cpu core。

    1. HashShuffleManager

    shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。如果当前 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建 5000 个磁盘文件。由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的

    shuffle read 阶段,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,map task 给下游 stage 的每个 reduce task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 reduce task 只要从上游 stage 的所有 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。

    shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

    HashShuffleManager 工作原理如下图所示:

    未优化的HashShuffleManager工作原理

    2. 优化的 HashShuffleManager

    为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项

    开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。

    当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能

    假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

    这个功能优点明显,但为什么 Spark 一直没有在基于 Hash Shuffle 的实现中将功能设置为默认选项呢,官方给出的说法是这个功能还欠稳定。

    优化后的 HashShuffleManager 工作原理如下图所示:

    优化后的HashShuffleManager工作原理
    基于 Hash 的 Shuffle 机制的优缺点

    优点

    • 可以省略不必要的排序开销。

    • 避免了排序所需的内存开销。

    缺点

    • 生产的文件过多,会对文件系统造成压力。

    • 大量小文件的随机读写带来一定的磁盘开销。

    • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

    二、SortShuffle 解析

    SortShuffleManager 的运行机制主要分成三种:

    1. 普通运行机制

    2. bypass 运行机制,当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制;

    3. Tungsten Sort 运行机制,开启此运行机制需设置配置项 spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定采用此运行机制(后面会解释)。

    1. 普通运行机制

    在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能

    一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

    SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

    普通运行机制的 SortShuffleManager 工作原理如下图所示:

    普通运行机制的SortShuffleManager工作原理

    2. bypass 运行机制

    Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

    bypass 运行机制的触发条件如下:

    • shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。

    • 不是聚合类的 shuffle 算子。

    此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

    而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    bypass 运行机制的 SortShuffleManager 工作原理如下图所示:

    bypass运行机制的SortShuffleManager工作原理

    3. Tungsten Sort Shuffle 运行机制

    Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。

    Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

    对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

    因此,当设置了 spark.shuffle.manager=tungsten-sort 时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

    要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

    1. Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

    2. Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

    3. Shuffle 过程中的输出分区个数少于 16777216 个。

    实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

    所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。


    参考资料:

    • 《Spark大数据商业实战三部曲》

    •  https://spark.apache.org/docs/2.0.0/programming-guide.html#shuffle-operations

    • https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw

    --END--

    展开全文
  • 本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时...

    前言

    本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。

    数据倾斜调优

    调优概述

    有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

    数据倾斜发生时的现象

    • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
    • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

    数据倾斜发生的原理

    数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行 聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万 条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整 个Spark作业的运行进度是由运行时间最长的那个task决定的。

    因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

    下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world 和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍, 而整个stage的运行速度也由运行最慢的那个task所决定。

    spark

    如何定位导致数据倾斜的代码

    数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、 groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时, 可能就是你的代码中使用了这些算子中的某一个所导致的。

    某个task执行特别慢的情况

    首先要看的,就是数据倾斜发生在第几个stage中。

    如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn- cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

    比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别 慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时 间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生 了数据倾斜。

    spark

    知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分 代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用 的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

    这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只 有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

    • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
    • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作, 在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
    1. val conf = new SparkConf()
    2. val sc = new SparkContext(conf)
    3.  
    4. val lines = sc.textFile("hdfs://...")
    5. val words = lines.flatMap(_.split(" "))
    6. val pairs = words.map((_, 1))
    7. val wordCounts = pairs.reduceByKey(_ + _)
    8.  
    9. wordCounts.collect().foreach(println(_))

    通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边 界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包 括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万 次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

    某个task莫名其妙内存溢出的情况

    这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn- cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

    但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢 出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

    查看导致数据倾斜的key的数据分布情况

    知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布 情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解 决。

    此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

    1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
    2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计 出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

    举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行 reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印 样本数据中各个key的出现次数。

    1. val sampledPairs = pairs.sample(false, 0.1)
    2. val sampledWordCounts = sampledPairs.countByKey()
    3. sampledWordCounts.foreach(println(_))

    数据倾斜的解决方案

    解决方案一:使用Hive ETL预处理数据

    方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

    方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的 Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

    方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么 肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

    方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

    方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

    方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的 场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

    项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度 太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6 倍以上。

    解决方案二:过滤少数导致倾斜的key

    方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

    方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干 脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用 sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

    方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

    方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

    方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

    方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然 OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个 key之后,直接在程序中将那些key给过滤掉。

    解决方案三:提高shuffle操作的并行度

    方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

    方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

    方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个 key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

    方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

    方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

    方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有 100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。 所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

    spark

    解决方案四:两阶段聚合(局部聚合+全局聚合)

    方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

    方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机 数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

    方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

    方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

    方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

    spark

     
    1. // 第一步,给RDD中的每个key都打上一个随机前缀。
    2. JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
    3. new PairFunction<Tuple2<Long,Long>, String, Long>() {
    4. private static final long serialVersionUID = 1L;
    5. @Override
    6. public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
    7. throws Exception {
    8. Random random = new Random();
    9. int prefix = random.nextInt(10);
    10. return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
    11. }
    12. });
    13.  
    14. // 第二步,对打上随机前缀的key进行局部聚合。
    15. JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
    16. new Function2<Long, Long, Long>() {
    17. private static final long serialVersionUID = 1L;
    18. @Override
    19. public Long call(Long v1, Long v2) throws Exception {
    20. return v1 + v2;
    21. }
    22. });
    23.  
    24. // 第三步,去除RDD中每个key的随机前缀。
    25. JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
    26. new PairFunction<Tuple2<String,Long>, Long, Long>() {
    27. private static final long serialVersionUID = 1L;
    28. @Override
    29. public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
    30. throws Exception {
    31. long originalKey = Long.valueOf(tuple._1.split("_")[1]);
    32. return new Tuple2<Long, Long>(originalKey, tuple._2);
    33. }
    34. });
    35.  
    36. // 第四步,对去除了随机前缀的RDD进行全局聚合。
    37. JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
    38. new Function2<Long, Long, Long>() {
    39. private static final long serialVersionUID = 1L;
    40. @Override
    41. public Long call(Long v1, Long v2) throws Exception {
    42. return v1 + v2;
    43. }
    44. });

    解决方案五:将reduce join转为map join

    方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

    方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操 作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存 中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量 数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

    方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数 据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

    方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

    方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会 比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那 么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

    spark

     
    1. // 首先将数据量比较小的RDD的数据,collect到Driver中来。
    2. List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
    3. // 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
    4. // 可以尽可能节省内存空间,并且减少网络传输性能开销。
    5. final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
    6.  
    7. // 对另外一个RDD执行map类操作,而不再是join类操作。
    8. JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
    9. new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
    10. private static final long serialVersionUID = 1L;
    11. @Override
    12. public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
    13. throws Exception {
    14. // 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
    15. List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
    16. // 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
    17. Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
    18. for(Tuple2<Long, Row> data : rdd1Data) {
    19. rdd1DataMap.put(data._1, data._2);
    20. }
    21. // 获取当前RDD数据的key以及value。
    22. String key = tuple._1;
    23. String value = tuple._2;
    24. // 从rdd1数据Map中,根据key获取到可以join到的数据。
    25. Row rdd1Value = rdd1DataMap.get(key);
    26. return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
    27. }
    28. });
    29.  
    30. // 这里得提示一下。
    31. // 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
    32. // 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
    33. // rdd2中每条数据都可能会返回多条join后的数据。

    解决方案六:采样倾斜key并分拆join操作

    方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么 此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另 一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

    方案实现思路:

    • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
    • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
    • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
    • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
    • 而另外两个普通的RDD就照常join即可。
    • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

    方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

    方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

    方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

    spark

     
    1. // 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
    2. JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
    3.  
    4. // 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
    5. // 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
    6. // 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
    7. JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
    8. new PairFunction<Tuple2<Long,String>, Long, Long>() {
    9. private static final long serialVersionUID = 1L;
    10. @Override
    11. public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
    12. throws Exception {
    13. return new Tuple2<Long, Long>(tuple._1, 1L);
    14. }
    15. });
    16. JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
    17. new Function2<Long, Long, Long>() {
    18. private static final long serialVersionUID = 1L;
    19. @Override
    20. public Long call(Long v1, Long v2) throws Exception {
    21. return v1 + v2;
    22. }
    23. });
    24. JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
    25. new PairFunction<Tuple2<Long,Long>, Long, Long>() {
    26. private static final long serialVersionUID = 1L;
    27. @Override
    28. public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
    29. throws Exception {
    30. return new Tuple2<Long, Long>(tuple._2, tuple._1);
    31. }
    32. });
    33. final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
    34.  
    35. // 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
    36. JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
    37. new Function<Tuple2<Long,String>, Boolean>() {
    38. private static final long serialVersionUID = 1L;
    39. @Override
    40. public Boolean call(Tuple2<Long, String> tuple) throws Exception {
    41. return tuple._1.equals(skewedUserid);
    42. }
    43. });
    44. // 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
    45. JavaPairRDD<Long, String> commonRDD = rdd1.filter(
    46. new Function<Tuple2<Long,String>, Boolean>() {
    47. private static final long serialVersionUID = 1L;
    48. @Override
    49. public Boolean call(Tuple2<Long, String> tuple) throws Exception {
    50. return !tuple._1.equals(skewedUserid);
    51. }
    52. });
    53.  
    54. // rdd2,就是那个所有key的分布相对较为均匀的rdd。
    55. // 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
    56. // 对扩容的每条数据,都打上0~100的前缀。
    57. JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
    58. new Function<Tuple2<Long,Row>, Boolean>() {
    59. private static final long serialVersionUID = 1L;
    60. @Override
    61. public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
    62. return tuple._1.equals(skewedUserid);
    63. }
    64. }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
    65. private static final long serialVersionUID = 1L;
    66. @Override
    67. public Iterable<Tuple2<String, Row>> call(
    68. Tuple2<Long, Row> tuple) throws Exception {
    69. Random random = new Random();
    70. List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
    71. for(int i = 0; i < 100; i++) {
    72. list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
    73. }
    74. return list;
    75. }
    76.  
    77. });
    78.  
    79. // 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
    80. // 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
    81. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
    82. new PairFunction<Tuple2<Long,String>, String, String>() {
    83. private static final long serialVersionUID = 1L;
    84. @Override
    85. public Tuple2<String, String> call(Tuple2<Long, String> tuple)
    86. throws Exception {
    87. Random random = new Random();
    88. int prefix = random.nextInt(100);
    89. return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
    90. }
    91. })
    92. .join(skewedUserid2infoRDD)
    93. .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
    94. private static final long serialVersionUID = 1L;
    95. @Override
    96. public Tuple2<Long, Tuple2<String, Row>> call(
    97. Tuple2<String, Tuple2<String, Row>> tuple)
    98. throws Exception {
    99. long key = Long.valueOf(tuple._1.split("_")[1]);
    100. return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
    101. }
    102. });
    103.  
    104. // 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
    105. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
    106.  
    107. // 将倾斜key join后的结果与普通key join后的结果,uinon起来。
    108. // 就是最终的join结果。
    109. JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

    解决方案七:使用随机前缀和扩容RDD进行join

    方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

    方案实现思路:

    • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
    • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
    • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
    • 最后将两个处理后的RDD进行join即可。

    方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同 key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾 斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情 况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

    方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

    方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

    方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

    1. // 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
    2. JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
    3. new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
    4. private static final long serialVersionUID = 1L;
    5. @Override
    6. public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
    7. throws Exception {
    8. List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
    9. for(int i = 0; i < 100; i++) {
    10. list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
    11. }
    12. return list;
    13. }
    14. });
    15.  
    16. // 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
    17. JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
    18. new PairFunction<Tuple2<Long,String>, String, String>() {
    19. private static final long serialVersionUID = 1L;
    20. @Override
    21. public Tuple2<String, String> call(Tuple2<Long, String> tuple)
    22. throws Exception {
    23. Random random = new Random();
    24. int prefix = random.nextInt(100);
    25. return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
    26. }
    27. });
    28.  
    29. // 将两个处理后的RDD进行join即可。
    30. JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

    解决方案八:多种方案组合使用

    在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾 斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据, 并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性 能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

     

    shuffle调优(默认是SortShuffleManager

    调优概述

    大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作 业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以 及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给 大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

    ShuffleManager发展概述

    在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

    因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于 HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个有序的磁盘文件,因此每个Task就只有一个有序的磁盘文件,并且会生成一个索引文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

    HashShuffleManager运行原理

    未经优化的HashShuffleManager

    下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

    我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个 task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文 件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建 多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个 task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有 Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

    接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个 节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

    shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数 据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

    spark

    优化后的HashShuffleManager

    下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参 数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我 们使用HashShuffleManager,那么都建议开启这个选项。

    开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件 内。

    当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就 是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件, 这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

    假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个 task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件(当前),所有Executor会产生 5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量就是下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。(相较于SortShuffleManager,优化后的HashShuffleManager的小文件依然较多!!)

    spark

    优化前的HashShuffleManager的小文件数(每个Executor中)= 当前Executor的task数*下一个stage的task数(逻辑上,一个CPU core就是一个线程)

    Spark 1.2以前默认)优化后的HashShuffleManager的小文件(每个Executor中)= 下一个stage的task数

    Spark 1.2以后默认)SortShuffleManager的小文件(每个Executor中)=当前Executor的task数(一个task的输出最后会合并成一个文件)

     

    SortShuffleManager运行原理

    SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用 bypass机制。

    普通运行机制

    下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子, 可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内 存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一 下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是 10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的 BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存 缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    (spark默认的序列化方式是BufferedOutputStream/BufferedInputStream)

    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并, 这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件, 也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的 start offset与end offset。

    (MR的shuffle也是会把task溢写的多个小文件合并成一个大文件,且小文件和合并后的大文件都是有序的)

    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共 有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此 此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

    spark

    bypass运行机制

    下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

    • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
    • 不是聚合类的shuffle算子(比如reduceByKey)。

    此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件 之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独 的索引文件。

    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘 文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

    而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    spark

    shuffle相关参数调优

    以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

    spark.shuffle.file.buffer

    • 默认值:32k
    • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
    • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    spark.reducer.maxSizeInFlight

    • 默认值:48m(应该是拉取多个文件的部分数据的总大小)
    • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    spark.shuffle.io.maxRetries

    • 默认值:3
    • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取 还是没有成功,就可能会导致作业执行失败。
    • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定 性。

    spark.shuffle.io.retryWait

    • 默认值:5s
    • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
    • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

    spark.shuffle.memoryFraction

    • 默认值:0.2
    • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
    • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

    spark.shuffle.manager

    • 默认值:sort
    • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash(HashShuffleManager)、sort(SortShuffleManager)和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的 堆外内存管理机制,内存使用效率更高。
    • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的 SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的 HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了 一些相应的bug。

    spark.shuffle.sort.bypassMergeThreshold

    • 默认值:200
    • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临 时磁盘文件都合并成一个文件,并会创建单独的索引文件。
    • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,会按照bypass机制。|||那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁 盘文件,因此shuffle write性能有待提高。

    spark.shuffle.consolidateFiles(合并小文件)

    • 默认值:false
    • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度 合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
    • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将 spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在 实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

    写在最后的话

    本文分别讲解了开发过程中的优化原则、运行前的资源参数设置调优、运行中的数据倾斜的解决方案、为了精益求精的shuffle调优。希望大家能够在 阅读本文之后,记住这些性能调优的原则以及方案,在Spark作业开发、测试以及运行的过程中多尝试,只有这样,我们才能开发出更优的Spark作业,不 断提升其性能。

    转自:

    http://lxw1234.com/archives/2016/05/663.htm

    展开全文
  • 问题描述Pythonrandom的“shuffle方法随机化序列项”是我们在学习中会经常遇到的一个知识点,今天我们就来简单的学习一下吧!解决方案在学习这个方法时我们就要了解他是怎样构成的。第一:Python这门编程语言第二:...
  • 摘要:本文介绍 Sort-Shuffle 如何帮助 Flink 在应对大规模批数据处理任务时更加游刃有余。主要内容包括:数据 Shuffle 简介引入 Sort-Shuffle 的意义Fl...
  • Spark性能优化:Shuffle调优篇 一、调优概述 大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对...
  • 本文讨论了京东Spark计算引擎研发团队关于自主研发并落地Remote Shuffle Service,助力京东大促场景的探索和实践。近年来,大数据技术在各行各业的应用越来越广泛,Spark自UCBerkeley的AMP实验室诞生到如今3.0版本的...
  • 请关注红框部分 这两个 RDD 之间是 Shuffle 关系, 也就是说, 右边的 RDD 的一个分区可能依赖左边 RDD 的所有分区, 这样的话, 数据在这个地方流不动了, 怎么办? 第三个想法: 划分阶段 既然在 Shuffle 处...
  • 在 MapReduce 框架中, Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序...
  • Spark性能调优-Shuffle调优及故障排除篇

    千次阅读 多人点赞 2021-03-26 11:42:52
    Spark调优之Shuffle调优 本节开始先讲解Shuffle核心概念;然后针对HashShuffle、SortShuffle进行调优;接下来对map端、reduce端调优;再针对Spark中的数据倾斜问题进行剖析及调优;最后是Spark运行过程中的故障排除...
  • Shuffle是分布式计算框架用来衔接上下游任务的数据重分布过程,在分布式计算中所有涉及到数据上下游衔接的过程都可以理解为shuffle。针对不同的分布式框架,shuffle有几种实现形态: 基于文件的pull based shuffle...
  • Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制...
  • Flink Sort-Shuffle写简析

    2021-07-21 21:33:30
    文章目录1、配置2、初始创建3、成员变量4、写shuffle文件4.1、获取SortBuffer4.2、追加数据4.3、buffer不足的处理4.4、buffer不足数据未读完5、关于排序5.1、segment申请5.2、writeIndex5.2.1、获取当前可用segment...
  • Flink Sort-Shuffle读简析

    2021-07-22 17:11:10
    4.2、buffersRead读取   由4.1可知,buffersRead存放了读入内存的shuffle数据,这一步放入操作是由blocking-shuffle-io线程完成的,此处简析buffersRead读取如何被下游获取。 PartitionRequestQueue....
  • Spark Shuffle详解

    2021-01-28 21:42:12
    --Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题? https://blog.csdn.net/qq_34901049/article/details/103792271 Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类...
  • 【Spark】Shuffle详解

    2021-03-20 16:47:18
    Spark作业性能主要消耗在Shuffle环境,因为其中包含大量磁盘IO、序列化、网络数据传输等操作,如果想提升作业性能,有必要对Shuffle过程进行调优。但也要注意,影响Spark作业性能因素主要还是代码开发、资源参数以及...
  • 点击下方卡片,关注“CVer”公众号AI/CV重磅干货,第一时间送达转载自:AIWalkerShuffle Transformer: Rethinking Spatial Shuffle ...
  • python3中shuffle函数

    2021-01-14 01:56:56
    1、 shuffle函数与其他函数不一样的地方shuffle函数没有返回值!shuffle函数没有返回值!shuffle函数没有返回值!仅仅是实现了对list元素进行随机排序的一种功能请看下面的坑1.1 误认为shuffle函数会有一个返回值的...
  • shuffle过程简介

    2021-01-04 08:42:30
    shuffle: 洗牌,清洗。 // 源文件: public static void main hello hello static private asdfasdf ba c abc public 1 static 2  hello 2 *shuffle过程是MapReduce整个工作流程的核心环节 map : mapShuffle ...
  • 首先 dataloader 可以设置是否 shuffle 那么只要看 shuffle 参数对这个过程有什么影响即可 class DataLoader(Generic[T_co]): def __init__(self, dataset: Dataset[T_co], batch_size: Optional[int] = 1,
  • 79991814ns third shuffle run time : 73720515ns fourth shuffle run time: 78353061ns java shuffle run time : 64146465ns first shuffle run time : 84314386ns second shuffle run time: 80074803ns third ...
  • [SPARK-30602] SPIP: Support push-based shuffle to improve shuffle efficiency - ASF JIRA (apache.org) [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation by venkata91 · Pull ...
  • Spark Shuffle过程详解

    2021-04-15 00:14:31
    对比 Hadoop MapReduce 和 Spark 的 Shuffle 过程 如果熟悉 Hadoop MapReduce 中的 shuffle 过程,可能会按照 MapReduce 的思路去想象 Spark 的 shuffle 过程。然而,它们之间有一些区别和联系。 从 high-level 的...
  • 本文介绍了Impala在选择join方式的时候,是如何分别对broadcast和shuffle进行代价计算的。除此之外,也逐个介绍了代价计算中用到的cardinality、selectivity和row size的含义,以及各自的计算方式。需要注意的是,...
  • 洗牌算法shuffle

    2021-03-18 01:18:49
    实例1: 输入: N = 3 输出: 132 实例2: 输入: N = 5 输出: 32514 当时我的解法(写了两种方法): 写的好烂,面完和面试官交流的时候面试官让我看下Collections.shuffle的源码,于是乎就开始研究这个“洗牌算法...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 213,755
精华内容 85,502
关键字:

shuffle