def main(argStrings: Array[String]) {
  Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
    exitOnUncaughtException = false))
  Utils.initDaemon(log)
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
  require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
    "Starting multiple workers on one host is failed because we may launch no more than one " +
    "external shuffle service on each host, please set spark.shuffle.service.enabled to " +
    "false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.")
  rpcEnv.awaitTermination()
}
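The interesting call here is startRpcEnvAndEndpoint: it creates an RpcEnv for the worker JVM and registers the Worker endpoint on it. A simplified sketch of what it does (argument list, system name and endpoint name are abbreviated here, not the exact Spark source):

// Sketch only: create the worker's RpcEnv and register the Worker endpoint on it.
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    conf: SparkConf): RpcEnv = {
  val securityMgr = new SecurityManager(conf)
  // One RpcEnv per worker process; the Worker itself is just an endpoint registered on it.
  val rpcEnv = RpcEnv.create("sparkWorker", host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
  rpcEnv.setupEndpoint("Worker", new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, "Worker", workDir, conf, securityMgr))
  rpcEnv
}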
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
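The retry timer is nothing magical: forwordMessageScheduler is just a single-threaded ScheduledExecutorService, and registrationRetryTimer keeps the ScheduledFuture so that it can be cancelled later. The mechanism in isolation, outside of Spark's classes (object and method names here are mine):

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object RegistrationRetrySketch {
  def main(args: Array[String]): Unit = {
    // forwordMessageScheduler in the Worker is a single-threaded scheduler like this one.
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    def reRegister(): Unit = println("sending ReregisterWithMaster to self")

    // Counterpart of registrationRetryTimer: keep the ScheduledFuture so it can be
    // cancelled from cancelLastRegistrationRetry once registration succeeds.
    val retryTimer: ScheduledFuture[_] = scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = reRegister() },
      /* initialDelay = */ 1, /* period = */ 1, TimeUnit.SECONDS)

    Thread.sleep(3500)        // let a few retries fire
    retryTimer.cancel(true)   // what registrationRetryTimer.foreach(_.cancel(true)) boils down to
    scheduler.shutdown()
  }
}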
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String, masterAddress: RpcAddress) {
  // activeMasterUrl it's a valid Spark url since we receive it from master.
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  masterAddressToConnect = Some(masterAddress)
  master = Some(masterRef)
  connected = true
  if (reverseProxy) {
    logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
  }
  // Cancel any outstanding re-registration attempts because we found a new master
  cancelLastRegistrationRetry()
}
Here we see a call to cancelLastRegistrationRetry, so let's step into it.
private def cancelLastRegistrationRetry(): Unit = {
  if (registerMasterFutures != null) {
    registerMasterFutures.foreach(_.cancel(true))
    registerMasterFutures = null
  }
  registrationRetryTimer.foreach(_.cancel(true))
  registrationRetryTimer = None
}
override def main(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {...}
  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      ......
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
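The catch block elided above is what implements the fail-over promised in the comment: if the REST submission cannot reach a REST server, spark-submit turns useRest off and resubmits through the legacy RPC gateway. Roughly (reconstructed, check the real source for the exact wording):

    } catch {
      // If the master turns out not to speak the REST protocol, fall back to the legacy
      // o.a.s.deploy.Client RPC gateway and submit again.
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }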
private def doPrepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = new SparkConf()
  var childMainClass = ""
  // Set the cluster manager
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" =>
      printWarning(s"Master ${args.master} is deprecated since 2.0." +
        " Please use master \"yarn\" with specified deploy mode instead.")
      YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ =>
      printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
      -1
  }
  .......
  if (args.isStandaloneCluster) {
    childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
  }
  .......
  if (deployMode == CLIENT) {
    childMainClass = args.mainClass
  }
  ........
}

private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
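In other words, the class that gets launched depends on the deploy mode: in client mode it is the user's own main class, while in standalone cluster mode it is ClientApp, which then talks to the master over RPC. A minimal illustration of that selection (names and structure are mine, not Spark's code):

object ChildMainClassSketch {
  // Illustration only: pick the class that spark-submit will actually start in this JVM.
  def chooseChildMainClass(master: String, deployMode: String, userMainClass: String): String =
    (master, deployMode) match {
      // client mode: run the user's class directly in this JVM, via reflection
      case (_, "client") => userMainClass
      // standalone cluster mode (legacy RPC gateway): wrap the submission in ClientApp
      case (m, "cluster") if m.startsWith("spark") => "org.apache.spark.deploy.ClientApp"
      case _ => userMainClass // other cluster managers have their own wrapper classes
    }

  def main(args: Array[String]): Unit = {
    println(chooseChildMainClass("spark://master:7077", "cluster", "com.example.MyApp"))
    println(chooseChildMainClass("spark://master:7077", "client", "com.example.MyApp"))
  }
}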
override def start(args: Array[String], conf: SparkConf): Unit = {
  val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  val sysProps = conf.getAll.toMap
  sysProps.foreach { case (k, v) =>
    sys.props(k) = v
  }
  mainMethod.invoke(null, args)
}
Here the class we submitted is obtained directly and run via reflection. This is exactly why we say that in client mode the driver program runs locally, in the spark-submit JVM itself.
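Stripped of Spark's wrapping, the reflective call is ordinary java.lang.reflect usage: resolve the class, look up the static main(Array[String]) method, and invoke it with a null receiver. A self-contained example of the same pattern (plain Class.forName instead of Spark's Utils.classForName):

object HelloApp {
  def main(args: Array[String]): Unit = println("Hello from " + args.mkString(" "))
}

object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    // Same pattern as JavaMainApplication.start: resolve the class, find static main, invoke it.
    val clazz = Class.forName("HelloApp")
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    // null receiver because main is static; the "driver" code runs right here in this JVM.
    mainMethod.invoke(null, Array("client", "mode"))
  }
}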
Now let's look at cluster mode.
override def start(args: Array[String], conf: SparkConf): Unit = {
  val driverArgs = new ClientArguments(args)
  if (!conf.contains("spark.rpc.askTimeout")) {
    conf.set("spark.rpc.askTimeout", "10s")
  }
  Logger.getRootLogger.setLevel(driverArgs.logLevel)
  val rpcEnv =
    RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      //       truncate filesystem paths similar to what YARN does. For now, we just require
      //       people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
      .....
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)
      ......
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
case"kill" => val driverId = driverArgs.driverId asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) } }
// TODO: It might be good to instead have the submission client poll the master to determine
//       the current status of the driver. For now it's simply "fire and forget".
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
  }
}
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
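The placement logic is easy to see in isolation: shuffle the alive workers, then walk them round-robin and hand each waiting driver to the first worker with enough free memory and cores. A self-contained sketch of just that loop (WorkerInfo and DriverInfo here are simplified stand-ins, not Spark's classes):

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object DriverPlacementSketch {
  case class WorkerInfo(id: String, var memoryFree: Int, var coresFree: Int)
  case class DriverInfo(id: String, mem: Int, cores: Int)

  def main(args: Array[String]): Unit = {
    val workers = Seq(WorkerInfo("w1", 4096, 4), WorkerInfo("w2", 1024, 2), WorkerInfo("w3", 8192, 8))
    val waitingDrivers = ArrayBuffer(DriverInfo("driver-1", 2048, 2), DriverInfo("driver-2", 512, 1))

    // Same shape as Master.schedule(): shuffle once, then advance curPos round-robin so
    // consecutive drivers do not all pile onto the same worker.
    val shuffledAliveWorkers = Random.shuffle(workers)
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    for (driver <- waitingDrivers.toList) { // iterate over a copy, as in the real code
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          println(s"launching ${driver.id} on ${worker.id}")
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
  }
}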
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }
        // prepare driver jars and run driver
        val exitCode = prepareAndRunDriver()
        // set final state depending on if forcibly killed and process exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        ....
      }
      // notify worker of final driver state, possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
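ShutdownHookManager.addShutdownHook is Spark's wrapper around an ordinary JVM shutdown hook; the underlying mechanism is simply Runtime.addShutdownHook. A minimal example of the same idea, killing a child process when the owning JVM goes down, without any Spark utility classes (the sleep command assumes a Unix host, which matches the UNIXProcess path below):

object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    // Start a long-running child process (a stand-in for the driver JVM).
    val process = new ProcessBuilder("sleep", "60").start()

    // Same spirit as ShutdownHookManager.addShutdownHook { () => kill() }:
    // when this JVM shuts down, make sure the child does not outlive it.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      println("Worker shutting down, killing driver process")
      process.destroy()
    }))

    println(s"child exited with ${process.waitFor()}")
  }
}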
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }
  // TODO: If we add ability to submit multiple jars they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
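So the {{WORKER_URL}} and {{USER_JAR}} placeholders that ClientEndpoint put into the Command are only resolved here, on the worker, once the local jar path is known. The substitution is just a String => String function mapped over the argument list before the ProcessBuilder is created; a small self-contained sketch (paths and URLs below are made up):

object SubstituteVariablesSketch {
  def main(args: Array[String]): Unit = {
    // Arguments as they arrive in the DriverDescription's Command (see ClientEndpoint.onStart).
    val templateArgs = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.MyApp", "--input", "/data")

    // Resolved only on the worker, where the local jar path is known.
    val workerUrl = "spark://Worker@worker-1:34567"
    val localJarFilename = "/opt/spark/work/driver-20240101/app.jar"

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJarFilename
      case other            => other
    }

    val resolved = templateArgs.map(substituteVariables)
    println(resolved.mkString(" "))
    // CommandUtils.buildProcessBuilder does this kind of substitution before handing the
    // full java command line (plus classpath, memory and env settings) to a ProcessBuilder.
  }
}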
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  var exitCode = -1
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5
  var keepTrying = !killed
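The rest of runCommandWithRetry is elided above, but the idea is a supervised retry loop: start the process, wait for it, and if supervise is set and the driver was not killed, sleep waitSeconds and double it, resetting the back-off whenever a run lasted at least successfulRunDuration seconds. The control flow on its own, with a dummy command instead of a ProcessBuilderLike (a rough sketch, not the real loop body):

object RetryWithBackoffSketch {
  def main(args: Array[String]): Unit = {
    var exitCode = -1
    var waitSeconds = 1               // time to wait between submission retries
    val successfulRunDuration = 5     // a run of this many seconds resets the back-off
    val supervise = true
    var killed = false
    var keepTrying = !killed
    var attempts = 0

    while (keepTrying) {
      val processStart = System.currentTimeMillis()
      // Stand-in for command.start() / process.waitFor(): pretend the driver crashes right away.
      exitCode = 1
      attempts += 1
      if (attempts >= 4) killed = true // stop the demo after a few failed "runs"

      // Retry only when supervising, the driver exited abnormally, and nobody killed it.
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000L) {
          waitSeconds = 1              // ran long enough: the exponential back-off resets
        }
        println(s"driver exited with $exitCode, retrying in $waitSeconds seconds")
        Thread.sleep(waitSeconds * 100L) // the real code sleeps full seconds; scaled down here
        waitSeconds = waitSeconds * 2    // exponential back-off
      }
    }
    println(s"final exit code: $exitCode, killed = $killed")
  }
}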
// Only for use by ProcessBuilder.start()
static Process start(String[] cmdarray,
                     java.util.Map<String,String> environment,
                     String dir,
                     ProcessBuilder.Redirect[] redirects,
                     boolean redirectErrorStream)
    throws IOException
{
    .......
    FileInputStream  f0 = null;
    FileOutputStream f1 = null;
    FileOutputStream f2 = null;
    try {
        if (redirects == null) {
            std_fds = new int[] { -1, -1, -1 };
        } else {
            std_fds = new int[3];
UNIXProcess(final byte[] prog,
            final byte[] argBlock, final int argc,
            final byte[] envBlock, final int envc,
            final byte[] dir,
            final int[] fds,
            final boolean redirectErrorStream)
        throws IOException {
/**
 * Creates a process. Depending on the {@code mode} flag, this is done by
 * one of the following mechanisms:
 * <pre>
 *   1 - fork(2) and exec(2)
 *   2 - posix_spawn(3P)
 *   3 - vfork(2) and exec(2)
 *
 *  (4 - clone(2) and exec(2) - obsolete and currently disabled in native code)
 * </pre>
 * @param fds an array of three file descriptors.
 *        Indexes 0, 1, and 2 correspond to standard input,
 *        standard output and standard error, respectively. On
 *        input, a value of -1 means to create a pipe to connect
 *        child and parent processes. On output, a value which
 *        is not -1 is the parent pipe fd corresponding to the
 *        pipe which has been created. An element of this array
 *        is -1 on input if and only if it is <em>not</em> -1 on
 *        output.
 * @return the pid of the subprocess
 */
private native int forkAndExec(int mode, byte[] helperpath,
                               byte[] prog,
                               byte[] argBlock, int argc,
                               byte[] envBlock, int envc,
                               byte[] dir,
                               int[] fds,
                               boolean redirectErrorStream)
    throws IOException;
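In other words, by the time the worker calls builder.start() we are out of Spark entirely: ProcessBuilder.start goes through UNIXProcess down to the native forkAndExec, which forks the worker JVM and execs a brand-new java process for the driver. The whole chain is triggered by nothing more exotic than this:

object ProcessStartSketch {
  def main(args: Array[String]): Unit = {
    // Any ProcessBuilder.start() on Linux ends up in the UNIXProcess.forkAndExec shown above.
    // Here we launch a trivial child command instead of a driver JVM.
    val builder = new ProcessBuilder("java", "-version")
    builder.inheritIO()              // let the child write straight to our console
    val process = builder.start()    // fork/vfork + exec happens under the hood
    val exitCode = process.waitFor()
    println(s"child process exited with $exitCode")
  }
}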
def main(args: Array[String]) {
  args.toList match {
    ......
      // Delegate to supplied main class
      val clazz = Utils.classForName(mainClass)
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, extraArgs.toArray[String])
    .......
      rpcEnv.shutdown()
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
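setupEndpoint registers ClientEndpoint with the RpcEnv, and the RpcEnv then drives its lifecycle: onStart fires, the endpoint registers the application with the master, and when the master acknowledges it the caller is notified through the listener interface mentioned in the comment. The shape of that callback contract, stripped of Spark's RPC machinery (Listener and Endpoint here are simplified stand-ins, not Spark's StandaloneAppClientListener/ClientEndpoint):

object AppClientListenerSketch {
  // Simplified stand-in for the listener: the scheduler backend implements these
  // callbacks and the endpoint invokes them as cluster events arrive.
  trait Listener {
    def connected(appId: String): Unit
    def executorAdded(execId: String, workerId: String, cores: Int, memory: Int): Unit
  }

  // Simplified stand-in for the endpoint: "registered" plays the role of receiving the
  // master's registration acknowledgement in receive().
  class Endpoint(listener: Listener) {
    def onStart(): Unit = println("registering application with the master")
    def registered(appId: String): Unit = listener.connected(appId)
  }

  def main(args: Array[String]): Unit = {
    val endpoint = new Endpoint(new Listener {
      def connected(appId: String): Unit = println(s"connected to master, appId = $appId")
      def executorAdded(id: String, w: String, c: Int, m: Int): Unit =
        println(s"executor $id added on $w ($c cores, $m MB)")
    })
    endpoint.onStart()
    endpoint.registered("app-20240101000000-0000")
  }
}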