博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Shuffle及其调优
阅读量:4206 次
发布时间:2019-05-26

本文共 15326 字,大约阅读时间需要 51 分钟。

Spark Shuffle

Shuffle概述

  在MapReduce和Spark中都有Shuffle。对于MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,将Map阶段读取的数据进行Shuffle操作并输出到对应的Reduce,Reduce进行计算。MapReduce的Shuffle过程如图所示。

在这里插入图片描述
Shuffle实质上是对数据进行重组,在整个Shuffle过程中,往往伴随着大量的磁盘和网络I/O,因此Shuffle性能的高低也直接决定了整个程序的性能高低。Spark的Shuffle实现过程如图所示。
在这里插入图片描述
  Spark在DAG调度的过程中,会根据Shuffle过程来划分Stage阶段,即存在ShuffleDependency(宽依赖)的时候,会将作业job划分成多个Stage。划分Stage时,构建ShuffleDependency的时候会进行Shuffle注册,获取后续数据读取所需要的ShuffleHandle,最终每一个job提交后都会生成一个ResultStage和若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的task分别对应着ResultTask与ShuffleMapTask。一个job,除了最终的ResultStage外,其他若干ShuffleMapStage中各个ShuffleMapTask都需要将最终的数据根据相应的Partitioner对数据进行分组,然后持久化分区的数据。进行数据持久化目的在于1)容错;2)降低内存数据存储压力。

Spark的四种Shuffle机制

  负责Shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即Shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager(spark1.2之前使用)和SortShuffleManager,因此Spark的Shuffle有Hash Shuffle和Sort Shuffle两种。

  在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行Shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

spark1.2版本以前:hashShuffleManager	未经优化的hashShuffleManager	经过优化的hashShuffleManagerspark1.2版本以后:SortShuffleManager	普通机制	ByPass机制

HashShuffle机制

  在1.2版本之前,Spark使用HashShuffle,1.2版本之后使用Sort-Base Shuffle。Spark的运行主要分为两部分:一部分是以SparkContext为核心的驱动程序,另一部分是Worker节点上Task,它是运行实际任务的。程序运行的时候,Driver和Executor进程相互交互:运行什么任务,即Driver会通过网络传输为Executor分配Task;任务数据从哪儿获取,即Task要从 Driver 抓取其他上游Task的数据结果,所以这个过程中就会不断地产生网络结果。其中,下游Stage 向上游Stage申请数据的过程就称之为 Shuffle。

未优化的HashShuffle机制

  对相同的key执行hash算法,从而将相同的key都写入到一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入到内存缓冲,当内存缓冲填满之后,才会溢写到磁盘文件中。但是这种策略的不足在于,下游有几个task,上游的每一个task都就都需要创建几个临时文件,每个文件中只存储key取hash之后相同的数据,导致了当下游的task任务过多的时候,上游会堆积大量的小文件。

在这里插入图片描述

经过优化的HashShuffle机制

  在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

  如果想使用优化之后的ShuffleManager,需要设置:spark.shuffle.consolidateFiles=true
在这里插入图片描述

未经优化:上游的task数量:m下游的task数量:n上游的executor数量:k  (m>=k)总共的磁盘文件:m*n优化之后的:上游的task数量:m下游的task数量:n上游的executor数量:k  (m>=k)总共的磁盘文件:k*n

SortShuffleManager

  SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当ShuffleReadTask的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

  在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。接着,每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

  在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  此时task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件(标识了下游各个task的数据在文件中的start offset与end offset)。最终再由下游的task根据索引文件读取相应的数据文件。
在这里插入图片描述

shuffle write:mapper阶段,上一个stage得到最后的结果写出shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

SortShuffleManager-bypass机制

下图说明了bypass SortShuffleManager的原理。

在这里插入图片描述
bypass运行机制的触发条件如下:
1)ShuffleMap Task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)
2)不是聚合类的Shuffle算子(比如reduceByKey)

  此时Task会为每个下游Task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

  该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,ShuffleRead的性能会更好。
  而该机制与普通SortShuffleManager运行机制的不同在于:
1)磁盘写机制不同
2)不会进行排序
也就是说,启用该机制的最大好处在于,ShuffleWrite过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结

  Map端的Shuffle一般为Shuffle的Write阶段,Reduce端的Shuffle一般为Shuffle的Read阶段。Spark的Shuffle分为两种实现:HashShuffle和SortShuffle。
  HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用磁盘文件从而从而大幅度减少磁盘文件的数量,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。
  SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。
  SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

