You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Current »

IDIEP-120
AuthorMikhail Pochatkin  
Sponsor Mikhail Pochatkin 
Created
StatusDRAFT


Motivation

The AI2/GG8 Compute API has a MapReduce mechanism for generating jobs and launching them on arbitrary nodes of the cluster. It is necessary to support this functionality in AI3/GG9.

Requirements

  1. Using MapReduce, the user can start any number of jobs on arbitrary clusters.
  2. Colocation mapping should be supported.
  3. All existing job mechanisms (failover, cancellation, priorities) must be applicable to MapReduce tasks.

Description

Design


It is proposed to make the mechanism for using MapReduce tasks identical to Compute jobs. The user must write an implementation of the ComputeTask interface and deploy it using the Code Deployment. After this, the user can invoke a MapReduce task on any node in the cluster with custom arguments, having previously specified the deployment unit containing the task class.

Job generation

MapReduce task implies two actions: generating jobs into a map function and merging the results of these jobs into a reduce function. Let's describe the map function in detail.

The signature of map function will be follow:

List<ComputeJobRunner> split(

        TaskExecutionContext context,

        @Nullable Object[] args

);

public interface TaskExecutionContext {

    Ignite ignite();

}


As we can see, map function should provide Compute job class name to list of options where options is follow class 


public class ComputeJobRunner {

  //Nodes to execute, one random node for execution, other for failover.

    private Set<ClusterNode> nodes;

  //Job execution options (priority, maxRetry).

    private JobExecutionOptions options = JobExecutionOptions.DEFAULT;

  //Deployment units for job.

  private List<DeploymentUnit> units = Collections.emptyList();

  //Class name of executing job.

  private String jobClassName;

    //Job args.

    @Nullable

    private Object[] args;

}


Reduce

The reduce function will have follow signature 

CompletableFuture<R> reduce(Map<UUID, CompletableFuture<?>> results);

Where the result map will contain generated job ID to job result. The reduce function will be called on the task coordinator node.

Task coordinator

By analogy with a job coordinator, we introduce the concept of a task coordinator. The task coordinator is the node on which the map-reduce was called. This node can be the same as a node included in the map function pipeline or can be completely separate. The main task of the coordinator node is to collect the results of jobs obtained as a result of the map and collect the final result using the reduce function. After this, the result is returned to the initiator's client. As an obvious invariant here we have that the task coordinator is the job coordinator for every generated job.

Colocation

One of the basic scenarios for using MapReduce is to perform some kind of operation on a table (or collocated tables), which is very similar to SQL. Let's describe the main scenarios.

The user has some kind of job that reads data from the table, performs some kind of operation on it and returns it. The user may want to improve this process by splitting job execution to different nodes in the cluster so that each job reads only the local data of the node where it is running, performs operation on it, returns and as result reduces it. 

For all this, it is proposed to expand the public API of tables and add a new entity that allows you to obtain information about the partitions of this table.


public class Table {

  ...

  /**

   * Gets a partition manager of a table.

   *

   * @return Partition manager.

   */

  PartitionManager<HashPartition> partitions();

  ...

}



public class HashPartition implements Serializable {

    private final int partitionId;

    public HashPartition(int partitionId) {

        this.partitionId = partitionId;

    }

    public int partitionId() {

        return partitionId;

    }

}


public interface PartitionManager<T> {

    ClusterNode partitionLocation(T partition);

    Map<T, ClusterNode> allPartitions();

    <K> T partitionFromKey(K key, Mapper<K> mapper);

    T partitionFromKey(Tuple key);

}


Using this API, the user can, for example, execute a job on each partition of a table and reduce to task result


public interface PartitionManager<T> {

    ClusterNode partitionLocation(T partition);

    Map<T, ClusterNode> allPartitions();

    <K> T partitionFromKey(K key, Mapper<K> mapper);

    T partitionFromKey(Tuple key);

}
public class Task implements ComputeTask<Integer> {

        @Override

        public List<JobExecutionParameters> map(JobExecutionContext taskContext, @Nullable Object[] args) {

            return taskContext.ignite()

                    .tables()

                    .table("tableName")

                    .partitions()

                    .allPartitions()

                    .entrySet()

                    .stream()

                    .map(e -> JobExecutionParameters.builder()

                            .node(e.getValue())

                            .jobClassName("className")

                            .args(e.getKey())

                            .build()

                    ).collect(Collectors.toList());

        }

        @Override

        public Integer reduce(Map<UUID, ?> results) {

            return results.values().stream()

                    .map(Integer.class::cast)

                    .reduce(Integer::sum)

                    .orElse(0);

        }

    }


It is important to note that we do not guarantee that when a job is executed on a node, that the partition will be strictly colocated on this node, it can easily move to another node, but in this case we will not take any action. Those. In this scenario, we are counting on the best effort.


class IgniteCompute {

 ...

    <R> TaskExecution<R> mapReduceAsync(

            List<DeploymentUnit> units,

            String taskClassName,

            Object... args

    );

...

}


public interface TaskExecution<R> {

    CompletableFuture<R> resultAsync();

    CompletableFuture<Map<ClusterNode, JobStatus>> statusesAsync();

    default CompletableFuture<Map<ClusterNode, UUID>> idsAsync() {

        return statusesAsync().thenApply(statuses ->

                statuses != null

                        ? statuses.entrySet().stream().collect(toMap(Entry::getKey, entry -> entry.getValue().id()))

                        : null

        );

    }

    CompletableFuture<Void> cancelAsync();

}


Queues, priority, failover, cancellation

All mechanisms from  will be applicable for generated jobs.

Risks and Assumptions

No any risks

Reference Links

// Links to various reference documents, if applicable.

Tickets

[IGNITE-22044] Ignite 3 MapReduce - ASF JIRA (apache.org)

  • No labels