在Spark中,什么情况下,会发生shuffle?reduceByKey、groupByKey、sortByKey、countByKey、join、cogroup等操作

Spark Shuffle操作的两个特点

第一个特点

在Spark早期版本中,那个bucket缓存是非常非常重要的,因为需要将一个ShuffleMapTask所有的数据都写入内存缓存之后,才会刷新到磁盘。但是这就有一个问题,如果map
side数据过多,那么很容易造成内存溢出。所以spark在新版本中,优化了,默认那个内存缓存是100kb,然后呢,写入一点数据达到了刷新到磁盘的阈值之后,就会将数据一点一点地刷新到磁盘。

这种操作的优点,是不容易发生内存溢出。缺点在于,如果内存缓存过小的话,那么可能发生过多的磁盘写io操作。所以,这里的内存缓存大小,是可以根据实际的业务情况进行优化的。
第二个特点,

与MapReduce完全不一样的是,MapReduce它必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。

但是Spark不需要,spark默认情况下,是不会对数据进行排序的。因此ShuffleMapTask每写入一点数据,ResultTask就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。

spark这种机制的好处在于,速度比mapreduce快多了。但是也有一个问题,mapreduce提供的reduce,是可以处理每个key对应的value上的,很方便。但是spark中,由于这种实时拉取的机制,因此提供不了,直接处理key对应的values的算子,只能通过groupByKey,先shuffle,有一个MapPartitionsRDD,然后用map算子,来处理每个key对应的values。就没有mapreduce的计算模型那么方便。

普通的Shuffle操作原理剖析



普通的Shuffle操作原理剖析.png

 

每个ShuffleMapTask,都会为每个ResultTask创建一份bucket缓存,以及对应的ShuffleBlockFile磁盘文件
所以假设,如果有100个ShuffleMapTask,100个ResultTask,本地磁盘要产生10000个文件,磁盘IO过多,影响性能

ShuffleMapTask的输出,会作为MapStatus,发送到DAGScheduler的MapOutputTrackerMaster中,MapStatus包含了每个ResultTask要拉取的数据大大小

每个ResultTask会用BlockStoreShuffleFetcher去MapOutputTrackerMaster获取自己的要拉取的文件的信息,然后底层通过BlockManager将数据拉取过来
Map端的数据,可以理解为Shuffle的第一个RDD,MapPartitionsRDD
每个ResultTask拉取过来的数据,其实会组成一个内部的RDD,叫ShuffledRDD,优先放入内存,如果内存不够,那么写入磁盘
然后每个ResultTask针对数据进行聚合,最后生成MapPartitionsRDD,就是我们执行reduceByKey等操作希望获得的那个RDD

优化后的Shuffle原理剖析



优化后的Shuffle原理剖析.png

 

Spark新版本中,引入了consolidation机制,也就说,提出了ShuffleGroup概念

一个ShuffleMapTask将数据写入ResultTask数量的本地文件,这个不会变,但是,当下一个ShuffleMapTask运行的时候,可以直接将数据写入之前的ShuffleMapTask的本地文件,相当于是,对多个ShuffleMapTask的输出进行了合并,从而大大减少了本地磁盘的数量

一组ShuffleGroup,每个文件中,都存储了多个ShuffleMapTask的数据,每个ShuffleMapTask的数据,叫做一个segment。此外,会通过一些索引,来标记每个ShuffleMapTask的输出在ShuffleBlockFile中的索引,以及偏移量等,来进行不同ShuffleMapTask的数据的区分

机器上,有两个cpu,也就说,4个ShuffleMapTask,有2个ShuffleMapTask是可以并行执行的,先执行的是黑色边框的两个ShuffleMapTask,再执行红色边框的两个ShuffleMapTask

并行执行的ShuffleMapTask,写入的文件,一定是不同的,当一批并行执行的ShuffleMapTask运行完之后,那么新的一批ShuffleMapTask启动起来并行执行的时候,优化机制就开始发挥作用了,开启优化Shuffle操作,主要是通过在SparkConf中,设置一个参数就可以
每一个节点上面,比如有2个cpu,100个ShuffleMapTask,那么此时,就会产生100*100个磁盘文件,就是10000个
开启了consolidation机制之后的Shuffle write
每个节点上的磁盘文件,数量变成了cpu core的数量 * ShuffleMapTask的数量,同样的情况,每个节点只产生200个磁盘文件

