spark_shuffle杂记

为什么会发生shuffle?

  1. 分布式系统中,并行计算任务涉及到聚合数据需求时,前一个任务向后一个任务传输数据的过程会涉及不同executor或不同节点间将相同key传输到一台机器上,这个过程存在大量数据读取,写入,网络传输,涉及IO,压缩,解压,序列化和反序列化等等。

所以如涉及跨节点的聚合需求时,往往就很快。

Spark会根据宽依赖把它一系列的算子划分成不同的 Stage,Stage 的内部会进行 Pipeline,Stage 与 Stage 之间进行 Shuffle,Shuffle 的过程包含三部份。

第一部份是 Shuffle 的 Write;第二部份是网络的传输;第三部份就是 Shuffle 的 Read,这三大部份涉及了内存操作、磁盘IO、网络IO以及 JVM 的管理

整体优化方向:

  • 减少数据IO,磁盘写,网络传输
  • 提高shuffle需要的内存(这里也有一个点需要注意,java gc)

shuffle的演进:

hashshuffle –》 sorted-based shuffle –》 Tungsten shuffle

spark中,将map端划分数据、持久化数据的过程称为shuffle write,而reducer端读取数据,聚合数据的过程称为shuffle read

  • map端也会发生聚合操作,需要看具体实现

Mapper端进行aggregate的好处是,减少些磁盘的写次数,减少网络传输的数据量,减少reduce task获取mapper task数据的次数

Reducer端如果内存不够,写磁盘的代价是双倍的,在mapper端无论内存够不够,它都需要先写磁盘。而reducer端在计算的时候,需要又一次的把数据从磁盘上抓回来,所以实际生产环境下需要适当的把shuffle内存调大一点。

问题变为如何高效的实现shuffle write和shuffle read

如何高效的实现shuffle write和shuffle read

【ShuffleRead】reduce阶段

shufflerdd计算过程, 需要先把mappartitionsrdd中的数据fetch过来,问题:

  1. 什么时候fetch?parent stage中的一个shuffleMapTask执行完?还是等全部shuffleMapTask执行完?
    • 为了迎合stage的概念(Spark job是按照stage线性执行的,只有当parent stage的task都执行完才能执行下一个stage),所以是parent stage的所有shuffleMapTask都执行完,才进行fetch。
  2. 边fetch边处理?还是一次性fetch完再处理?
    • 边fetch边处理。
  3. fetch来的数据存在哪里?
    • 刚fetch来的FileSegment文件放在softBuffer缓冲区中,经过处理后的文件放在内存+磁盘中,放在内存+磁盘,主要需要考虑的问题是如何在两者中间取得平衡?
  4. 怎么获取要fetch的数据的存放位置?
    • 一个shuffleMapStage形成后,会将该stage中最后一个rdd注册到MapOutputTrackerMaster中,通过MapOutputTrackerMaster.registerShuffle(shuffleid,rdd.partitions.size)方法。这个步骤很重要,因为shuffle过程中需要MapOutputTrackerMaster来执行shuffleMapTask将数据输出到什么位置。每个shuffleMapTask完成时都会将filesegment的存储位置汇报给MapOutputTrackerMaster。因此reducer在shuffle过程中读取数据的时候,要去driver里的MapOutputTrackerMaster询问shuffleMapTask数据输出的位置。

【hashshuffle】以前的方式,spark-2.1.1 版本之前就不用了,看一下它的优缺点。
hashshuffle中,shuffle wirte实现比较简单,没有做排序等额外操作,仅需要重新partition数据然后写到本地,对hashshuffle来说,遇到的问题就是如何减少IO消耗。

// TODO(SLi)这里缺少 hashshuffle 的基本结构

hashshuffle中,每条记录,通过partitioner.partition(record.getkey())来决定自己被分配到哪个缓冲区(bucket),及最后写到哪个block文件中(Filesegment)

hashshuffle的这个操作带来的问题:

