Driver信息改变
case DriverStateChanged(driverId, state, exception) => { state match { //
如果Driver的状态是错误、完成、杀死、失败,就移除Driver case DriverState.ERROR | DriverState.FINISHED
| DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state,
exception) case _ => throw new Exception(s"Received unexpected state update for
driver $driverId: $state") } } // 删除driver def removeDriver(driverId: String,
finalState: DriverState, exception: Option[Exception]) {
//用Scala高阶函数find()根据driverId,查找到driver drivers.find(d => d.id == driverId)
match { case Some(driver) => logInfo(s"Removing driver: $driverId")
//将driver将内存缓存中删除 drivers -= driver if (completedDrivers.size >=
RETAINED_DRIVERS) { val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove) } //将driver加入到已经完成的completeDrivers
completedDrivers += driver //从持久化引擎中删除driver
persistenceEngine.removeDriver(driver) //设置driver状态设置为完成 driver.state =
finalState driver.exception = exception //从worker中遍历删除传入的driver
driver.worker.foreach(w => w.removeDriver(driver)) //重新调用schedule schedule()
case None => logWarning(s"Asked to remove unknown driver: $driverId") } }
Executor信息改变
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { //
找到Executor对应的Application,然后再反过来通过Application内部的Executor缓存获取Executor信息 val
execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match { case Some(exec) => { // 如果有值 val appInfo = idToApp(appId)
exec.state = state if (state == ExecutorState.RUNNING) {
appInfo.resetRetryCount() } // 向driver同步发送ExecutorUpdated消息
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
// 判断,如果Executor完成了 if (ExecutorState.isFinished(state)) { // Remove this
executor from the worker and app logInfo(s"Removing executor ${exec.fullId}
because it is $state") // 从Application缓存中移除Executor
appInfo.removeExecutor(exec) // 从运行Executor的Worker的缓存中移除Executor
exec.worker.removeExecutor(exec) // 判断 如果Executor的退出状态是非正常的 val normalExit =
exitStatus == Some(0) // Only retry certain number of times so we don't go into
an infinite loop. if (!normalExit) { // 判断Application当前的重试次数,是否达到了最大值,最大值是10 //
也就是说,Executor反复调度都是失败,那么认为Application也失败了 if (appInfo.incrementRetryCount() <
ApplicationState.MAX_NUM_RETRY) { // 重新进行调度 schedule() } else { //
否则,进行移除Application操作 val execs = appInfo.executors.values if
(!execs.exists(_.state == ExecutorState.RUNNING)) { logError(s"Application
${appInfo.desc.name} with ID ${appInfo.id} failed " + s"${appInfo.retryCount}
times; removing it") removeApplication(appInfo, ApplicationState.FAILED) } } }
} } case None => logWarning(s"Got status update for unknown executor
$appId/$execId") } }
接下来看下removeApplication()方法
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
if (apps.contains(app)) { logInfo("Removing app " + app.id)
//从application队列(hashset)中删除当前application apps -= app idToApp -= app.id
actorToApp -= app.driver addressToApp -= app.driver.path.address if
(completedApps.size >= RETAINED_APPLICATIONS) { val toRemove =
math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a
=> { appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource) })
completedApps.trimStart(toRemove) } //加入已完成的application队列 completedApps += app
// Remember it in our history //从当前等待运行的application队列中删除当前APP waitingApps -=
app // If application events are logged, use them to rebuild the UI
rebuildSparkUI(app) for (exec <- app.executors.values) { //停止executor
exec.worker.removeExecutor(exec) exec.worker.actor ! KillExecutor(masterUrl,
exec.application.id, exec.id) exec.state = ExecutorState.KILLED }
app.markFinished(state) if (state != ApplicationState.FINISHED) {
//从driver中删除application app.driver ! ApplicationRemoved(state.toString) }
//从持久化引擎中删除application persistenceEngine.removeApplication(app) //从新调度任务
schedule() // Tell all workers that the application has finished, so they can
clean up any app state. //告诉所有的worker,APP已经启动完成了,所以他们可以清空APP state
workers.foreach { w => w.actor ! ApplicationFinished(app.id) } } }

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信