Spark Source Code Analysis: The Resource Layer, Part 1

In the previous post we walked through the overall RPC startup flow between the Master and Worker roles in the Spark source. This time we pick up from the schedule method and analyze in detail how compute resources on Workers are allocated to Executors.

Let's first step into the schedule method:

```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```

The first thing in the method is a for loop that checks the drivers waiting to be launched. Since in our scenario there is no driver in the waiting list, this loop does not execute.
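For when the driver loop does run, its round-robin placement can be sketched as a standalone function. This is not Spark's real code; the worker sizes and the W case class are made up for illustration:

```scala
// A sketch of the round-robin driver placement above: walk the alive workers
// from curPos until one has enough free memory and cores for the driver.
object DriverRoundRobinSketch {
  case class W(memFree: Int, coresFree: Int) // stand-in for WorkerInfo

  // Returns the index of the worker the driver lands on, or -1 if none fits.
  def pickWorker(workers: Vector[W], needMem: Int, needCores: Int, start: Int): Int = {
    var curPos = start
    var visited = 0
    var launchedOn = -1
    while (visited < workers.size && launchedOn < 0) {
      val w = workers(curPos)
      visited += 1
      if (w.memFree >= needMem && w.coresFree >= needCores) launchedOn = curPos
      curPos = (curPos + 1) % workers.size // advance round-robin either way
    }
    launchedOn
  }

  def main(args: Array[String]): Unit = {
    val workers = Vector(W(512, 1), W(4096, 4), W(2048, 2)) // hypothetical sizes
    println(pickWorker(workers, needMem = 1024, needCores = 2, start = 0)) // 1
  }
}
```

The first worker is skipped because it has neither the memory nor the cores; the driver lands on worker 1.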

How Executors are assigned to Workers

Planning the resource allocation

Now look straight at the last call, startExecutorsOnWorkers. As the name suggests, this method describes how Executor compute resources are allocated on the Workers:


```scala
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor, the cores left will not be allocated
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
```

Entering the method: the first step is to pull the list of apps waiting to be launched. If we submitted a single big-data application with spark-submit, there will be exactly one waiting app in this list.

It then extracts the submitted parameter: coresPerExecutor corresponds to our submit option --executor-cores.

If it was not given, each Executor is allocated one CPU core by default.

It then checks the CPU cores the app still has left: if the remaining cores are at least the cores needed per Executor, core allocation begins. One parameter involved here is spreadOutApps, taken from spark.deploy.spreadOut with a default of true. This parameter is closely tied to the big-data locality principle, which we will get to in a moment.

Step into the scheduleExecutorsOnWorkers method:

```scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  ...
}
```

The first thing we see is a batch of initial parameters, which need to be analyzed one by one.

coresPerExecutor: already analyzed above; the reference value for the number of CPU cores allocated to each Executor.

minCoresPerExecutor: the minimum number of CPU cores allocated per Executor; the specified value if one was given, otherwise 1.

oneExecutorPerWorker: whether to allocate only one Executor per Worker. This is true when coresPerExecutor was not specified: since no per-Executor core count was given, by default a single Executor takes up all of that Worker's resources.

memoryPerExecutor: the amount of memory allocated to each Executor, as defined in the app description.

numUsable: the number of usable Workers.

assignedCores: the cores to assign; an array whose length is the number of usable Workers, so each array slot maps to one Worker.

assignedExecutors: the Executors to assign; also an array, with the same mapping as above.

coresToAssign: the total number of cores to assign right now. The author spent quite a while tracing app.coresLeft up through many references; the short version is that it defaults to the total-executor-cores defined at submit time, or Int.MaxValue (roughly 2.1 billion) if unspecified. Of course it also has to respect the total free cores available across our usable workers, so the minimum of the two is taken.

With the initial parameters covered, let's continue.
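How these initial values fall out of a concrete submission can be sketched in a few lines. The submit values here (--executor-cores 3, --total-executor-cores 9, three workers with 8 free cores each) are hypothetical:

```scala
// A minimal sketch of how the initial scheduling parameters come together.
object InitParamsSketch {
  def main(args: Array[String]): Unit = {
    val coresPerExecutor: Option[Int] = Some(3)   // --executor-cores, if given
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val appCoresLeft = 9                          // --total-executor-cores
    val workerCoresFree = Seq(8, 8, 8)            // free cores per usable worker
    // Cap the app's remaining cores by what the cluster can actually offer
    val coresToAssign = math.min(appCoresLeft, workerCoresFree.sum)
    println(s"min=$minCoresPerExecutor onePerWorker=$oneExecutorPerWorker toAssign=$coresToAssign")
    // prints: min=3 onePerWorker=false toAssign=9
  }
}
```

With --executor-cores omitted, minCoresPerExecutor would be 1 and oneExecutorPerWorker would flip to true.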

```scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  ...
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  ...
  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores
    }
  }
  ...
}
```

Next comes a local function that takes a pos index, which, as one would guess, is a worker index. As the name says, this function computes whether the resources on that worker are enough to allocate one more Executor.

Walking through the function:

First, it checks whether the total cores still available for assignment are at least the minimum cores per Executor. If this is false, there is no point in considering further allocation at all.

Then it checks whether the worker's free cores, minus the cores already assigned in this round, are still enough for another Executor; a straightforward check.

Finally it checks whether only one Executor is assigned per worker. If so, that single Executor by default takes up all of the worker's compute resources, so memory allocation does not need to be considered again.

If multiple Executors are allowed, it computes how much memory the previously assigned Executors take in total, derives the remaining memory, and checks whether it is enough for one more.
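The check can be replayed outside Spark with plain arrays in place of WorkerInfo/ApplicationInfo. Everything here (two workers, the core and memory numbers) is made up to exercise both branches of the predicate:

```scala
// Hypothetical standalone version of the canLaunchExecutor check.
object CanLaunchSketch {
  val minCoresPerExecutor = 2
  val memoryPerExecutor = 1024             // MB needed per new executor
  val oneExecutorPerWorker = false
  val workerCoresFree = Array(6, 2)        // free cores on each worker
  val workerMemoryFree = Array(4096, 512)  // free memory (MB) on each worker
  val assignedCores = Array(4, 0)          // cores already planned this round
  val assignedExecutors = Array(2, 0)      // executors already planned this round
  var coresToAssign = 3                    // app cores still to hand out
  val executorLimit = Int.MaxValue

  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = workerCoresFree(pos) - assignedCores(pos) >= minCoresPerExecutor
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = workerMemoryFree(pos) - assignedMemory >= memoryPerExecutor
      keepScheduling && enoughCores && enoughMemory && assignedExecutors.sum < executorLimit
    } else {
      keepScheduling && enoughCores
    }
  }

  def main(args: Array[String]): Unit = {
    println(canLaunchExecutor(0)) // true: 2 cores and 2048 MB still free
    println(canLaunchExecutor(1)) // false: enough cores, but only 512 MB free
  }
}
```

Worker 1 shows why the memory check matters: it passes the core check but cannot host a 1024 MB executor.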

So where is this function called?

```scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  ...
  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {...}

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
```

Right below, in the same function, comes a while loop. It first extracts the list of workers that can currently accommodate another Executor,

then iterates over each of those workers and performs the actual Executor resource assignment for each one.

The first two statements carve the total core count into chunks of minCoresPerExecutor and hand one chunk out per iteration.

Then oneExecutorPerWorker is checked: if true, the Executor count for this worker stays at 1, and every chunk of CPU cores goes into that single Executor;

if not, the executor count grows by 1 on each assignment. As we will see later, in the final allocation step the cores accumulated on a worker are divided by the per-Executor core count to work out how many Executors that worker actually launches.

The locality principle

Then the spreadOutApps boolean is checked: if it is true, only one chunk is assigned before moving on to the next Worker. But what is the intent behind allocating this way?

An example: suppose we set the total core count to 9 and give each Executor three cores, and we have three 9-core machines with one worker each.

If spreadOutApps is set to false, the loop stays on the first worker and gives it three Executors, each with three cores.

If it is set to true, the loop visits every worker, and each worker gets one Executor, still with three cores per Executor.

So now we know: spreadOutApps decides whether to spread the compute resources, i.e. the Executors, evenly across the Workers.

This nicely reflects one of the core principles of big data: locality. Data is computed close to where it lives. By spreading compute resources out as much as possible, the data never has to travel across the cluster to find compute; instead, the compute resources find the data nearby.

Planning done, time to allocate

Finally the method returns the assignedCores array, the per-worker list of cores to assign.

```scala
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
  allocateWorkerResourceToExecutors(
    app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
```

Then the cores are actually allocated: every usable Worker with assigned cores is visited, and the allocation is carried out by the allocateWorkerResourceToExecutors method.

```scala
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
```

If the per-Executor core count was specified, dividing the total cores assigned to this worker by the per-Executor core count gives this worker's Executor count, with no remainder. If it was not specified, this worker gets a single Executor, and all of the assigned cores go to that one Executor.

app.addExecutor(worker, coresToAssign) returns the assembled execution info (an ExecutorDesc) for the Executor.

Then the launchExecutor method:
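The two branches of that arithmetic can be isolated into a tiny helper. The split function and the sample value of 9 assigned cores are hypothetical, but the Option logic mirrors the method above:

```scala
// Worked example of the executor-count arithmetic in
// allocateWorkerResourceToExecutors, for a worker assigned 9 cores.
object AllocateSketch {
  // Returns (number of executors, cores per executor) for this worker.
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresEach = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresEach)
  }

  def main(args: Array[String]): Unit = {
    println(split(9, Some(3))) // (3,3): three 3-core executors
    println(split(9, None))    // (1,9): one executor takes everything
  }
}
```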

```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```

In essence it sends the Worker the command to launch the Executor, and notifies the application's driver with an ExecutorAdded message.

That's it for this round of source-code analysis. See you next time, bye-bye!