Lemcoden

来自于大数据攻城狮的分享

spark源码分析资源层分析-1

上回我们整体分析spark源码整个Master与Worker角色之间的RPC调用启动流程,这次我们接着上回的从scheduled方法开始继续分析,具体分析Work是如何进行分配计算资源给Executor的

我们首先进入schedule方法当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
→ startExecutorsOnWorkers()
}

首先进入的是一个for循环,检测要等待启动的Drivers,但是当前我们并没有要启动driver列表,所以此次for循环并不会执行

如何给Worker分配Executor

开始进行资源规划

然后我们直接看最后的方法startExecutorsOnWorkers见名知义,这里的方法就是描述如何给Worker分配Executor计算资源的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps) {
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}

进入到方法中来,首先抽取要等待启动的app列表,如果我们只用spark-submit提交了一个大数据应用,那么我们就会有一个要等待的app在这个列表当中,

然后抽取我们提交的参数 corePerExecutor对应我们的提交参数–executor-cores

如果没有的话那就默认每个executor分配一个CPU核心数

然后检测我们目前剩余的CPU核心数,如果剩余核心数大于能分配的核心数,那就开始进行核心分配,这其中有一个参数叫

spreadOutApps这个参数取自spark.deploy.spreadOut默认为false,这个参数和大数据的就近原则有很大的联系,这个等一下会聊到

进入到scheduleExecutorsOnWorkers方法当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
....
}

首先映入眼帘的是定义的一大堆初始参数,需要一个一个具体分析

corePerExecutor这个途虎不过分析过了,给每个Executor所分配的CPU核心数的参考值

minCoresPerExecutor最小给每个Excutore分配的CPU核心数,如果已经指定就用指定值,没有指定为1

oneExecutorPerWorker是否给Worker仅仅分配1个Executor,当coresPerExecutor参数没有指定的时候,因为没有给Executor指定核心数,所以默认是一个Executor将这个Worker的所有资源都占满了

memoryPerExecutor给定义的每个Executor分配的内存数,

numUsable可用的Worker

assignedCores要分配的核心数,这是一个数组,长度就是可用worker的长度,也就是说数组中一个单元映射一个Worker位

assignedExecutors要分配的Executor数,这也是一个数组,释义同上

coresToAssign目前要分配的总核心数,app.coresLeft 这个值笔者往上深究了很长时间中间牵扯到很多引用,我们只要知道这里这个的默认值就是我们提交时所定义的total-executor-cores,如果没指定就是Int最大值21亿,不过这里肯定会参考我们剩余worker里面可用的总核心数,这两个取最小值

好了初始参数了解完毕了,我们继续往下走

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
   private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
....
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
....
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
....
}

接下来是一个局部函数,传入一个pos下标,目前猜测应该就是worker的下标,这个函数名字就是计算worker当中的资源是否够分配一个Executor的

整个函数翻译下来就是,

先判断剩余可分配的总核心数是否大于给每个Executor要分配的最小核心数,这个值要是为false肯定就不考虑是否是否能继续分配的问题了

然后判断这个worker当中除去已经分配的核心数,剩余核心数是否够分配,这个是正常的判断

最后判断一下是不是给worker只分配一个Executor,如果只分配一个Executor,那么默认所有计算资源全部沾满,所以不用考虑内存分配的问题了

如果分配多个Executor,那么判断一个之前分配了多少Executor总共多少内存,计算出剩余内存数看是否内存足够。

那么这个函数是在哪调用呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
....
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {.....}

// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}

就在当前函数的下面,是一个while循环,意思是抽取当前所有没有分配Executor的Woker列表

然后对每个worker进行遍历,对每个worker进行真正的Executor资源分配

前两句话就是对就是将总核心数以minCoresPerExecutor为单元分割给每个Executor

然后判断是否为oneExecutorPerWorker,如果是这个worker分配的Executor数始终为1,每次分割的CPU核心数也全都归到这一个Executor当中,

