ExecutorAllocationManager的作用已在《Spark2.1.0——SparkContext概述
<https://blog.csdn.net/beliefer/article/details/83895000>
》一文有过介绍,更为准确地说,ExecutorAllocationManager是基于工作负载动态分配和删除Executor的代理。简单讲,ExecutorAllocationManager与集群管理器之间的关系可以用图1来表示。
图1       ExecutorAllocationManager与集群管理器之间的关系
 


ExecutorAllocationManager内部会定时根据工作负载计算所需的Executor数量,如果对Executor需求数量大于之前向集群管理器申请的Executor数量,那么向集群管理器申请添加Executor;如果对Executor需求数量小于之前向集群管理器申请的Executor数量,那么向集群管理器申请取消部分Executor。此外,ExecutorAllocationManager内部还会定时向集群管理器申请移除(杀死)过期的Executor。

        
有了对ExecutorAllocationManager的了解,这里来看看SparkContext是如何创建和启动ExecutorAllocationManager的。创建ExecutorAllocationManager的代码如下:
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend
match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ => None } } else { None } _executorAllocationManager.foreach(_.start())
根据上述代码,创建ExecutorAllocationManager的步骤如下:

*
调用Utils工具类的isDynamicAllocationEnabled方法判断是否需要启用ExecutorAllocationManager。根据附录A中对isDynamicAllocationEnabled方法的介绍,我们知道在配置spark.dynamicAllocation.enabled属性为true的前提下,在非Local模式下或者当spark.dynamicAllocation.testing属性为true时启用ExecutorAllocationManager。
*
当SchedulerBackend的实现类同时实现了特质ExecutorAllocationClient的情况下,才会创建ExecutorAllocationManager。
* 调用ExecutorAllocationManager的start方法启动ExecutorAllocationManager。
         ExecutorAllocationManager的start方法的实现如下:
def start(): Unit = { listenerBus.addListener(listener) val scheduleTask =
new Runnable() { override def run(): Unit = { try { schedule() } catch { case
ct: ControlThrowable => throw ct case t: Throwable => logWarning(s"Uncaught
exception in thread ${Thread.currentThread().getName}", t) } } }
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis,
TimeUnit.MILLISECONDS) client.requestTotalExecutors(numExecutorsTarget,
localityAwareTasks, hostToLocalTaskCount) }
根据上述代码,ExecutorAllocationManager的start方法的执行步骤如下。

* 向事件总线添加ExecutorAllocationListener。
* 创建定时调度的任务scheduleTask,此任务主要调用schedule方法。
*
将scheduleTask提交给executor(executor是只有一个线程的ScheduledThreadPoolExecutor),以固定的间隔intervalMillis(值为100)进行调度。
*
调用ExecutorAllocationClient的requestTotalExecutors方法请求所有的Executor。numExecutorsTarget是动态分配Executor的总数,取spark.dynamicAllocation.initialExecutors、spark.dynamicAllocation.minExecutors、spark.executor.instances三个属性配置的最大值。localityAwareTasks是由本地性偏好的Task数量。hostToLocalTaskCount是Host与想要在此节点上运行的Task的数量之间的映射关系。

注意:在SchedulerBackend的实现类中只有CoarseGrainedSchedulerBackend同时实现了特质ExecutorAllocationClient,因此对CoarseGrainedSchedulerBackend实现的requestTotalExecutors方法的具体内容将放在第9章详细介绍。


定时任务scheduleTask会按照固定的时间间隔调用ExecutorAllocationManager的schedule方法以调整待执行Executor请求的数量和运行的Executor的数量。schedule方法的实现如下:
private def schedule(): Unit = synchronized { val now = clock.getTimeMillis
updateAndSyncNumExecutorsTarget(now) val executorIdsToBeRemoved =
ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val
expired = now >= expireTime if (expired) { initializing = false
executorIdsToBeRemoved += executorId } !expired } if
(executorIdsToBeRemoved.nonEmpty) { removeExecutors(executorIdsToBeRemoved) } }
根据上述代码,schedule方法的执行步骤如下:

* 调用updateAndSyncNumExecutorsTarget方法重新计算所需的Executor数量,并更新请求的Executor数量。
*
对过期的Executor进行删除。removeExecutors方法将利用ExecutorAllocationClient的killExecutors方法通知集群管理器杀死Executor。killExecutors方法需要ExecutorAllocationClient的实现类去实现。
updateAndSyncNumExecutorsTarget方法的实现如下:
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded // 获得实际需要的Executor的最大数量maxNeeded if
(initializing) { // ExecutorAllocationManager还在初始化,则返回0 0 } else if (maxNeeded
< numExecutorsTarget) { // 减少需要的Executor的数量 val oldNumExecutorsTarget =
numExecutorsTarget numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
numExecutorsToAdd = 1 if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks,
hostToLocalTaskCount) logDebug(s"Lowering target number of executors to
$numExecutorsTarget (previously " + s"$oldNumExecutorsTarget) because not all
requested executors are actually needed") } numExecutorsTarget -
oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) { //
添加Executor val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add
more executors (to " + s"expire in $sustainedSchedulerBacklogTimeoutS
seconds)") addTime += sustainedSchedulerBacklogTimeoutS * 1000 delta } else { 0
} }
根据上述代码,updateAndSyncNumExecutorsTarget方法的执行步骤为:

* 调用maxNumExecutorsNeeded方法获得实际需要的Executor的最大数量maxNeeded。
* 如果ExecutorAllocationManager还在初始化,则返回0。
*
如果Executor的目标数量(numExecutorsTarget)超过我们实际需要的数量(maxNeeded),那么首先将numExecutorsTarget设置为maxNeeded与最小Executor数量(minNumExecutors)之间的最大值,然后调用ExecutorAllocationClient的requestTotalExecutors方法重新请求numExecutorsTarget指定的目标Executor数量,以此停止添加新的执行程序,并通知集群管理器取消额外的待处理Executor的请求,最后返回减少的Executor数量。
*
如果maxNeeded大于等于numExecutorsTarget且当前时间大于上次添加Executor的时间,那么首先调用addExecutors方法(此方法也是利用ExecutorAllocationClient的requestTotalExecutors方法)通知集群管理器添加额外的Executor,然后更新添加Executor的时间,最后返回添加的Executor数量。

这里对ExecutorAllocationManager的主要工作原理进行了分析,maxNumExecutorsNeeded方法、addExecutors方法及removeExecutors方法的实现留给感兴趣的读者自行阅读。local-cluster和Standalone模式下的集群管理器Master如何给应用程序分配或取消Executor的分析,将在9.9.5节详细介绍。

         根据对ExecutorAllocationManager的分析,可以用图2来表示Executor的动态分配过程。
图2      Executor的动态分配过程
 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信