源码

Shuffle写

入口
// 这个writer默认是HashShuffleWriter writer.write(rdd.iterator(partition,
context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
看HashShuffleWriter的writer方法
// 将每个ShuffleMapTask计算出来的新的rdd的partition数据,写入本地磁盘 override def write(records:
Iterator[_ <: Product2[K, V]]): Unit = { //
首先判断,是否需要再map端本地进行聚合,这里的话,如果是reduceByKey这种操作,它的dep.aggregator.isDefined、dep.mapSideCombine都是true
// 那么就会进行map端的本地聚合 val iter = if (dep.aggregator.isDefined) { if
(dep.mapSideCombine) { //
这里就会执行本地聚合,比如本地有(hello,1)、(hello,1),那么此时就会聚合成(hello,2)
dep.aggregator.get.combineValuesByKey(records, context) } else { records } }
else { require(!dep.mapSideCombine, "Map-side combine without Aggregator
specified!") records } //
需要本地聚合,那么先本地聚合,然后遍历数据,对每个数据,调用partitioner,默认是HashPartitioner //
生成bucketId,也就是决定了,每一份数据,要写入哪个bucket中 for (elem <- iter) { val bucketId =
dep.partitioner.getPartition(elem._1) //
获取到了bucketId之后,会调用shuffleBlockManager.forMapTask()方法,来生成bucketId对应的writer,然后用writer将数据写入bucket
shuffle.writers(bucketId).write(elem) } }
上面代码的shuffle其实是
private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId,
numOutputSplits, ser, writeMetrics)
看看forMapTask()方法
// 给每一个mao task 获取一个ShuffleWriterGroup def forMapTask(shuffleId: Int, mapId:
Int, numBuckets: Int, serializer: Serializer, writeMetrics:
ShuffleWriteMetrics) = { new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets)) private val
shuffleState = shuffleStates(shuffleId) private var fileGroup: ShuffleFileGroup
= null // 对应之前讲解的,shuffle有两种模式,一种是普通的,一种是优化后的 //
这里会判断,如果开启了consolidation机制,也就是consolidateShuffleFiles为true的话,那么实际上,不会给每个bucket都获取一个独立的文件
// 而是为这个bucket获取一个ShuffleGroup的writer val writers: Array[BlockObjectWriter] =
if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => //
首先,用shuffleId、mapId、bucketId(也就是reduceId,一个bucket对应一个reduce)生成一个唯一的ShuffleBlockId
// 然后用buckId,来调用ShuffleFileGroup的apply()函数,为bucket获取一个ShuffleFileGroup val
blockId = ShuffleBlockId(shuffleId, mapId, bucketId) //
然后调用BlockManager的getDiskWriter()方法,针对ShuffleFileGroup获取一个writer //
这里,就明白了,如果开启了consolidation机制,实际上,对于每一个bucket,都会获取一个针对ShuffleFileGroup的wtriter,而不是一个独立的ShuffleBlockFile的writer
// 这样就实现了所谓的,多个ShuffleMapTask的输出数据的合并 blockManager.getDiskWriter(blockId,
fileGroup(bucketId), serializer, bufferSize, writeMetrics) } } else { //
如果没有开启consolidation机制,也就是普通的shuffle操作的话
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => //
同样生成一个ShuffleBlockId val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
// 然后调用BlockManager的diskBlockManager,获取一个代表了要写入本地磁盘文件的blockFile val blockFile =
blockManager.diskBlockManager.getFile(blockId) // Because of previous failures,
the shuffle file may already exist on this machine. // If so, remove it. //
而且会判断,如果blockFile要是存在的话,还得删除它 if (blockFile.exists) { if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile") } else {
logWarning(s"Failed to remove existing shuffle file $blockFile") } } //
然后调用BlockManager的getDiskWriter()方法,针对那个blockFile生成writer
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize,
writeMetrics) } //
所以使用这种普通的shuffle操作的话,对于每一个ShuffleMapTask输出的bucket,都会在本地获取一个单独的ShuffleBlockFile }
shuffle读