如果不是那么每次分配时,executor数加1,后面会讲到,最后计算的时候,会将当前worker累计的核心数除以Executor数来计算具体每个Executor分配到多少计算资源。

就近原则

然后有一个spreadOutApps布尔值会判断一下,如果为True,则目前只分配一次,然后分配下一个Worker,但是这样分配的用意是什么?

举个例子,如果我们设置分配的总核心数为9,每个Executor分配三个核心,然后我们有三台机器9核心机器,每个机器一个worker。

如果设置spreadOutApps为false,那么我们会在遍历第一个woker,给第一个Worker分配三个Executor,每个Executor三个核心数

而如果为false,我们会遍历每个worker,每个worker分配一个Executor,每个Executor核心数还为三。

那么我们就知道了,spreadOutApps,就是是否你要将计算资源平铺给每个Worker,将Executor平铺给每个Worker,

这就非常好的体现了我们大数据的一个原则,就近原则,数据就近计算资源计算我们计算资源尽可能进行平铺,那么最后,数据就不用跨集群去找计算资源了,而是计算资源就近找数据。

规划完成,开始分配

最后我们的方法返回了assignedCores数组,要分配的核心数列表

1
2
3
4
5
6
7
8
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}

最后对核心数进行真正的分配,也是遍历每个可用的Worker,然后通过allocateWorkerResourceToExecutors方法进行分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}

1.如果设置了Executor要分配的核心数,那么用当前worker分配的核心总数除以每个Exucutor的核心数那么就是当前worker的Executor数,如果没有设置,那么当前worker就分配一个Exucutor,并且将要分配的核心总数全部分配给这一个Executor

app.addExecutor(worker, coresToAssign)返回Executor组装好的执行信息

然后LaunchExecutor方法

1
2
3
4
5
6
7
8
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

其实就是给worker以及Worker的driver发送启动Executor的命令

好了这次的源码分析就到这里,下期见 ByeBye

背景描述

接了一个简单的项目,我们直接以前端嵌入spring的方式开发,vue的前端工程师打好包,我们直接放入到resourcse/static目录下,在这样的情况下踩了一些坑

Read more »

事情缘由

笔者最近要参加ECE(ElastickSearch Certificated Engineer)考试,但是遇到个问题

我的电脑配置足够开es虚拟机集群但是公司的内网IP地址和家里的内网IP地址不同(公司是以10.72.33开头的网段,而家里的网段是以192.168.0开头的网段),导致如果在两地练习ES需要频繁的更改虚拟机的IP地址,这是让人很头疼的事.

一种方法是自己买一个路由器,将自己的路由器的网关地址修改成公司的网关地址.然后我们的电脑连接自己的路由器就可以了.

因为疫情缘故,买路由器快递有点慢,后来看到有将ubuntu改造成DHPC服务器的方法,正好自己曾经买了一个工控主机,所以就决心试一下

准备环境

首先需要ubuntu server20.04的系统,其他版本修改IP的方法,请自行百度

关于Ubuntu办公软件

原生的office套件着实让人头疼,不仅打开缓慢,UI界面也是90年代的风格

所以还是WPS吧,适合国人的操作习惯,并且有原生的linux版本支持

不需要再不断繁琐的调试wine这样的反人类的虚拟层

关于markdown编辑器

Read more »

es踩坑日记(持续更新…)

最近有接到es的开发任务,比较紧急,而我又是es小白,没时间系统化的学习,故记录一下自己的踩坑点,回头会将这些零碎知识真正系统化.

提前说明:

  1. 目前的两次踩坑基于Java的RestHighLevelClient
  2. 本人是根据mysql的sql基础进行从已知推未知,精通es开发的小伙伴可以略过

多条件查询前提

Read more »

从linux内核函数角度,JAVA角度聊IO发展史,并叙述netty

前言

前些天一直在看netty,感觉网络博客少有从内核函数的角度直接剖析io到nio的发展,并和java的netty框架结合起来的,导致很多人对netty的概念仅仅停留在抽象的角度,就损失很多细节方面的东西以及对io模型的整体把控,所以写了这一篇博客,希望读者读后能发现新的东西,以及有一个系统的知识,当然因为时间问题,本篇博客会写的简单一些,后期可能会写出更加细节系统化的系列.

