Status

Current state: Accepted

Discussion thread: [Discuss] SEP-23: Simplify Job Runner

JIRA: SAMZA-2405

Released: 

Problem

Samza on Yarn follows a multi-stage deployment model: the Job Runner, which runs on the submission host, reads configuration, performs planning and persists the config in the coordinator stream before submitting the job to the Yarn cluster. In Yarn, the Application Master (AM) reads the config from the coordinator stream before spinning up containers to execute. This split of responsibility between the job runner and the AM is operationally confusing and makes debugging the pipeline difficult, with multiple points of failure. In addition, since planning invokes user code, the runner usually requires isolation from a security perspective to guard the framework against malicious user code; otherwise a malicious user could gain access to other users' jobs running on the same runner.

Proposed Changes

We will simplify the job runner to only submit the Samza job to Yarn with the provided submission configs, without invoking other complex logic such as config retrieval, planning, DAG generation, persisting to the coordinator stream, etc.

These configs include:

  • configs directly related to job submission, such as yarn.package.path, job.name, etc.
  • configs needed by the config loader on the AM to fetch the job config, such as the path to the properties file in the tarball; all such configs carry the job.config.loader.properties prefix.
  • configs that users would like to override.

In addition, these configs can only be supplied via --config; the job runner will no longer read configs from a local file. We will use the term "submission config" to refer to configs provided using --config during submission.

This approach is favored because we no longer need to maintain the old launch workflow and we eliminate the need to read configs multiple times. It is also consistent with other stream processing frameworks, such as Flink, Spark and Dataflow.

In order to simplify the job runner, we need to move the config retrieval logic from the runner to the AM, as it is a prerequisite for planning and DAG generation. We will achieve this by providing a new config loader interface, which the AM will use to fetch config directly. The AM will invoke the config loader to fetch the config, perform planning, generate the DAG and persist the final config back to the coordinator stream for backward compatibility. This means the AM may need extra CPU and/or memory compared to the existing flow.

We will require users to update how they start their Samza jobs. In case of problems, users should be able to roll back to Samza 1.3; see Compatibility, Deprecation, and Migration Plan for more details.

Public Interfaces

The following submission config will be introduced to configure the loader class the AM uses to fetch config; it points to a ConfigLoader implementation:

  • job.config.loader.class

and the following config prefix:

  • job.config.loader.properties.

ConfigLoader

Interface that the AM relies on to read configuration. It takes in a properties map, which defines the variables the loader needs in order to fetch the proper config.

This interface will replace the existing ConfigFactory interface, as we no longer need complex configs on the runner. Providing the minimal Yarn-related configs via --config when invoking run-app.sh will be sufficient.

public interface ConfigLoader {
  /**
   * Build a specific Config given job submission config.
   * @param config Config specified during job submission containing information necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  Config getConfig(Config config);
}
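
A minimal illustration of the interface (not part of this proposal): a hypothetical loader that treats everything supplied under the job.config.loader.properties. prefix as the job config itself, so a trivial job could be configured entirely from --config values. The class name is made up, ConfigLoader is the interface defined just above (its package is unspecified here, so it is not imported), and Config#subset is the existing Samza API for prefix scoping.

import org.apache.samza.config.Config;

// Hypothetical example only; not part of the proposal.
public class InlineConfigLoader implements ConfigLoader {
  private static final String PREFIX = "job.config.loader.properties.";

  @Override
  public Config getConfig(Config config) {
    // Config#subset(prefix, true) returns the entries under the prefix with the
    // prefix stripped, e.g. "job.config.loader.properties.task.class" -> "task.class".
    return config.subset(PREFIX, true);
  }
}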


All the configs provided in the startup script will be passed to the AM through an environment variable, and the designated config loader will use them to load the complete config. Configs provided by the startup script will override those read by the loader.
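
A minimal sketch of that precedence, assuming the merge is done with MapConfig, whose list constructor applies later maps over earlier ones; the helper name mergeConfigs is illustrative, and ConfigLoader is the interface defined above (package unspecified, so not imported):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

final class ConfigMerge {
  // Layer the loaded config first and the submission config last, so any key passed
  // via --config wins over the same key read by the loader.
  static Config mergeConfigs(ConfigLoader loader, Config submissionConfig) {
    List<Map<String, String>> layers = new ArrayList<>();
    layers.add(loader.getConfig(submissionConfig));
    layers.add(submissionConfig);
    return new MapConfig(layers);
  }
}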

The full list of configs can be found in References#Complete list of job submission configs.

Take wikipedia-feed in Hello Samza as an example:

deploy/samza/bin/run-app.sh \
  --config app.class=samza.examples.wikipedia.task.application.WikipediaFeedTaskApplication \
  --config job.name=wikipedia-stats \
  --config job.factory.class=org.apache.samza.job.yarn.YarnJobFactory \
  --config yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz \
  --config job.config.loader.class=org.apache.samza.config.loaders.PropertiesConfigLoader \
  --config job.config.loader.properties.path=/__package/config/wikipedia-feed.properties

Work with Beam

See Work with Beam for details on how Samza Beam jobs work with the simplified Job Runner.

Rejected Alternatives

The above approach requires existing users to update the way they start a Samza job. Alternatively, we could keep the ability for the runner to read from a local config file, and have the AM load the config again using the loader.

Option 1 - Coexist ConfigFactory and ConfigLoader

ConfigFactory would be used to read configs during startup, providing the startup configs as it does today.

ConfigLoader would be used on the AM to fetch the complete config for the job to run.

This is rejected because having both interfaces coexist creates confusion about their usage; in addition, reading configs multiple times introduces extra complexity in the workflow.

Option 2 - Launch aware ConfigLoader

ConfigLoader takes in a signal telling it whether it is being invoked on the runner or on the AM, and fetches configs accordingly based on the input properties. For example, when the input config path is /config/wikipedia-feed.properties, ConfigLoader would read from "/config/wikipedia-feed.properties" on the runner and from "/__package/config/wikipedia-feed.properties" on the AM, as all Samza job tarballs are unpacked under the "__package" folder.

This approach is rejected because the assumption is too restrictive and offers little flexibility. In addition, the ConfigLoader implementation would depend on how the Samza job is deployed, from which it should be independent and completely decoupled.

Option 3 - Launch aware ConfigLoader with additive properties

ConfigLoader takes in a signal telling it whether it is being invoked on the runner or on the AM, and fetches the corresponding configs from the input properties accordingly. Take wikipedia-feed in Hello Samza as an example:

deploy/samza/bin/run-app.sh \
  --config job.config.loader.class=org.apache.samza.config.loaders.PropertiesConfigLoader \
  --config job.config.loader.properties.local.path=/config/wikipedia-feed.properties \
  --config job.config.loader.properties.remote.path=/__package/config/wikipedia-feed.properties

ConfigLoader would use "job.config.loader.properties.local.path" when running on the runner and "job.config.loader.properties.remote.path" on the AM.

This approach is rejected because it places an excessive burden on users to configure multiple properties. In addition, the ConfigLoader implementation would depend on how the Samza job is deployed, from which it should be independent and completely decoupled.

Implementation and Test Plan

JobConfig

We will add one new config in JobConfig, as well as a config prefix that wraps the properties the loader needs to load the config:

// Fully qualified class name of the ConfigLoader implementation used to load config.
public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";
// Prefix for the properties needed by the config loader to fetch the full config.
public static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";
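
For illustration, the sketch below shows how these constants would typically be consumed from a submission config; the helper class and method names are assumptions, not part of the proposal, and Config#subset is the existing API for extracting a prefixed key space.

import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;

// Illustrative helpers only; not part of the proposal.
final class ConfigLoaderSettings {
  static String loaderClass(Config submissionConfig) {
    return submissionConfig.get(JobConfig.CONFIG_LOADER_CLASS);
  }

  static Config loaderProperties(Config submissionConfig) {
    // Config#subset(prefix, true) strips the prefix, e.g.
    // "job.config.loader.properties.path" becomes "path".
    return submissionConfig.subset(JobConfig.CONFIG_LOADER_PROPERTIES_PREFIX, true);
  }
}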

PropertiesConfigLoader

Default implementation of ConfigLoader. It reads the "path" property from the input submission config, which points to a properties file.

import java.io.FileInputStream
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.samza.config.{Config, MapConfig}
import org.apache.samza.util.Logging

class PropertiesConfigLoader extends ConfigLoader with Logging {
  /**
   * Builds the complete Config given the job submission config.
   * @param config Config specified during job submission, containing the information necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  override def getConfig(config: Config): Config = {
    val path = config.get("job.config.loader.properties.path")

    val props = new Properties()
    val in = new FileInputStream(path)

    props.load(in)
    in.close()

    debug("got config %s from path %s" format (props, path))

    new MapConfig(props.asScala.asJava, config)
  }
}
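
A minimal usage sketch, assuming PropertiesConfigLoader lives under org.apache.samza.config.loaders as in the wikipedia-feed example above, and that the referenced properties file defines job.name:

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.loaders.PropertiesConfigLoader;

public class PropertiesConfigLoaderExample {
  public static void main(String[] args) {
    // Submission config as it would arrive on the AM, carrying only the loader properties.
    Map<String, String> submission = new HashMap<>();
    submission.put("job.config.loader.properties.path",
        "/__package/config/wikipedia-feed.properties");

    Config fullConfig = new PropertiesConfigLoader().getConfig(new MapConfig(submission));
    System.out.println(fullConfig.get("job.name")); // e.g. wikipedia-stats
  }
}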


RemoteApplicationRunner

RemoteApplicationRunner#run will simply submit the job to Yarn given the submission configs.

@Override
public void run(ExternalContext externalContext) {
  JobRunner runner = new JobRunner(config);
  runner.getJobFactory().getJob(config).submit();
}

YarnJob

YarnJob#buildEnvironment will wrap the provided submission config in an environment variable passed to Yarn.


private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
    jobConfig: JobConfig): Map[String, String] = {
    val envMapBuilder = Map.newBuilder[String, String]

    envMapBuilder += ShellCommandConfig.ENV_CONFIG ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config))


ClusterBasedJobCoordinator

ClusterBasedJobCoordinator#main will construct the application config through the config loader supplied via environment variables.
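
A hedged sketch of that bootstrap, reusing the mergeConfigs helper sketched earlier. It assumes the submission config arrives in ShellCommandConfig.ENV_CONFIG exactly as serialized by YarnJob#buildEnvironment above, that SamzaObjectMapper can deserialize it back into a Config, and that the ConfigLoader implementation is on the AM classpath; the class and method names here are illustrative, not the final API.

import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;

// Illustrative only; not the final implementation.
final class CoordinatorConfigBootstrap {
  static Config loadFullConfig() throws Exception {
    // Any shell escaping applied by buildEnvironment would have to be undone here; omitted.
    String serialized = System.getenv(ShellCommandConfig.ENV_CONFIG);
    Config submissionConfig =
        SamzaObjectMapper.getObjectMapper().readValue(serialized, Config.class);

    // Instantiate the configured loader reflectively and let it fetch the complete config.
    String loaderClassName = submissionConfig.get(JobConfig.CONFIG_LOADER_CLASS);
    ConfigLoader loader =
        (ConfigLoader) Class.forName(loaderClassName).getDeclaredConstructor().newInstance();

    // Merge so the --config values take precedence (see the earlier mergeConfigs sketch).
    return ConfigMerge.mergeConfigs(loader, submissionConfig);
  }
}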

Compatibility, Deprecation, and Migration Plan

Backward Incompatible

The changes will be announced in Samza 1.3 and take effect in Samza 1.4.

Users need to change their job submission scripts and provide the related configs explicitly through --config, instead of using --config-factory and --config-path to load a local file.
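
For illustration, here is how the wikipedia-feed submission changes. The old command follows the Hello Samza convention of pointing --config-factory at PropertiesConfigFactory and is only a sketch of the pre-1.4 flow:

# Samza 1.3 and earlier: the runner reads a local properties file on the submission host.
deploy/samza/bin/run-app.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties

# Samza 1.4 and later: all submission configs are passed explicitly via --config, and the
# AM loads the remaining config through the configured ConfigLoader (see the
# wikipedia-feed example in the Public Interfaces section).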

Config rewriters won't be invoked on the job runner anymore; if a config rewriter currently rewrites job submission configs, it will no longer take effect, and users are expected to pass those configs explicitly.

Rollback Plan

In case of a problem in Samza 1.4, users are able to roll back to Samza 1.3 and keep the old startup flow using --config-path and --config-factory.

References

  • Complete list of job submission configs

[Required] job.factory.class 
[Required] job.name
[Required] yarn.package.path
[Optional] app.runner.class 
[Optional] yarn.resourcemanager.address
[Optional] job.id
[Optional] yarn.application.type
[Optional] fs.*.impl
[Optional] samza.cluster.based.job.coordinator.dependency.isolation.enabled
[Optional] yarn.am.opts
[Optional] yarn.am.java.home
[Optional] yarn.am.container.memory.mb
[Optional] yarn.am.container.cpu.cores
[Optional] yarn.queue
[Optional] yarn.am.container.label
[Optional] yarn.resources.*


