一,简介

Spark调度机制可以理解为两个层面的调度。首先,是Spark
Application调度。也就是Spark应用程序在集群运行的调度,应用程序包括Driver调度和Executor调度。其次,就是每个Spark
Application都会有若干Jobs(Spark
Actions),然后这些job是以何种机制,在Executor上执行的,也是需要一个调度管理的机制,该层面调度也可以理解为SparkContext内部调度。之所以会出现这种情况,主要是生产中可能会希望一个SparkContext作为服务,同时执行若干Jobs,此时可以用Spark
Application的Fair机制的调度。

 

二,App之间的调度

在以集群的方式运行Spark App时,每个Spark App会包含一些列独立资源的Executor
JVMs,这些JVMs仅仅运行该App的tasks,缓存该App的数据。当有多个应用或者多个程序在你的集群中运行时,这就牵涉到如何在集群中给这些Spark
App分配资源。


最简单的方式是提供静态资源分配。也即给运行程序分配固定资源,资源数在该程序运行期间都不会有变动。这种方式出现在Spark的Standalone,yarn和coarse-grained
Mesos 模式。资源的分配方式,在每种集群运行模式中有些不同:

 

1,standalone模式


默认情况下,app在Standalone集群中以FIFO的方式运行。您可以通过设置spark.cores.max配置属性来限制应用程序使用的节点数,也可以通过spark.deploy.defaultCores更改未设置此应用程序的默认值。
最后,除了控制cores之外,每个应用程序可以使用spark.executor.memory配置控制其内存使用。

 

2,mesos


要在Mesos上使用静态分区,请将spark.mesos.coarse配置属性设置为true,并且可选地像Standalone模式一样设置spark.cores.max来限制每个应用程序的资源。您还应该设置spark.executor.memory来控制执行程序内存。

 

3,yarn

Spark
YARN客户端的--num-executors选项控制在集群上分配的Executor数量,而--executor-memory和--executor-cores则控制每个执行程序的资源。

 

在Mesos模式下还有一个可选项,那就是动态共享Core。在这种模式下,Spark
App依然拥有固定数量和独立的内存(spark.executor.memory设置),但是当Spark
App在一个机器上没有运行的task的时候,其它的程序可以使用这些cores运行tasks。当你需要运行大量不活跃的Spark
App时,此模式是有用的。但是此模式,是有一定风险的,因为当Spark
App需要恢复使用这些cores的时候,需要等待一些时间才能使用这些core去执行任务。要使用此模式,只需要使用mesos://URL并将spark.executor.coarse设置为false。

 

请注意,目前没有提供任何夸应用程序的内存共享。如果想进行夸应用程序共享数据,建议使用第三方存储,例如tachyon来实现内存共享。

 

三,动态资源申请

Spark提供了一种动态调整应用程序占用资源的机制。 这意味着如果您的应用程序不再使用,您的应用程序可能会将资源返回给群集,并在需要时再次请求它们。
如果多个应用程序在Spark群集中共享资源,则此功能特别有用。

在任何coarse-grained集群管理器(Standalone,yarn ,mesos coarse-grained模式 )此功能禁用。

 

1,配置

用这种配置有两个要求:

第一,需要Spark App设置spark.dynamicAllocation.enabled为true。


第二,需要在集群中每个节点启用外部shuffle服务,设置spark.shuffle.service.enabled为true。使用这种模式,运行应用程序在深处的时候,不需要删除它们输出的shuffle的文件。根据不同的集群管理器,该服务的设置方式稍微有点不同。

在Standalone模式下,仅仅在 spark.shuffle.service.enabled设置为true的情况下启动你的Worker。


在mesos模式下,在 spark.shuffle.service.enabled设置为true的情况下,在所有节点运行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh脚本。

在YARN模式下,按如下所示在每个NodeManager上启动shuffle 服务:

A),在编译Spark的时候要添加yarn属性。假如,已经添加该属性,并分发到集群中,跳过此步骤。

