Creating a YARN Application using Scala
I have recently been playing with Apache Amaterasu, an amazing project that helps you deploy data pipelines. It's still incubating and has a super-friendly team of engineers working on it, with some exciting features lined up. Don't take my word for it; please check it out yourself.
Amaterasu launches containers (on YARN/Mesos) by itself for each of the stages in your data pipeline. All you need to provide is your repository and a YAML-based configuration.
I was curious about how containers are launched on YARN and how the API works, so I thought I'd give it a try myself. It's quite intuitive once you understand a small set of constructs.
This post is an attempt at a minimal example of a YARN application in Scala.
Note: The complete code is available on GitHub.
There are three core classes in this app:
- SampleYarnClient
- ApplicationMaster
- DummyApplication (the business logic)
1. SampleYarnClient
This is the entry point of the program. It does the following:
- Instantiates a YarnClient.
- Negotiates resources for the ApplicationMaster container with the help of the YarnClient. It does this by obtaining an ApplicationSubmissionContext from the newly created application; the context is essentially a wrapper around the Resource, Priority and ContainerLaunchContext (among others).
Let's quickly look at the code, and then we'll go over these three components of the submission context in detail.
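// Create the client; it must be init()-ed with a YarnConfiguration and start()-ed before use
val yarnClient = YarnClient.createYarnClient()
...
// Ask the ResourceManager for a new application (and application ID)
val application = yarnClient.createApplication()
...
// Describe the AM container and hand that description to the ResourceManager
val context = application.getApplicationSubmissionContext
context.setAMContainerSpec(amContainer)
context.setApplicationName("Scala Yarn App")
context.setResource(resource)
context.setPriority(priority)
yarnClient.submitApplication(context)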
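The snippet stops at submitApplication. A natural next step, not part of the original code, is for the client to poll the application report until YARN reports a terminal state. Below is a minimal sketch assuming Hadoop's yarn-api on the classpath; ApplicationMonitor, waitForTerminalState and its parameters are hypothetical names. The state-fetching function is passed in so the loop can be exercised without a cluster.

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

object ApplicationMonitor {
  // States after which YARN will not move the application any further.
  private val terminal = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.FAILED,
    YarnApplicationState.KILLED
  )

  // Hypothetical helper: polls `fetchState` every `intervalMs` milliseconds,
  // stops after `maxPolls` attempts, and returns the last state observed.
  def waitForTerminalState(
      fetchState: () => YarnApplicationState,
      intervalMs: Long,
      maxPolls: Int): YarnApplicationState = {
    var state = fetchState()
    var polls = 1
    while (!terminal.contains(state) && polls < maxPolls) {
      Thread.sleep(intervalMs)
      state = fetchState()
      polls += 1
    }
    state
  }
}
```

With a real client, the fetch function could be `() => yarnClient.getApplicationReport(context.getApplicationId).getYarnApplicationState`. Because DummyApplication never exits, this loop would end only via maxPolls or after the application is killed (for example with `yarnClient.killApplication(appId)`).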
a. Resource
Resource is a simple wrapper around memory (in MB) and CPU (virtual cores).
val resource = Resource.newInstance(1024, 2) // 1024 MB (1 GB) of memory and 2 virtual cores
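A request larger than the cluster's maximum allocation is generally rejected at submission time, so it can help to cap it first; Hadoop's own DistributedShell client does something similar using the maximum capability reported for the new application. Below is a small sketch of that idea; ResourceSizing and clampToMaximum are hypothetical names, and getMemory is the int-valued accessor (newer releases also offer getMemorySize).

```scala
import org.apache.hadoop.yarn.api.records.Resource

object ResourceSizing {
  // Hypothetical helper: caps the requested memory (MB) and virtual cores at
  // the cluster maximum so the submission is not refused for being too large.
  def clampToMaximum(requested: Resource, maximum: Resource): Resource =
    Resource.newInstance(
      math.min(requested.getMemory, maximum.getMemory),
      math.min(requested.getVirtualCores, maximum.getVirtualCores)
    )
}
```

In the client, the maximum could come from `application.getNewApplicationResponse.getMaximumResourceCapability`, and the clamped value would then be passed to `context.setResource`.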
b. Priority
Priority is just an integer: the higher the number, the higher the priority.
val priority = Records.newRecord(classOf[Priority])
priority.setPriority(1)
c. ContainerLaunchContext
The ContainerLaunchContext has three primary parameters in this simple example:
- Commands (List[String]): the bootstrap command (ideally java <MainClass>)
- LocalResources (Map[String, LocalResource]): the jars and other artifacts (properties, libraries etc.) that are essential for running your command
- Environment (Map[String, String]): the environment variables the program needs
(The other important one is security tokens, which aren't used here because my local cluster isn't kerberized.)
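import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource}
import org.apache.hadoop.yarn.util.Records

// Bundles the command, the files to localize and the environment into one launch context
def createContainerContext(commands: List[String], resources: Map[String, LocalResource], environment: Map[String, String]): ContainerLaunchContext = {
  val launchContext = Records.newRecord(classOf[ContainerLaunchContext])
  launchContext.setCommands(commands.asJava)
  launchContext.setLocalResources(resources.asJava)
  launchContext.setEnvironment(environment.asJava)
  launchContext
}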
Commands
Like I said, the commands are just a sequence of instructions you would like to execute to run the ApplicationMaster from the shell.
// $JAVA_HOME is not interpolated by Scala; the container's launch shell expands it
val commands = List(
  "$JAVA_HOME/bin/java " +
    " -Xmx256m " +
    " com.arunma.ApplicationMaster " +
    " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
    " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
)
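The ApplicationMaster later builds an almost identical command for DummyApplication, so the string could be factored into a helper. Below is a small sketch; LaunchCommands and javaCommand are hypothetical names. The literal "<LOG_DIR>" is the value of ApplicationConstants.LOG_DIR_EXPANSION_VAR, which the NodeManager replaces with the container's log directory; it is inlined here only so the snippet runs without Hadoop on the classpath.

```scala
object LaunchCommands {
  // Same value as ApplicationConstants.LOG_DIR_EXPANSION_VAR; the NodeManager
  // substitutes the container's log directory for it at launch time.
  val LogDir = "<LOG_DIR>"

  // Hypothetical helper: builds the single shell command that starts a JVM
  // running `mainClass`, redirecting stdout/stderr into the container logs.
  // "$JAVA_HOME" sits in a plain (non-interpolated) string, so it is left
  // for the container's shell to expand.
  def javaCommand(mainClass: String, maxHeap: String = "256m"): String =
    Seq(
      "$JAVA_HOME/bin/java",
      s"-Xmx$maxHeap",
      mainClass,
      s"1> $LogDir/stdout",
      s"2> $LogDir/stderr"
    ).mkString(" ")
}
```

Both launch contexts could then use `List(LaunchCommands.javaCommand("com.arunma.ApplicationMaster"))` and `List(LaunchCommands.javaCommand("com.arunma.DummyApplication"))`.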
LocalResources
For this program, you don't need any properties or configuration files; the jar alone is enough.
Note:
The way to make any binary or resource available to YARN is to place it in an HDFS location. Setting up local resources simply tells YARN to download the binaries from that HDFS location and place them in the container's local working directory when it is launched.
val localResources = Map(
"SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(yarnPath)
)
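setUpLocalResourceFromPath isn't shown in the post. Below is a hypothetical reconstruction, assuming the Hadoop 2.x/3.x client libraries: it records the file's URL, size and modification time so the NodeManager can download it into the container. ConverterUtils.getYarnUrlFromPath is deprecated in newer releases (URL.fromPath is the replacement), and the default Configuration parameter is an assumption that keeps the one-argument call used above working.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

object LocalResources {
  // Hypothetical reconstruction of setUpLocalResourceFromPath: describes a file
  // that already sits on a Hadoop FileSystem (normally HDFS) so the NodeManager
  // can copy it into the container's working directory.
  def setUpLocalResourceFromPath(path: Path, conf: Configuration = new Configuration()): LocalResource = {
    val fs = path.getFileSystem(conf)
    // The NodeManager compares the recorded timestamp with the file's when it
    // localizes it, so both values are taken from the file's current status.
    val status = fs.getFileStatus(path)
    LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromPath(fs.makeQualified(path)),
      LocalResourceType.FILE,              // copy as-is; JAR would unpack it
      LocalResourceVisibility.APPLICATION, // shared only within this application
      status.getLen,
      status.getModificationTime
    )
  }
}
```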
Environment
These are custom environment variables, or simply the classpath, that need to be set for your bundle to run.
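The buildEnvironment helper used in the ApplicationMaster code is not shown either. Below is a hypothetical sketch, assuming the environment mainly needs a CLASSPATH. The "<CPS>" literal is the value of ApplicationConstants.CLASS_PATH_SEPARATOR (Hadoop 2.4+), which the NodeManager turns into the node's own separator; a real client would typically read the entries from yarn.application.classpath in its YarnConfiguration, whereas here they are passed in. Since the ApplicationMaster reads sys.env("ARUN_JAR_PATH"), the client presumably also places the jar's path in the AM's environment; that is an assumption, not something the post shows.

```scala
object Environments {
  // Same value as ApplicationConstants.CLASS_PATH_SEPARATOR; the NodeManager
  // replaces it with ":" or ";" depending on the node's operating system.
  val ClassPathSeparator = "<CPS>"

  // Hypothetical helper: keeps the caller's variables and sets CLASSPATH to any
  // existing CLASSPATH value followed by `classpathEntries`.
  def buildEnvironment(extra: Map[String, String],
                       classpathEntries: Seq[String]): Map[String, String] = {
    val entries = extra.get("CLASSPATH").toSeq ++ classpathEntries
    extra + ("CLASSPATH" -> entries.mkString(ClassPathSeparator))
  }
}
```

An entry such as "./*" picks up every jar localized into the container's working directory, including the assembly jar registered as a local resource.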
2. ApplicationMaster
Now that we have discussed the SampleYarnClient, let's move on to the second class. This class, as the name indicates, is your ApplicationMaster (duh!). It's responsible for launching the containers that are expected to run your "business logic". The steps are:
- The AppMaster uses the ResourceManager client (the AMRMClient) to raise a request for a container, a ContainerRequest. This example uses the async version of the client, AMRMClientAsync, which calls back into a handler implementing onContainersAllocated, onContainersCompleted, onError etc.
- When the RM allocates a container for the application, the onContainersAllocated callback gets invoked.
- Within onContainersAllocated (now that we have a handle to the container), the AppMaster uses the NMClientAsync to launch the "business" container (DummyApplication). This is achieved by constructing another ContainerLaunchContext (the one that wraps commands, local resources and environment variables).
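The callbacks only fire once the ApplicationMaster has registered with the ResourceManager and asked for a container, and it should unregister when it is done. The post doesn't show these steps; below is a minimal sketch assuming the AMRMClientAsync API of Hadoop 2.x/3.x. AmLifecycle, requestFor, the done predicate and the 1024 MB / 1 vcore / priority 0 request are hypothetical; handler stands for the class in the post that implements onContainersAllocated.

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration

object AmLifecycle {
  // Hypothetical helper: a request for one container anywhere in the cluster
  // (null node and rack lists mean "no locality preference").
  def requestFor(memoryMb: Int, vcores: Int, priority: Int): ContainerRequest =
    new ContainerRequest(Resource.newInstance(memoryMb, vcores), null, null, Priority.newInstance(priority))

  // Register, ask for one container, let the callbacks do the work, and
  // unregister once `done` returns true.
  def run(handler: AMRMClientAsync.CallbackHandler, done: () => Boolean): Unit = {
    // Heartbeat to the ResourceManager every 1000 ms.
    val rmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, handler)
    rmClient.init(new YarnConfiguration())
    rmClient.start()
    // Host, RPC port and tracking URL; this sketch has no RPC server or web UI.
    rmClient.registerApplicationMaster("", 0, "")
    rmClient.addContainerRequest(requestFor(1024, 1, 0))
    while (!done()) Thread.sleep(1000)
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
    rmClient.stop()
  }
}
```

Allocations arrive through the handler's onContainersAllocated on the client's own callback thread, not on the thread executing run, which is why the sketch simply waits on a predicate.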
override def onContainersAllocated(containers: util.List[Container]): Unit = {
  // Command that starts the business logic inside the allocated container
  val commands = List(
    "$JAVA_HOME/bin/java " +
      " -Xmx256m " +
      " com.arunma.DummyApplication " +
      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
  )
  // The same assembly jar, located through the ARUN_JAR_PATH environment variable
  val localResources = Map(
    "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(FileSystem.get(conf).makeQualified(new Path(sys.env("ARUN_JAR_PATH"))))
  )
  val containerLaunchContext = createContainerContext(commands, localResources, buildEnvironment(Map()))
  // Ask the NodeManager that owns each allocated container to launch it
  containers.asScala.foreach { container =>
    nmClient.startContainerAsync(container, containerLaunchContext)
  }
}
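The nmClient used above has to be created and started somewhere, and NMClientAsync needs a callback handler of its own. Below is a hypothetical sketch assuming the NMClientAsync.CallbackHandler interface from Hadoop 2.x (newer releases steer users toward NMClientAsync.AbstractCallbackHandler instead); LaunchTracker and NodeManagerClient are illustrative names, and the handler merely counts launch successes and failures.

```scala
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.yarn.api.records.{ContainerId, ContainerStatus}
import org.apache.hadoop.yarn.client.api.async.NMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Hypothetical handler: counts successful and failed container launches.
class LaunchTracker extends NMClientAsync.CallbackHandler {
  val started = new AtomicInteger(0)
  val failed = new AtomicInteger(0)

  override def onContainerStarted(id: ContainerId, resp: util.Map[String, ByteBuffer]): Unit = {
    started.incrementAndGet()
    println(s"Started $id")
  }
  override def onStartContainerError(id: ContainerId, t: Throwable): Unit = {
    failed.incrementAndGet()
    println(s"Failed to start $id: ${t.getMessage}")
  }
  // The remaining callbacks are not needed for this example.
  override def onContainerStatusReceived(id: ContainerId, status: ContainerStatus): Unit = ()
  override def onContainerStopped(id: ContainerId): Unit = ()
  override def onGetContainerStatusError(id: ContainerId, t: Throwable): Unit = ()
  override def onStopContainerError(id: ContainerId, t: Throwable): Unit = ()
}

object NodeManagerClient {
  // Creates, initialises and starts an asynchronous NodeManager client.
  def create(handler: NMClientAsync.CallbackHandler): NMClientAsync = {
    val nmClient = NMClientAsync.createNMClientAsync(handler)
    nmClient.init(new YarnConfiguration())
    nmClient.start()
    nmClient
  }
}
```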
3. DummyApplication
This is the "business logic". It's not idiomatic Scala in the purest sense, but it lets us see something in the logs. Note that because this is an infinite loop, we'll have to kill the application forcefully (for example with yarn application -kill <Application ID>).
object DummyApplication extends App {
  // Print a line every second, forever, so there is something to see in the container's stdout log
  while (true) {
    println("Niceeeeeeeeee !!! This is the core application that is running within the container that got negotiated by the Application Master !!!")
    Thread.sleep(1000)
  }
}
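If you would rather not kill the application by hand, a bounded variant lets the container's JVM exit on its own, after which the NodeManager reports the container as completed and the AM's onContainersCompleted callback fires. Below is a sketch; BoundedDummyApplication, its iteration-count argument and the default of 60 iterations are hypothetical.

```scala
object BoundedDummyApplication {
  // Writes `iterations` heartbeat lines, one every `intervalMs` milliseconds,
  // and returns how many lines were written.
  def run(iterations: Int, intervalMs: Long, log: String => Unit = s => println(s)): Int = {
    (1 to iterations).foreach { i =>
      log(s"Heartbeat $i of $iterations from the container negotiated by the ApplicationMaster")
      Thread.sleep(intervalMs)
    }
    iterations
  }

  // Entry point for the container command; runs for about a minute unless
  // a different iteration count is passed as the first argument.
  def main(args: Array[String]): Unit = {
    val iterations = args.headOption.map(_.toInt).getOrElse(60)
    run(iterations, 1000L)
  }
}
```

Swapping com.arunma.DummyApplication for this class in the AM's launch command would be enough to try it.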
Usage:
$ hadoop jar /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar com.arunma.SampleYarnClient /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar
Alternatively, you could just run sbt assembly and then run SampleYarnClient from your IDE, passing the absolute path of the assembly jar as the first argument.