When we follow the official documentation and run
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
how exactly does Spark submit this job internally?
The bin/spark-submit script simply hands every argument to the org.apache.spark.deploy.SparkSubmit class (via spark-class), so let's start with what SparkSubmit.scala does:
override def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  // Dispatch to the matching handler based on the parsed action
  appArgs.action match {
    /**
     * Submit the application using the provided parameters.
     *
     * This runs in two steps. First, we prepare the launch environment by setting up
     * the appropriate classpath, system properties, and application arguments for
     * running the child main class based on the cluster manager and the deploy mode.
     * Second, we use this launch environment to invoke the main method of the child
     * main class.
     * Two steps: prepareSubmitEnvironment and doRunMain
     */
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    // Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
    case SparkSubmitAction.KILL => kill(appArgs)
    // Request the status of an existing submission using the REST protocol.
    // Standalone and Mesos cluster mode only.
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
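The action itself is set while SparkSubmitArguments parses the command line: it defaults to SUBMIT, and the --kill / --status options switch it to KILL / REQUEST_STATUS. As a standalone illustration of this "parse the arguments, then match on an action" shape (made-up names, not Spark's real classes):

// Illustrative sketch only: MiniSubmit / MiniAction mirror the
// SparkSubmitAction / SparkSubmitArguments dispatch, they are not Spark code.
object MiniSubmit {
  sealed trait MiniAction
  case object Submit extends MiniAction
  case object Kill extends MiniAction
  case object RequestStatus extends MiniAction

  // Default to Submit; --kill / --status switch the action, just like spark-submit.
  def parseAction(args: Array[String]): MiniAction =
    if (args.contains("--kill")) Kill
    else if (args.contains("--status")) RequestStatus
    else Submit

  def main(args: Array[String]): Unit =
    parseAction(args) match {
      case Submit        => println("prepare the launch environment, then run the child main class")
      case Kill          => println("kill an existing submission over REST")
      case RequestStatus => println("query the status of an existing submission over REST")
    }
}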
Next, the submit method in detail:
private def submit(args: SparkSubmitArguments): Unit = {
  /**
   * Prepare the environment for submitting an application.
   * This returns a 4-tuple:
   *   (1) the arguments for the child process,
   *   (2) a list of classpath entries for the child,
   *   (3) a map of system properties, and
   *   (4) the main class for the child
   * Exposed for testing.
   * The --master, --deploy-mode and other options from the submit script above
   * all trigger different branches inside this method.
   */
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            /**
             * Run the main method of the child class using the provided launch environment.
             *
             * Note that this main class will not be the one provided by the user if we're
             * running cluster deploy mode or python applications.
             */
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      /**
       * Run the main method of the child class using the provided launch environment.
       *
       * Note that this main class will not be the one provided by the user if we're
       * running cluster deploy mode or python applications.
       */
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // ... (elided: failover handling for REST-based standalone cluster submission;
  // in the end doRunMain() is invoked)
  doRunMain()
}
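The only subtlety in doRunMain is the --proxy-user branch: when a proxy user is given, runMain is executed inside a Hadoop UserGroupInformation.doAs block so the application is impersonated as that user. A minimal sketch of the same impersonation pattern, assuming hadoop-common is on the classpath (ProxyUserSketch and runAs are illustrative names, and the cluster must allow proxying via the usual hadoop.proxyuser.* settings):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  // Run `body` while impersonating `proxyUser`, using the same doAs pattern as above.
  def runAs(proxyUser: String)(body: => Unit): Unit = {
    val ugi = UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getCurrentUser)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = body
    })
  }
}

// Usage: ProxyUserSketch.runAs("alice") { /* code that talks to HDFS/YARN as alice */ }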
The most important part of prepareSubmitEnvironment is how it chooses childMainClass:
if (deployMode == CLIENT || isYarnCluster) {
  childMainClass = args.mainClass
  ...
}

if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    ...
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    ...
  }
  ...
}

if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  ...
}

if (isMesosCluster) {
  childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
  ...
}
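Putting these branches together: in client mode the child main class is the user's --class itself, while in cluster mode it is a cluster-manager-specific client that submits the user class on your behalf. A rough summary of that mapping (a hypothetical helper, not the actual Spark method; SubmitArgs is a made-up holder for the relevant flags):

object ChildMainClassSketch {
  // args.mainClass stands for the class passed via --class.
  case class SubmitArgs(master: String, deployMode: String, mainClass: String, useRest: Boolean)

  def childMainClassFor(args: SubmitArgs): String =
    if (args.deployMode == "client") args.mainClass
    else args.master match {
      case m if m.startsWith("yarn")  => "org.apache.spark.deploy.yarn.Client"
      case m if m.startsWith("mesos") => "org.apache.spark.deploy.rest.RestSubmissionClient"
      case m if m.startsWith("spark") =>
        if (args.useRest) "org.apache.spark.deploy.rest.RestSubmissionClient"
        else "org.apache.spark.deploy.Client"  // legacy standalone cluster gateway
      case _ => args.mainClass
    }
}

For the yarn-cluster submission at the top of this post, childMainClassFor(SubmitArgs("yarn", "cluster", "org.apache.spark.examples.SparkPi", useRest = true)) would yield "org.apache.spark.deploy.yarn.Client".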
The parameters runMain needs are exactly the values returned by prepareSubmitEnvironment.
// Inside runMain, the child main class is loaded by name via Java reflection
mainClass = Utils.classForName(childMainClass)
// Look up its static main(String[]) method
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
// Invoke it with the prepared child arguments
mainMethod.invoke(null, childArgs.toArray)
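So runMain never calls the child class directly; it loads it by name and drives its static main via reflection. The following self-contained snippet (not Spark code; Utils.classForName is essentially Class.forName) demonstrates the same pattern:

// A stand-in "child" class whose static main we locate reflectively.
object HelloMain {
  def main(args: Array[String]): Unit =
    println("Hello from " + args.mkString(" "))
}

object ReflectInvokeDemo {
  def main(args: Array[String]): Unit = {
    // Load the class by name, look up its static main(String[]) and invoke it.
    val mainClass  = Class.forName("HelloMain")
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    mainMethod.invoke(null, Array("the", "reflected", "main"))
  }
}

For the yarn-cluster submission shown at the beginning, the class invoked here is org.apache.spark.deploy.yarn.Client rather than the user's SparkPi class, exactly as the comment in runMain warns.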