一、Shuffle
Spark Shuffle的历史
1.1 以前 Hash Shuffle
1.1.x 添加 Sort Shuffle
1.5.x 添加 Unsafe Shuffle
1.6.x 合并 Unsafe Shuffle和Sort Shuffle
2.0.x 移除 Hash Shuffle
Hash Shuffle v1
每个map会为下游stage的partition写一个文件,如果有1000个分区的话会生成m×1000个临时文件,一般来说excutor都会运行多个task,另一方面一个excutor都有K个Core,那么会申请KN个文件描述符,一旦partition较多,必定会耗尽,同事也会带来 内存消耗
在reduce阶段会拉取上游产生的数据,所有的文件都需要网络来传输,又涉及到大量的文件描述符,如果reduce段有combiner的操作,需要将网络上的数据保存到HashMap中进行合并,数据量大必定会OOM
Hash Shuffle v2
在上面生成分区文件的过程中每个excutor只生成n个partition文件,对其他task生成partition进行和并,在reduce端读取分区文件的过程中,势必会造成OOM
Sort Shuffle v1
受Hadoop MapReduce Shuffle的影响,引入Sort Shuffle
在map端会按照partition id和key进行排序,将所有分区的数据写在同一个文件中,该记录首先会按照分区id顺序排序,每个分区内部按照key进行排序,map task期间会顺序写每个分区的数据,并通过索引数据记录每个分区的大小个偏移量,一个map task只开两个文件描述符,即使有K个core,也只有k×2个文件描述符
在Reduce阶段,做reduce combiner时使用ExternalAppendOnlyMap,在大数据量的情况下,做combiner时如果内存不够,会刷写磁盘,避免了OOM
解决了Hash Shuffle的弊端,但是在Shuffle过程中要对数据进行排序,所以性能有所损失
Unsafe Shuffle
钨丝计划重在解决内存和CPU的使用,刚开始是在优化Spark SQL,Shuffle也因此受益,做法是直接在序列化的二进制数据上面进行排序,减少了内存的使用和GC,在排序中使用cache-efficient sorter,使用8byte的指针,把排序转化为指针数组的排序
但是使用Unsafe Shuffle有几个限制,shuffle阶段不能有aggregate操作,分区数不能超过一定大小(224−1,这是可编码的最大parition id),所以像reduceByKey这类有aggregate操作的算子是不能使用Unsafe Shuffle,它会退化采用Sort Shuffle。
Sort Shuffle v2
在满足Unsafe Shuffle 的情况下自动使用,否则使用Sort Shuffle,从spark 2.0后移除了Hash Shuffle 也就是只有Sort Shuffle v2
评论区