

Introduction

Desiderata

The goal of Spark and REEF integration is to run REEF jobs seamlessly as part of a Spark application. That is, the user launches a Spark application (e.g. on a YARN cluster) using the standard spark-submit script; that Spark application, in turn, starts a REEF job that shares some or all resources with the parent Spark job; eventually, the REEF application returns control to Spark. One example of such integration would be a REEF implementation of a machine learning algorithm launched from a Spark feature extraction pipeline.

We expect that the integration will require no changes to the Spark core itself, and only minimal (say, 3 to 10 LOC) fixes to the code of the user's Spark application.

Target resource managers

We have the following priorities for the target runtimes:

YARN: For the first release, our primary target is the YARN resource manager.

Local: Running Spark+REEF in local mode is also highly desirable for debugging purposes. We expect the implementation of the local mode to appear as a side effect of YARN integration.

Mesos: Integration with Mesos is currently at a lower priority compared to the YARN and local runtimes. We may reconsider it later depending on business needs and available resources/volunteers. Note that, depending on the integration approach, a Spark+REEF application may work on Mesos without any Mesos-specific changes to the code.

In the subsequent sections, unless stated otherwise, we will assume a YARN cluster as the primary runtime environment.

Non-goals

Currently, we only discuss Spark+REEF integration, but not the other way around. That is, we do not plan to run Spark applications from REEF.

Types of Spark+REEF integration

Unmanaged Application Master in REEF

When a YARN application registers in Unmanaged AM mode, the resource manager does not provision a new container for the Application Master and expects the user to manage that process. This makes it possible to start the AM on the client machine or to launch several AMs in one YARN container.

Starting the REEF Driver in Unmanaged AM mode from Spark allows the REEF and Spark Drivers to run in the same JVM process, share data, and react to each other's events. The Resource Manager still treats REEF and Spark as two different applications and manages their resources separately.

Unmanaged AM mode allows tight integration of the REEF and Spark Drivers. It requires very few changes to the existing Spark code (just a few lines to launch the REEF job) and no changes at all to the REEF application.

It is also possible to launch several Unmanaged AM REEF jobs (even simultaneously) from the same Spark Driver.

Still, Unmanaged AM mode enables Driver-side integration only; it does not help integrate the worker nodes (REEF Evaluators and Spark Executors).

Unmanaged AM status in REEF

Unmanaged AM mode has been fully implemented in the REEF YARN Java runtime since REEF release 0.17. It requires YARN resource manager version 2.7.3 or greater.

There is a Scala example of a REEF application running from Spark in Unmanaged AM mode in the org.apache.reef.examples.hellospark package in REEF.

Data-driven integration

One possible workaround to achieve a certain level of Spark and REEF integration is to use the high-level Spark API. In this approach, REEF never communicates with the resource manager directly; instead, the REEF runtime uses Spark functions like .map() to launch REEF Evaluators (and Tasks) in Spark-managed Executor containers.

An obvious drawback of such an approach is that we always need some (partitioned) Spark data to allocate REEF Evaluators; that means using fake partitions to allocate Evaluators that do not require data. We also need to know exactly how Spark partitions data across its containers and make sure that our strategy will not break with new releases of Spark.
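To make the pattern concrete, here is a simplified local sketch of data-driven Evaluator allocation. It is plain Java with no actual Spark or REEF dependencies: the loop over partition indices stands in for an rdd.mapPartitions(...) closure, and launchEvaluator is a hypothetical stand-in for the REEF Evaluator bootstrap; the extra "fake" partition shows how a data-free (compute-only) Evaluator would be allocated.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local simulation of data-driven Evaluator allocation.
// In a real integration, the per-partition work below would run inside
// rdd.mapPartitions(...) on Spark Executors; here plain Java loops
// stand in for RDD partitions to illustrate the pattern only.
public class DataDrivenAllocationSketch {

    /** Hypothetical stand-in for the REEF Evaluator bootstrap that would run per partition. */
    static String launchEvaluator(final int partitionId, final boolean hasData) {
        return "evaluator-" + partitionId + (hasData ? " (data)" : " (compute-only)");
    }

    public static void main(final String[] args) {
        // Data partitions, e.g. splits of an input file managed by Spark.
        final int numDataPartitions = 4;

        // One extra "fake" partition with no data: used only to host
        // a compute-only Evaluator (e.g. a master node).
        final int numFakePartitions = 1;

        final List<String> evaluators = new ArrayList<>();
        for (int p = 0; p < numDataPartitions + numFakePartitions; p++) {
            final boolean hasData = p < numDataPartitions;
            evaluators.add(launchEvaluator(p, hasData));
        }

        // One Evaluator per partition, including the fake one.
        System.out.println(evaluators.size()); // 5
        System.out.println(evaluators.get(4)); // evaluator-4 (compute-only)
    }
}
```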

Still, the simplicity of the data-driven approach and its independence from any particular RM make it a good candidate for at least the first phase of Spark+REEF integration.

Also, note that we can still combine data-driven Evaluator allocation with the Unmanaged AM REEF Driver, using the multi-runtime feature of REEF (see reef-runtime-multi REEF module).

High-level REEF API

There are two possible ways to expose data-driven Evaluator allocation in the REEF API: via the REEF DataLoader service or via a specialized EvaluatorRequestor. We discuss both approaches below.

DataLoader service

The DataLoader service is part of the standard REEF API in both Java and C# and is used in many REEF applications. It can be viewed as a partial implementation of a REEF Driver that manages REEF Evaluators according to the configuration defined with DataLoadingRequestBuilder. A typical configuration looks like this:

final Configuration config = new DataLoadingRequestBuilder()
    .setInputFormatClass(TextInputFormat.class)
    .setInputPath("hdfs://data/criteo")
    .setNumberOfDesiredSplits(50)
    .addComputeRequest(
        EvaluatorRequest.newBuilder()
          .setNumber(1)
          .setMemory(4096)
          .build())
    .renewFailedEvaluators(false)
    .setDriverConfigurationModule(
        DriverConfiguration.CONF
          .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getAllClasspathJars())
          .set(DriverConfiguration.DRIVER_MEMORY, "4096")
          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, ...)
          .set(DriverConfiguration.ON_TASK_RUNNING, ...)
          .set(DriverConfiguration.ON_TASK_COMPLETED, ...)
          .set(DriverConfiguration.DRIVER_IDENTIFIER, "Criteo-train"))
    .build();

Here we define a data-loading Driver that splits the data into partitions and allocates 50 Evaluators, one per partition. The .addComputeRequest() call specifies a request for one additional Evaluator not associated with the data (e.g. to use as a master node).

One example of DataLoader use can be found in the REEF sample implementation of batch gradient descent, in the org.apache.reef.examples.group.bgd.BGDClient class.

REEF applications that use data loader can be integrated with Spark by extending the request builder (e.g. by making .setInputPath() accept RDDs). All other Spark-specific details can be completely hidden from the REEF user and implemented in the reef-runtime-spark module.

Extended EvaluatorRequestor

Another way to expose a Spark-specific API in REEF is through a class derived from the evaluator requestor (say, SparkEvaluatorRequestor) that would be injected into the Driver. The user would define the REEF Driver as usual but use SparkEvaluatorRequestor to allocate the Evaluators, e.g.

SparkEvaluatorRequest.newBuilder()
    .usePartitions(rdd)
    .setNumber(1)
    .setMemory(4096)
    .build();

Here .usePartitions(rdd) associates REEF Evaluators with the corresponding partitions of the Spark RDD. The .setNumber(1) call requests one additional Evaluator that is not associated with any Spark data.

As with the DataLoader, the rest of the implementation would be encapsulated in the reef-runtime-spark module.

Using low-level Spark API

A harder to implement but more flexible approach is to use the low-level Spark API as a layer between the REEF application and the Resource Manager, without communicating with the RM directly. The Spark API has methods to allocate and de-allocate containers; the REEF runtime can use them to launch Evaluators. The REEF Driver also has to intercept messages from the Resource Manager to the Spark Driver and translate them into the corresponding Driver events. The low-level Spark API allows such event listeners, but the implementation might need to distinguish events originated by REEF from those originated by other parts of the Spark application (e.g. using RM tags).
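The event-translation idea can be sketched as follows. All the types here (ExecutorAddedEvent, the "reef" tag) are illustrative stand-ins rather than actual Spark or REEF classes; the point is only to show a listener that filters container events by tag and forwards REEF-originated ones to a Driver-side handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of translating Spark-side container events into
// REEF Driver events. ExecutorAddedEvent and the "reef" tag are
// illustrative stand-ins, not real Spark or REEF classes.
public class SparkToReefEventAdapter {

    /** Minimal stand-in for a Spark executor-allocation event. */
    static final class ExecutorAddedEvent {
        final String executorId;
        final String tag; // e.g. an RM tag marking who requested the container

        ExecutorAddedEvent(final String executorId, final String tag) {
            this.executorId = executorId;
            this.tag = tag;
        }
    }

    private final Consumer<String> onAllocatedEvaluator;

    SparkToReefEventAdapter(final Consumer<String> onAllocatedEvaluator) {
        this.onAllocatedEvaluator = onAllocatedEvaluator;
    }

    /** Forward only REEF-tagged allocations to the REEF Driver handler. */
    void onExecutorAdded(final ExecutorAddedEvent e) {
        if ("reef".equals(e.tag)) {
            onAllocatedEvaluator.accept(e.executorId);
        }
    }

    public static void main(final String[] args) {
        final List<String> evaluators = new ArrayList<>();
        final SparkToReefEventAdapter adapter = new SparkToReefEventAdapter(evaluators::add);

        adapter.onExecutorAdded(new ExecutorAddedEvent("exec-1", "reef"));
        adapter.onExecutorAdded(new ExecutorAddedEvent("exec-2", "spark")); // filtered out

        System.out.println(evaluators); // [exec-1]
    }
}
```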

Using low-level Spark API looks very promising in the long run and is definitely more flexible than the data-driven approach described earlier. Still, we need to do more research to make sure that the existing Spark methods and listeners are sufficient to implement the entire REEF runtime.

Support for REEF.NET

Integrating REEF.NET applications with Spark is very important. However, some peculiarities of the REEF code make such integration harder than it should be. In this section we discuss some of the REEF issues related to Spark integration.

REEF.NET Bridge

Regardless of the runtime, the REEF Driver is always a Java application. It uses JNI to invoke the driver-side event handlers implemented in .NET. The layer of C++ code that connects the Java and C# parts of the REEF Driver is called the REEF.NET Bridge. It is fast and reliable, and works fine when the REEF user controls the application submission (e.g. via the REEF Client).

However, the REEF.NET Bridge makes it impossible to use standard Spark submission scripts (like spark-submit) to launch a Spark+REEF application, because the REEF.NET Driver entry point is not a JAR but a C++ application compiled as a Windows .EXE file. As a result, existing REEF.NET code cannot be launched from Spark without changing the Spark submission code.

A more Spark-friendly approach would be to start the REEF Driver purely in Java and launch the .NET VM as a separate process that communicates with the JVM part via TCP. Replacing the REEF.NET C++ Bridge with a TCP protocol should be completely transparent to both Spark and the existing REEF.NET applications.
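As an illustration, a minimal framing scheme for such a TCP bridge might look like the sketch below. The protocol shown (a 4-byte length prefix plus a UTF-8 payload, with a local echo thread standing in for the .NET side) is an assumption made for this example, not the actual REEF bridge protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustrative length-prefixed message framing for a JVM <-> .NET TCP
// bridge. The wire format here (4-byte big-endian length + UTF-8 payload)
// is an assumption for illustration only; the real REEF bridge may differ.
public class TcpBridgeSketch {

    static void writeMessage(final DataOutputStream out, final String msg) {
        try {
            final byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length); // 4-byte length prefix
            out.write(payload);
            out.flush();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readMessage(final DataInputStream in) {
        try {
            final byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new String(payload, StandardCharsets.UTF_8);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) throws Exception {
        // Loopback "bridge": the server thread stands in for the .NET side.
        try (ServerSocket server = new ServerSocket(0)) {
            final Thread dotNetSide = new Thread(() -> {
                try (Socket s = server.accept()) {
                    final DataInputStream in = new DataInputStream(s.getInputStream());
                    final DataOutputStream out = new DataOutputStream(s.getOutputStream());
                    // Echo each driver event back as an acknowledgment.
                    writeMessage(out, "ACK:" + readMessage(in));
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            dotNetSide.start();

            try (Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
                final DataOutputStream out = new DataOutputStream(client.getOutputStream());
                final DataInputStream in = new DataInputStream(client.getInputStream());
                writeMessage(out, "ActiveContext");
                System.out.println(readMessage(in)); // ACK:ActiveContext
            }
            dotNetSide.join();
        }
    }
}
```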

Work on the TCP Bridge is already in progress as part of REEF .NET Core / Linux migration effort. We expect the first version of the new bridge to appear in REEF 0.17 release.