Spark Shuffle源码分析

  在Spark 1.0版本之前,Spark只支持Hash Shuffle,原理很简单:每个Task根据key的哈希值计算出每个key将要写入的partition,然后将数据写入一个文件供下游的Task来拉取。但是这种Hash Shuffle模式有其缺点:当并行度很高时,会产生很多中间落地的文件,对系统的内存和磁盘IO会造成很大压力。

  为了解决这个问题,在Spark 0.8.1中加入了Shuffle Consolidate File机制,在1.6版本之前,需要通过设置spark.shuffle.consolidateFiles设置为true(默认为false)来使用这个功能,1.6版本之后成为默认项。其实现原理为:对于同一个core的不同Task在写中间文件的时候可以共享追加同一个文件,这样就显著的减小了文件的数量。
  Shuffle Consolidate File机制虽然缓解了Shuffle过程产生文件过多的问题,但是并没有彻底解决内存和IO的问题,所以在Spark 1.1中实现了Sort Based Shuffle,通过spark.shuffle.manager选项可以设置,默认为Hash,而在Spark 1.2中Sort Based Shuffle取代Hash Based Shuffle成为默认选项,在Spark 2.0版本之后,Hash Based Shuffle已经不见踪影,Sort Based Shuffle成为唯一选项。
  Sort Based Shuffle的实现:首先,每个ShuffleMapTask不会为每个Reducer生成单独的一个文件,它会将所有的结果写到一个文件中,同时生成一个Index文件,Reducer可以通过这个Index文件取得它需要处理的数据,这样就避免了产生大量的中间文件,也就节省同时打开大量文件使用的内存和随机写带来的IO。过程是这样的:

  1. 每个Map Task会为下游的每一个Reducer,或者说每一个partition生成一个Array,将key-value数据写入到这个Array中,每一个partition中的数据并不会排序(避免不必要的排序)
  2. 每个Array中的数据如果超过某个阈值将会写到外部存储,这个文件会记录相应的partitionId以及保存了多少了数据条目等信息
  3. 最后用归并排序将不同partition的文件归并到一个新的文件中,每个partition数据在新的文件中相当于一个桶,并且需要同时生成Index索引文件来记录桶的位置信息

Shuffle write

  整个Shuffle过程就被分成了两个部分:shuffle writeshuffle read,在executor中,task的执行实际上是调用Task类的runTask()方法来计算,而shuffle write过程则是存在于Task的实现类ShuffleMapTask的runTask()方法中。

  首先反序列化收到的Task信息,然后调用了ShuffleWrite实例的write方法。首先来看writer的实例化代码writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)。其中,manager是ShuffleManager,在1.6.3版本中,它有两个实现类:HashShuffleManager和SortShuffleManager,而在2.3.0版本中,HashShuffleManager已经不见踪影,只剩下SortShuffleManager实现类,Hash Based Shuffle已经被优化掉。

Hash Shuffle Write

  HashShuffleManager中getWriter方法创建了HashShuffleWriter实例。接着调用了这个实例的write方法,在方法中首先判断是否存在aggregator聚合操作,并进一步判断是否为map端的聚合mapSideCombine,如果是的话就调用combineValuesByKey方法对records进行聚合,否则的话就直接返回records。

  val bucketId = dep.partitioner.getPartition(elem._1)这一行代码对应上面说的根据Key的哈希值来计算出对应的partition Id,dep对应着传入的ShuffleHandle,那么相应的Partitioner就是HashPartitioner,其getPartition的nonNegativeMod方法实现:key的hashCode对numPartitions取模并保证结果为非负数。
  再来看接下shuffle.writers(bucketId).write(elem._1, elem._2),其中shuffle是ShuffleWriterGroup的实例。ShuffleWriterGroup用来存储writers,为每一个reducer或者说每一个partition都保存一个writer,而这每一个writer其实是DiskBlockObjectWriter的实例,其中封装了本地文件的信息。每一个MapTask针对下游的每个partition生成一个本地文件来存储信息,这样的话就会生成M*R个中间文件(M为Mapper的数量,R为Reducer的数量),即上面说的HashBasedShuffle的弊病。
  根据Key的哈希值取得对应的writer后,最后通过DiskBlockObjectWriter的write方法将数据写到本地文件。