B),指定spark-<version>-yarn-shuffle.jar。假如你自己编译的Spark该jar应该在
下面的目录里$SPARK_HOME/network/yarn/target/scala-<version> ,以分发到集群的方式里,可以放到lib里面。

C),将该jar放到NodeManager的Classpath里面。


D),在每个节点的yarn-site.xml中,给属性yarn.nodemanager.aux-services增加一个spark_shuffle值,然后yarn.nodemanager.aux-services.spark_shuffle.class设置为org.apache.spark.network.yarn.YarnShuffleService.

E),重启所有的NodeManager。

 

2,资源申请策略

在高层次上,Spark在不再使用时应放弃executors ,在需要时申请executors 。
既然没有确定的方法可以预测即将被删除的executors 是否会在不久的将来执行任务,还是将要添加的新的执行者实际上是空闲的,那么我们需要一套启发式来确定
何时删除,请求executors 。

2.1,请求策略

允许动态申请内存的Spark应用程序在存在等待执行的task的时候会申请额外的Executors。


Spark会轮训询申请资源。当有spark.dynamicAllocation.schedulerBacklogTimeout秒的挂起任务时触发实际的请求,然后如果挂起的任务队列仍然存在,则每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒再次触发。另外每次申请的Executors会基于上次申请的数目以指数的形式增长。比如,应用程序第一轮增加了1个Executor,那么接着会是2,4,8.


指数增长政策的动机是双重的。应用程序申请Executor应该谨慎,证明少量Executor即可完成任务。这反映了TCP缓慢启动的理由。第二,应用程序应该能够及时提高其资源使用情况,以证明实际需要许多Executor。

2.2 删除策略

删除executors 的策略要简单得多。Spark
App在executors 空闲时间超过spark.dynamicAllocation.executorIdleTimeout 秒后删除Executors。请注意,在大多数情况下,这种情况与请求条件相互排斥,因为如果仍然有待执行的任务时,Executor不应该空闲。

 

3,Executors优雅退出

在动态分配之前,Spark
Executors在出现故障或退出相关应用程序时退出。在这两种情况下,与Executors相关联的所有状态不再需要,可以被安全地丢弃。然而,通过动态分配,当Executors被显式删除时,应用程序仍在运行。如果应用程序尝试访问由Executors存储或写入的状态,则必须执行重新计算状态。因此,Spark需要一种机制,通过在删除执行程序之前保留其状态才能正常退出Executors。


这个要求对于shuffle尤其重要。Shuffle期间,SparkExecutor首先写map输出到磁盘,然后当做文件服务器,提供给其它Executor获取这些输出文件。存在落后的tasks,他们运行的时间比其它tasks长,动态申请的Executor有可能在shuffle未结束之前就被移除了,在这种情况下,shuffle输出文件必须要重新计算,这个是很没必要的。

可以使用外部shuffle服务保存shuffle输出文件,从spark
1.2开始引入。此服务是指一个长期运行的进程,它们独立于Spark应用程序及其executors,在集群的每个节点上运行。如果服务已启用,Spark执行程序将从服务中获取shuffle文件,而不是从其它Executor。这意味着由执行人员写入的任何shuffle 状态可能会继续执行超出Executor的生命周期。


除了写shuffle文件之外,执行程序还可以在磁盘或内存中缓存数据。但是,当执行器被删除时,所有缓存的数据将不再可访问。为了避免这种情况,默认的包含缓存数据的executors 永远不会被删除。您可以使用spark.dynamicAllocation.cachedExecutorIdleTimeout配置此行为。在将来的版本中,缓存的数据可能会通过堆栈外的存储来保存,这个存储类似于通过外部shuffle服务来保存shuffle文件的方式。

 

四,Spark App内部调度

在给定的Spark应用程序(SparkContext实例)中,如果从单独的线程提交多个并行作业,则可以同时运行。“job”,在本节中,我们是指一个Spark
action(例如,save,collect)以及任何需要运行的任务以评估该动作。Spark的调度程序是完全线程安全的,并支持这种用例来启用提供多个请求的应用程序(例如,多个用户的查询)。