入口
override def compute(split: Partition, context: TaskContext): Iterator[(K,
C)] = { //
ResultTask或者ShuffleMapTask在执行到ShuffleRDD时,肯定会调用ShuffleRDD的computer()方法,来计算当前这个RDD的partition的数据
//
在这里,会调用ShuffleManager的getReader()方法,获取一个HashShuffleReader,然后调用它的read()方法,拉取该ResultTask
/ ShuffleMapTask需要聚合的数据 val dep =
dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index,
split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
看看HashShuffleReader的read()方法
/** Read the combined key-values for this reduce task */ override def read():
Iterator[Product2[K, C]] = { val ser = Serializer.getSerializer(dep.serializer)
//
reduceTask在拉取数据时,其实会用BlockStoreShuffleFetcher来从DAGDcheduler的MapOutputTrackerMaster中获取自己想要的数据的信息
// 然后底层,再通过blockManager从对应的位置,拉取需要的数据 val iter =
BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) { new InterruptibleIterator(context,
dep.aggregator.get.combineCombinersByKey(iter, context)) } else { new
InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter,
context)) } } else { require(!dep.mapSideCombine, "Map-side combine without
Aggregator specified!") // Convert the Product2s to pairs since this is what
downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K,
C]]].map(pair => (pair._1, pair._2)) } // Sort the output if there is a sort
ordering defined. dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => //
Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is
disabled, // the ExternalSorter won't spill to disk. val sorter = new
ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator case None => aggregatedIter } }
看看BlockStoreShuffleFetcher.fetch()方法
private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T](
shuffleId: Int, reduceId: Int, context: TaskContext, serializer: Serializer) :
Iterator[T] = { logDebug("Fetching outputs for shuffle %d, reduce
%d".format(shuffleId, reduceId)) // 拿到全局的blockManager val blockManager =
SparkEnv.get.blockManager val startTime = System.currentTimeMillis //
拿到一个全局的MapOutputTracker的引用,调用其getServerStatuses()方法,传入了shuffleId和reduceId //
shuffleId可以代表当前这个stage的上一个stage,shuffle是分为两个stage的,shuffle
writer发生在上一个stage中,shuffle read 是发生在当前stage中的 //
首先通过shuffleId可以限制到上一个stage的所有ShuffleMapTask的输出的MapStatus,接着,通过reduceId,也就是所谓的bucketId,来限制,从每个mapTask中,获取当前这个resultTask需要获取的每个ShuffleMapTask的输出文件的信息
//
getServerStatuses()方法,一定是走远程网络通信的,因为要联系Driver上的DAGScheduler的MapOutputTrackerMaster
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId,
reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took
%d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) //
对刚才拉取的数据,进行一些数据结构上的转换操作 val splitsByAddress = new HashMap[BlockManagerId,
ArrayBuffer[(Int, Long)]] for (((address, size), index) <-
statuses.zipWithIndex) { splitsByAddress.getOrElseUpdate(address,
ArrayBuffer()) += ((index, size)) } val blocksByAddress: Seq[(BlockManagerId,
Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { case (address, splits) =>
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2))) }
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = { val
blockId = blockPair._1 val blockOption = blockPair._2 blockOption match { case
Success(block) => { block.asInstanceOf[Iterator[T]] } case Failure(e) => {
blockId match { case ShuffleBlockId(shufId, mapId, _) => val address =
statuses(mapId.toInt)._1 throw new FetchFailedException(address, shufId.toInt,
mapId.toInt, reduceId, e) case _ => throw new SparkException( "Failed to get
block " + blockId + ", which is not a shuffle block", e) } } } } //
ShuffleBlockFetcherIterator构造以后,在其内部,就直接根据拉取到的地理位置信息,通过BlockManager去远程ShuffleMapTask所在的节点的BlockManager去拉取数据
val blockFetcherItr = new ShuffleBlockFetcherIterator( context,
SparkEnv.get.blockManager.shuffleClient, blockManager, blocksByAddress,
serializer, SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024
* 1024) val itr = blockFetcherItr.flatMap(unpackBlock) //
最后,将拉取到的数据,执行一些转换和封装,返回 val completionIter = CompletionIterator[T,
Iterator[T]](itr, { context.taskMetrics.updateShuffleReadMetrics() }) new
InterruptibleIterator[T](context, completionIter) { val readMetrics =
context.taskMetrics.createShuffleReadMetricsForDependency() override def
next(): T = { readMetrics.incRecordsRead(1) delegate.next() } } } }