Sort Shuffle Write

  对于SortShuffleWriter类的write方法,在这个方法中,首先考虑聚合,如果是mapSideCombine,那么创建一个带有aggregator和Key的排序器的外部排序器ExternalSorter,否则就创建一个不带聚合和Key的排序器的ExternalSorter,然后将数据都放入排序器中。然后将数据从排序器的数据结构中利用归并排序写入到本地文件中,并根据返回的parititionLengths创建Index文件。

  下面从外部排序器ExternalSorter入手来了解这个过程,这个方法也是一分为二,如果spills是空的,说明数据全在内存的数据结构中,没有溢写到磁盘,否则说明内存和磁盘中都有records。如果数据都在内存中,那么又一分为二,其中定义过aggregator的数据放在PartitionedAppendOnlyMap中,没有的话就放在PartitionedPairBuffer中。这两个数据结构的实现比较复杂,它们主要的功能是用来存放records,如果达到某个阈值则spill到磁盘,落成文件,两者的区别为:map用来存放有聚合需求的数据,buffer用来存放没有聚合需求的数据。
  最后在内存中的数据都通过叫作FileSegment的实例封装,其中包括每个partition的起始和终止position,并且返回一个数组,用来记录partitionId与每个Segment的长度的关系,即每一个partition的数据写入一个FileSegment,所有的FileSegment有序落入一个文件。
  如果数据已经溢出至外部存储,那么这部分数据需要采用归并排序的方式合并成一个文件,由ExternalSorter的partitionedIterator方法实现,是按照是否有mapSideCombine分为使用map还是buffer,并且如果没有传入key的比较器,则直接按照partitionId来排序,否则还需要加上KeyComparator,最后将spills的数据与in-memory的数据使用归并排序合并到一个文件中。
  writePartitionedFile方法最后返回一个数组,其中数组下标为partitionID,而内容就是对应的FileSegment的长度,拿到这个数组是用来建立Index文件,至此,sort shuffle write结束。

总结

  Hash-Based-Shuffle设计之初是为了避免多余的排序操作,但是出现了中间落地文件过多的问题,即使采用ConsolidateFile机制,也不能有效解决问题,而在生产环境中,当数据量很大时,并行度也会很高,相应的shuffle上下游map和reduce的partition数量就会很多,导致中间落地文件数量过多,从而出现内存溢出和磁盘IO性能瓶颈,在spark 2.0版本以后消失不见。

  Sort-Based-Shuffle为了解决这个问题,采用了FileSegment的概念,通过partitionId对数据分桶,写入一个文件,并且建立Index文件提供offset在Reducer拉取数据时使用,并且在合并文件的时候仅根据partitionId来排序,避免了多余的排序,在spark 1.2版本以后已经成为默认选项。

Shuffle read

  shuffle read的起点应该是下游的Reducer来读取中间落地文件,而除了需要从外部存储取数据和已经cache或者checkpoint的RDD之外,一般的Task都是通过ShuffledRDD的shuffle read开始reduce的。首先可以看一下ShuffledRDD的compute()方法,调用ShuffleManager的getReader方法回去一个reader。ShuffleManager这里有两个实现类,HashShuffleManager和SortShuffleManager了,分别对应两种不同的策略,但在shuffle read的过程中,它们的getReader方法都创建了同一个BlockStoreShuffleReader对象,即它们的shuffle read过程相同,接着shuffle read的核心实现BlockStoreShuffleReader的read()方法:

,切分一下有三块功能:

  1. 用序列化工具读取文件成为一个key/value iterator并更新Task context的元数据信息
  2. 根据传入的Dependency中是否有聚合动作来对数据进行聚合处理
  3. 根据Dependency中是否存在key的排序器来对数据进行排序处理

其中,aggregator和keyOrdering对应着shuffle write过程中的相应参数,接下来看下下游是如何获取数据的。

block fetch

  在第一部分中,首先创建了一个ShuffleBlockFetcherIterator对象,这个对象会创建一个(BlockID, InputStream)形式的Iterator来拉取中间文件的multiple blocks,这个对象在实例化的过程中首先会调用initialize()方法。拉取数据有两种,一种是remoteBlocks另一种localBlocks,如果数据不在本地节点上,那么就要通过网络去获取数据,通过网络拉取就会占用网络带宽,所以系统提供了两种策略,具体实现在splitLocalRemoteBlocks方法中,通过网络了拉取数据blocks的策略:

  1. 每次最多启动5个线程到最多5个节点上读取数据
  2. 每次请求数据的大小不会超过spark.reducer.maxMbInFlight(默认48MB)的五分之一