默认情况下,Spark的调度程序以FIFO方式运行作业。每个job会被划分成很多stage(例如,map阶段,reduce阶段),在第一个job运行技术之后,第二个job才有会去执行。如果在队列头部的job不需要使用集群的全部资源,那么后面的job可以立即执行。队列头部的job很大的话,其余的job必须推迟执行。

从Spark
0.8开始,也可以在作业之间配置公平的共享。在公平分享下,Spark以“循环”方式在任务之间分配tasks,使所有job获得大致相等的集群资源份额。这意味着长job运行期间提交的短job,也可以立即获取到资源,并且仍然可以获得良好的响应时间,而无需等待长job完成。此模式最适合多用户。

要启用公平调度程序,只需在配置SparkContext时将spark.scheduler.mode属性设置为FAIR:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

 

1,公平调度池


公平调度程序还支持将作业分组到池中,并为每个池设置不同的调度选项(例如权重)。这对于为更重要的job创建“高优先级”池是有用的,或将每个用户的job分组在一起,并给予用户相等的份额,而不管他们有多少并发作业,而不是给予作业相等的份额。这种方法是在Hadoop
Fair Scheduler之后建模的。这种方法是在Hadoop Fair Scheduler之后建模的。

没有任何干预,新提交的作业进入默认池,但是可以通过向提交的线程中的SparkContext添加spark.scheduler.pool“local
property”来设置作业的池。如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")


设置此本地属性后,在此线程中提交的所有作业(通过此线程中的调用到RDD.save,count,collect等)将使用此pool 名称。该设置是每个线程,使得线程可以代表同一用户运行多个作业变得容易。如果要清除线程与之关联的池,只需调用:

sc.setLocalProperty("spark.scheduler.pool", null)

 

2,池的默认行为


默认情况下,每个pool获得相同的集群份额(在默认池中每个job获得相同的共享资源),但是每个池中的作业依然是FIFO的顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得该群集的相等份额,并且每个用户的查询将按顺序运行。

 

3,配置池属性

特定池的属性也可以通过配置文件进行修改。 每个池支持三个属性:

A),schedulingMode

这可以是FIFO或FAIR,以控制池中的作业以队列的形式顺序执行(默认),或公平分享池的资源。

B),weight


这可以控制池相对于其他池的共享。默认情况下,所有池的权重均为1.如果将特定池的权重设置为2,例如,它将获得两倍的资源相比其他活跃的池。设置诸如1000之类的高重量也使得可以在池之间实现优先级 - 实质上,weight-1000池将始终在任务激活时首先启动任务。

C),minShare


除了总体weight之外,每个池可以被给予管理员希望具有的最小份额(如一些CPU内核)。公平调度员总是尝试在根据权重重新分配额外的资源之前满足所有活动池的最小份额。因此,minShare属性可以是另一种确保池总是能够快速获得一定数量的资源(例如10个内核)的方法。默认情况下,每个池的minShare为0。

可以通过创建类似于conf /
fairscheduler.xml.template的XML文件,并在SparkConf中设置spark.scheduler.allocation.file属性来设置池属性。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

XML文件的格式只是每个池的一个<pool>元素,其中包含不同的元素,用于各种设置。 例如:

<?xml version="1.0"?>
  <allocations>
    <pool name="production">
      <schedulingMode>FAIR</schedulingMode>
      <weight>1</weight>
      <minShare>2</minShare>
    </pool>
    <pool name="test">
      <schedulingMode>FIFO</schedulingMode>
      <weight>2</weight>
      <minShare>3</minShare>
    </pool>
  </allocations>

conf / fairscheduler.xml.template中还提供了一个完整示例。
请注意,没有在XML文件中配置的任何池将简单地获取所有设置(调度模式FIFO,权重1和minShare 0)的默认值。

 

尾巴:

后面会出文章写一个基于netty的Spark作为一种查询服务的Spark App。

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信