先聊linux

Read more »

书接上文

上次源码分析之后,笔者自己通读了一遍,发现有些地方,有些问题没有说明白,所以就上篇博客遗留的问题做一个回答

序列化器在哪用到了?

上此聊到RPCEnv对象创建的时候,会创建序列化器,2.3.4版本使用Java默认的序列化器,然后在哪里用呢?

这里先给出NettyRpcHandler recive的代码(怎么找到的?请看上篇博客)

1
2
3
4
5
6
7
  override def receive(
client: TransportClient,
→ message: ByteBuffer,
callback: RpcResponseCallback): Unit = {
val messageToDispatch = internalReceive(client, message)
dispatcher.postRemoteMessage(messageToDispatch, callback)
}

我们看到message原本是字节缓存,通过internalReceive方法组装成RequestMessage对象

Read more »

问题陈述

前两天准备更新一篇源码阅读相关的博客,但是使用hexo d部署到github上面之后页面死活不更新

详细查看后发现,之前看hexo官网的中文文档,所使用持续集成功能所托管的网站,travis-ci.org网站搬迁到travis-ci.com并且集成服务也做了一些修改,更加的定制化.

不想看中间流程的同学可以直接进入到最后一个标题

尝试解决问题

我尝试在新的travis-ci.com上托管持续集成的功能,发现会发生很多的错误,并且新的持续集成功能定制化的功能很多,这导致我需要一步一步把travics-ci的官方文档看一边,才知道哪些流程出错了,哪些定制化开发流程需要重新配置.

但是,对于我一个node.js一点都不懂的java工程师而言,这样的学习成本很高,并且程序员都比较繁忙,很难抽出空来再去学习本专业无关的事.(仅指nodejs语言,非集成部署方式)

Read more »

spark-core源码分析01(RPC环境)

本篇源码分析,主要就Spark Standlone(spark2.3.4版本)资源管理的RPC调用部分进行总结

RPC调用概述

RPC调用其实并不是很高深,它特指某类通信技术,它的应用其实特别广泛,我们经常所说的http协议也是一种特殊RPC调用,http协议就定义了请求的方式方法post,get,delete,update

而通过看源码我们会发现,spark中Master和Worker之间也定义了相似的消息投递规则即send,ask,recive,reciveAndReply

RPC的原理很简单,但是落地到实际生产环境中需要做的细活很多,比如拆包粘包问题,动态代理库的使用,线程池,链接池,传输层的封装等等

这里简单给出RPC框架的简单架构

Read more »

起因

作为ubuntu开源爱好者,迅雷那样的流氓软件当然能不使用就不使用

那么,我们使用什么开源软件的进行下载呢?

首先我们看一下都有哪些下载协议

第一种thunder://QUFtYWduZXQ6P3h0PXVybjpidGloOjAzRjYxODA0RTFFQzFCMTQyQzU0RERCNUQ3QjhCRUQ2OUIxREY2MzhaWg==

不得不说这完全是迅雷自己yy出来的一种协议,其实就是其他协议中包了个壳子,只要将协议后面的等号去掉,将中间的一堆乱码,base64解码一下就可以出来,比如上面的协议解码出来是这样子

1
2
3
//在unbuntu中通过base64命令,将代码解码,解码出来的数据去掉开头的AA和结尾的ZZ就是真正的下载url
lemcoden@unbuntu:~$ echo "QUFtYWduZXQ6P3h0PXVybjpidGloOjAzRjYxODA0RTFFQzFCMTQyQzU0RERCNUQ3QjhCRUQ2OUIxREY2MzhaWg==" | base64 -d
AAmagnet:?xt=urn:btih:03F61804E1EC1B142C54DDB5D7B8BED69B1DF638ZZ

解码出来如果是http就用浏览器直接下载,如果不是,像上面这种的BT种子协议,我们用trasmission

第二种

Read more »
0%