shuffle write写文件的时候,内存中有两个东西,
- 有一个缓冲区buffer,
- 有一个操作本地磁盘文件的文件句柄。
也就是说,但是从shuffle write内存占用的角度,内存已经被两部分占用了。如果M*R数量很多,在shuffle过程中,这么多的内存消耗,很容易产生OOM,对GC来说也是很大的负担。hashshuffle的时候,如果reducer通过drive去mapper端读取数据时,这么多小文件需要打开对应个数的网络通道读取,打开这么多端口也不是一件轻松的事情。
这就导致一个非常常见的操作:reducer端fetch数据的时候,说文件找不到。其实不是真的找不到,而是GC导致程序不响应

  1. 因为是shuffleMapTask都会包含R个缓冲区(R=reducer的个数,即下一个阶段task的个数),所以如果有M个shuffleMapTask,R个reducer,那么就会产生M*R个小文件,因为通常任务里M和R都很大,就会导致磁盘上大量文件,同时可能产生低效IO消耗。(spark的stage是所有shuffleMapTask任务都处理完,再执行下一个stage,及shuffle read部分,在此之前所有小文件都会存下来)
  2. 缓冲区占用内存太大,同样因为每个maptask都开R个缓冲区,M*R也会产生大量缓冲区占用内存(由于内存中需要保存大量文件操作句柄和临时信息,如果数据处理的规模比较庞大,内存可能OOM)

优化:

  1. 问题1比较好解决,spark consolidated hash-based shuffle里面实现的consolidation方法,
    • 将原来每个maptask都写R个文件,改为一个core产生R个文件,这样同一个core中连续执行的maptask可以共同输出文件,写到一起,这样磁盘上产生的文件个数可以有原来MR,减少为coresR,会少很多。
  2. 问题2不好解决,因为写文件到磁盘一定要开缓冲区,而且缓冲区太小影响IO速度

consolidated hashshuffle也有弱点:

  • 如果reducer端的并行任务或者数据分片过多的话,则core*redcuer 任务也会过大,产生很多小文件。

hashshuffle的处理shuffle write的方式严重制约了spark的集群规模和处理数据量的级别


【sorted-based shuffle】

为什么最终舍弃hashshuffle,选择了sorted-based shuffle?

即使是使用consolidation的方式,依然也会产生很多小文件,最终还是要减少Mapper端文件个数。

好处:

  • mapper端内存占用减少
  • reducer端拉取数据的次数减少
  • 网络通道的句柄减少;
  • 整体上提升了spark处理数据量的级别

sorted-based是如何减少mapper端文件个数的?

sorted-based shuffle不会为每个reducer中的task产生一个单独的文件,而是每个shuffleMapTask写两个文件,一个是index和一个data文件。其中data存储的是当前task的shuffle输出的分类数据,而index文件则存储了data文件中的数据通过partitioner的分类信息。下一个阶段的stage中的task就是根据这个index文件获取自己所需要抓取的上一个stage中shuffleMapTask产生的数据。

假设并行度是100,那么sorted-based shuffle mapper端产生的文件个数为 100*2,相比hashshuflle 100*100的个数极大减少。

sorted-based shuffle 主要在mapper阶段,与reducer端没有任何关系。在mapper阶段要进行排序。

  • 如何排序的?这里没太写明白。
  1. 根据partitionid进行排序,
  2. 根据本身数据的key进行排序。
    • 溢写前对内存中已有的数据进行排序,排序后会分批将数据写入磁盘,默认1w条一个batch。写的过程是通过java的bufferOutputStream,先生成多个临时文件,然后再merge成一个最终文件,同时在merge的过程中会写一个index文件,标记下个stage每个task需要的数据的start offset和end offset。

在mapper极端进行排序的目的:

  • 让reducer进行抓取的时候变得更高效。

    具体而言,reducer首先找driver去获取parent stage中的每个shuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,从解析的index文件中获取data文件中属于自己的那部分内容。

sorted-based shuffle的弱点:

  • 强制排序
    • 它要基于记录本身排序,是sorted-based shuffle最致命的性能消耗。
    • 如果数据本身不需要排序的话,会导致多余的消耗
    • 如果需要在partition内进行排序的话,就需要进行mapper端和reducer端两次排序

bypass机制
当shuffle read task 的数量<= bypass参数的阈值,就自动启动
与sorted-based shuffle 不同点在于,第一写磁盘机制不同,第二不会进行排序

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022-2023 ligongzhao
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信