若泽大数据 www.ruozedata.com

ruozedata


  • 主页

  • 归档

  • 分类

  • 标签

  • 发展历史

  • Suche

每天起床第一句,看看Spark调度器

Veröffentlicht am 2019-01-18 | Bearbeitet am 2019-06-14 | in Spark Other | Aufrufe:

之前呢,我们详细地分析了DAGScheduler的执行过程,我们知道,RDD形成的DAG经过DAGScheduler,依据shuffle将DAG划分为若干个stage,再由taskScheduler提交task到executor中执行,那么执行task的过程,就需要调度器来参与了。

Spark调度器主要有两种模式,也是大家耳熟能详的FIFO和FAIR模式。默认情况下,Spark是FIFO(先入先出)模式,即谁先提交谁先执行。而FAIR(公平调度)模式会在调度池中为任务进行分组,可以有不同的权重,根据权重来决定执行顺序。

那么源码中是怎么实现的呢?

首先,当Stage划分好,会调用TaskSchedulerImpl.submitTasks()方法,以TaskSet的形式提交给TaskScheduler,并创建一个TaskSetManger对象添加进调度池。

1
2
3
4
5
6
7
8
9
10
11
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
//....
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
//.....
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

SchedulerBulider通过TaskSchedulerImpl.initialize()进行了实例化,并调用了SchedulerBulider.buildPools()方法。具体怎么个build,就要看用户选择的schedulingMode了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}

然后我们来看一下两个调度器的buildPools()方法。

1
2
3
override def buildPools() {
// nothing
}

FIFO什么也没干~~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
override def buildPools() {
var fileData: Option[(InputStream, String)] = None
try {
fileData = schedulerAllocFile.map { f =>
val fis = new FileInputStream(f)
logInfo(s"Creating Fair Scheduler pools from $f")
Some((fis, f))
}.getOrElse {
val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
if (is != null) {
logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
None
}
}

fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
} catch {
case NonFatal(t) =>
val defaultMessage = "Error while building the fair scheduler pools"
val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
.getOrElse(defaultMessage)
logError(message, t)
throw t
} finally {
fileData.foreach { case (is, fileName) => is.close() }
}

// finally create "default" pool
buildDefaultPool()
}
ruozedata WeChat Bezahlung
# 高级 # Spark
再谈,某头条公司Spark结构化流的SQL实现
Spark监控报错javax.servlet.http.HttpServletRequest.isAsyncStarted

ruozedata

若泽数据优秀博客汇总
155 Artikel
31 Kategorien
74 schlagwörter
RSS
GitHub B站学习视频 腾讯课堂学习视频 官网
|
若泽数据
|