Retainability is one of the key features that REEF provides: when tasks end, REEF applications can reuse the evaluators. In this tutorial, we introduce Task scheduler example to see how REEF applications use that feature.

The tutorial has two parts. First, we explain what Task scheduler does and how users can run it. Second, we will see how evaluators are reused from the code.

Overview

Task scheduler is a REEF application similar to the YARN's Distributed shell example. It executes shell commands on a configurable number of machines.
By submitting subsequent commands to retained Evaluators, the scheduler avoids the latency of spawning new containers. The application provides a RESTful API for Evaluator management and Task submission.

How to launch Task Scheduler

Prerequisites

You have compiled REEF locally, and have tried out HelloREEFHttp.

Task Scheduler runs on the local and on the YARN runtime. The commands to launch are similar to the ones in HelloREEF.

Local runtime

> java -cp lang/java/reef-examples/target/reef-examples-{$REEF_VERSION}-shaded.jar org.apache.reef.examples.scheduler.SchedulerREEF

YARN runtime

> yarn jar lang/java/reef-examples/target/reef-examples-{$REEF_VERSION}-shaded.jar org.apache.reef.examples.scheduler.SchedulerREEFYarn

How it works

RESTful API

Users can send the HTTP request to the server via URL :

http://{address}:{port}/reef-example-scheduler/v1

The possible requests are as follows:

  • /list: lists all the Tasks’ statuses.
  • /clear: clears all the Tasks that are waiting in the queue, and returns the number of removed Tasks.
  • /submit?cmd=COMMAND: submits a Task to execute COMMAND, and returns the Task id.
  • /status?id=ID: returns the status of the Task whose id is ID.
  • /cancel?id=ID: cancels the Task whose id is ID.
  • /max-eval?num={num}: limits the maximum number of Evaluators.

The submitted tasks are executed sequentially. Each task runs on an evaluator, and the maximum number of evaluators is specified by the max-eval request. You can find the result of a task in the evaluator's log.

 

Reusing the Evaluators

 

When a Task completes, CompletedTaskHandler.onNext() is called, which can be found at SchedulerDriver.java. This is because the handler is bound to ON_TASK_COMPLETED in the driver configuration. (see SchedulerREEF.java)

 

final class CompletedTaskHandler implements EventHandler<CompletedTask> {
  @Override
  public void onNext(final CompletedTask task) {
    final int taskId = Integer.valueOf(task.getId());
    synchronized (SchedulerDriver.this) {
      ...
      final ActiveContext context = task.getActiveContext();
      if (retainable) {
        retainEvaluator(context);
      } else {
        reallocateEvaluator(context);
      }
      ...
    }
  } 


In the CompletedTaskHandler, the active Context can be accessed from the `CompletedTask` object, which makes it possible to reuse the Evaluator.

private synchronized void retainEvaluator(final ActiveContext context) {
  if (scheduler.hasPendingTasks()) {
    scheduler.submitTask(context);
  } else if (nActiveEval > 1) {
    nActiveEval--;
    context.close();
  } else {
    state = State.READY;
    waitForCommands(context);
  }
}

In the `ratainEvaluator()` method, `scheduler.submitTask(context)` makes the Evaluator run another Task if there is a pending Task. Otherwise, the Evaluator is relaesed by closing the active Context, or waits until a new command arrives.

To turn off this feature, add command line argument `-retain false` when you launch the application: Evaluators are released when Tasks end, and a new Evaluator is allocated whenever a Task starts.

 

 

  • No labels