这么做的目的一个是减少占用带宽,另一个是使用并行化请求数据减少请求时间。

  请求已经切分好了,接下来通过调用fetchUpToMaxBytes()方法来发送请求,通过ShuffleClient实例去拉取Blocks,这里的ShuffleClient有多种实现,其中通过网络获取Blocks的实现类为:NettyBlockTransferService,而本地获取Blocks的实现类为:BlockTransferService,fetchBlocks方法中根据传入的host地址端口和executorId,然后使用Netty协议去获取数据。

  接下来,再来看一下本地的数据拉取方法,本地的Blocks直接通过blockManager的getBlockData方法去获取数据,而如果数据是通过shuffle过程获取的,getBlockData就有两种实现:Hash和Sort

Hash的实现类为:FileShuffleBlockResolver

Sort的实现类为:IndexShuffleBlockResolver

其中的不同就是Sort策略的getBlockData需要先通过IndexFile定位到数据对应的FileSegment,而Hash则可以直接通过blockId直接获取文件。

MapReduce Shuffle 和 Spark Shuffle对比

  1. 从整体功能上看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)
  2. 从流程的上看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。以前 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey的操作。在Spark 1.2之后,sort-based变为默认的Shuffle实现
  3. 从流程实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map, spill, merge, shuffle, sort, reduce等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蕴含在 transformation中

Spark RDD中的Shuffle算子

//去重def distinct()def distinct(numPartitions: Int)//聚合def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]//排序def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]//重分区def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)//集合或者表操作def intersection(other: RDD[T]): RDD[T]def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]def intersection(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

Spark Shuffle参数调优

spark.shuffle.file.buffer

默认值:32K
参数说明:用于设置ShuffleWriteTask的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优思路:如果job可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘I/O次数,进而提升性能。
spark.reducer.maxSizeInFlight
默认值:48M
参数说明:用于设置ShuffleReadTask的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优思路:如果job可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96M),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
spark.shuffle.io.maxRetries
默认值:3
参数说明:ShuffleReadTask从ShuffleWriteTask所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优思路:对于那些包含了特别耗时的Shuffle操作的作业,适当增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。特别针对超大数据量(数十亿~上百亿)的Shuffle过程,调节该参数可以大幅度提升稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数说明:代表了每次重试拉取数据的等待间隔,默认是5s。
调优思路:适当加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优思路:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给ShuffleRead的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。
spark.shuffle.manager
默认值:sort
参数说明:用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优思路:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果ShuffleReadTask的数量小于这个阈值(默认是200),则ShuffleWrite过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优思路:当你使用SortShuffleManager时,如果不需要排序操作,那么适当将这个参数调大一些,大于ShuffleReadTask的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此ShuffleWrite性能有待提高。
spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并ShuffleWrite的输出文件,对于ShuffleReadTask数量特别多的情况下,这种方法可以极大地减少磁盘I/O开销,提升性能。
调优建议:如果不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。

转载地址:http://ayhli.baihongyu.com/

你可能感兴趣的文章
c语言中的void类型解析
查看>>
c语言中的内存4区域模型(堆,栈,全局区,代码区)
查看>>
c语言中栈和数组buf的生长方向
查看>>
字符串相关一级指针内存模型
查看>>
一级指针(char *)易错模型分析
查看>>
c语言中的const专题
查看>>
二级指针内存模型
查看>>
c语言中的数组, 数组类型
查看>>
数组指针类型
查看>>
c语言中,多维数组本质技术推演
查看>>
多维数组做函数参数技术推演
查看>>
memset函数及其用法,C语言memset函数详解
查看>>
数组做函数参数和结构体做函数参数之间的一些区别
查看>>
结构体做函数参数的例子demo
查看>>
结构体做函数参数进阶
查看>>
结构体做函数参数,即结构体中套一级指针,结构体中套二级指针
查看>>
结构体的深度拷贝和浅拷贝
查看>>
结构体中的话题-偏移量
查看>>
c语言文件读写概念
查看>>
按照字符读写文件
查看>>