| ID      | IEP-120           |
| Author  | Mikhail Pochatkin |
| Sponsor | Mikhail Pochatkin |
| Created |                   |
| Status  | DRAFT             |
The AI2/GG8 Compute API has a MapReduce mechanism for generating jobs and launching them on arbitrary nodes of the cluster. This functionality needs to be supported in AI3/GG9.
It is proposed to make the mechanism for running MapReduce tasks identical to Compute jobs. The user must write an implementation of the ComputeTask interface and deploy it using Code Deployment. After that, the user can invoke the MapReduce task on any node in the cluster with custom arguments, having previously specified the deployment unit that contains the task class.
A MapReduce task consists of two actions: generating jobs in the map function and merging the results of these jobs in the reduce function. Let's describe the map function in detail.
The signature of the map function will be as follows:
```java
List<ComputeJobRunner> split(TaskExecutionContext context, @Nullable Object[] args);

public interface TaskExecutionContext {
    Ignite ignite();
}
```
As we can see, the map function returns a list of job descriptors, each binding a Compute job class name to its execution options via the following class:
```java
public class ComputeJobRunner {
    // Nodes to execute on: one random node for execution, the others for failover.
    private Set<ClusterNode> nodes;

    // Job execution options (priority, maxRetry).
    private JobExecutionOptions options = JobExecutionOptions.DEFAULT;

    // Deployment units for the job.
    private List<DeploymentUnit> units = Collections.emptyList();

    // Class name of the executing job.
    private String jobClassName;

    // Job args.
    @Nullable
    private Object[] args;
}
```
The reduce function will have the following signature:

```java
CompletableFuture<R> reduce(Map<UUID, CompletableFuture<?>> results);
```

where the results map associates each generated job ID with the future of that job's result. The reduce function will be called on the task coordinator node.
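For illustration, here is a minimal sketch of what a reduce implementation could look like for jobs that return Integer values. `ReduceSketch` is a hypothetical name, and plain JDK futures stand in for real job results; this is not part of the proposed API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class ReduceSketch {
    // Hypothetical reduce implementation: waits for every job future to
    // complete, then sums the Integer results into a single value.
    static CompletableFuture<Integer> reduce(Map<UUID, CompletableFuture<?>> results) {
        return CompletableFuture.allOf(results.values().toArray(new CompletableFuture[0]))
                .thenApply(unused -> results.values().stream()
                        .map(f -> (Integer) f.join())
                        .reduce(0, Integer::sum));
    }

    public static void main(String[] args) {
        Map<UUID, CompletableFuture<?>> results = new HashMap<>();
        results.put(UUID.randomUUID(), CompletableFuture.completedFuture(40));
        results.put(UUID.randomUUID(), CompletableFuture.completedFuture(2));
        System.out.println(reduce(results).join()); // prints 42
    }
}
```

Waiting on `allOf` first ensures `join()` never blocks inside the stream pipeline.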
By analogy with the job coordinator, we introduce the concept of a task coordinator. The task coordinator is the node on which the MapReduce task was invoked. This node can coincide with a node included in the map function pipeline, or it can be completely separate. The main responsibility of the coordinator node is to collect the results of the jobs produced by the map function and to assemble the final result using the reduce function. After that, the result is returned to the initiating client. As an obvious invariant, the task coordinator is the job coordinator for every generated job.
One of the basic scenarios for using MapReduce is to perform some operation over a table (or colocated tables), which is very similar to SQL. Let's describe the main scenarios.
The user has a job that reads data from the table, performs some operation on it, and returns a result. The user may want to speed this up by splitting job execution across different nodes of the cluster, so that each job reads only the data local to the node where it runs, performs the operation, and returns its result, which is then reduced into the final one.
To support this, it is proposed to extend the public API of tables with a new entity that provides information about the partitions of a table.
```java
public class Table {
    ...
    /**
     * Gets the partition manager of a table.
     *
     * @return Partition manager.
     */
    PartitionManager<HashPartition> partitions();
    ...
}
```
```java
public class HashPartition implements Serializable {
    private final int partitionId;

    public HashPartition(int partitionId) {
        this.partitionId = partitionId;
    }

    public int partitionId() {
        return partitionId;
    }
}
```
```java
public interface PartitionManager<T> {
    ClusterNode partitionLocation(T partition);

    Map<T, ClusterNode> allPartitions();

    <K> T partitionFromKey(K key, Mapper<K> mapper);

    T partitionFromKey(Tuple key);
}
```
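To illustrate the idea behind `partitionFromKey`, the toy sketch below maps keys to hash partition ids. Note that this is NOT Ignite's actual affinity function, and `PartitionSketch` is a hypothetical class; it only shows the property the API relies on: every key deterministically resolves to one of N partitions.

```java
public class PartitionSketch {
    // Toy key-to-partition mapping. Deterministic and always in [0, partitionCount),
    // which is the contract a hash partition function must satisfy.
    static int partitionFromKey(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 25;
        int p1 = partitionFromKey("accountId-42", partitions);
        int p2 = partitionFromKey("accountId-42", partitions);
        System.out.println(p1 == p2 && p1 >= 0 && p1 < partitions); // prints true
    }
}
```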
Using this API, the user can, for example, execute a job on each partition of a table and reduce the job results into the task result:
```java
public class Task implements ComputeTask<Integer> {
    @Override
    public List<JobExecutionParameters> map(JobExecutionContext taskContext, @Nullable Object[] args) {
        return taskContext.ignite()
                .tables()
                .table("tableName")
                .partitions()
                .allPartitions()
                .entrySet()
                .stream()
                .map(e -> JobExecutionParameters.builder()
                        .node(e.getValue())
                        .jobClassName("className")
                        .args(e.getKey())
                        .build())
                .collect(Collectors.toList());
    }

    @Override
    public Integer reduce(Map<UUID, ?> results) {
        return results.values().stream()
                .map(Integer.class::cast)
                .reduce(Integer::sum)
                .orElse(0);
    }
}
```
It is important to note that we do not guarantee that while a job is executed on a node, the partition stays strictly colocated on that node; it can easily move to another node, and in that case we will not take any action. That is, in this scenario we rely on best effort.
```java
class IgniteCompute {
    ...
    <R> TaskExecution<R> mapReduceAsync(
            List<DeploymentUnit> units,
            String taskClassName,
            Object... args
    );
    ...
}
```
```java
public interface TaskExecution<R> {
    CompletableFuture<R> resultAsync();

    CompletableFuture<Map<ClusterNode, JobStatus>> statusesAsync();

    default CompletableFuture<Map<ClusterNode, UUID>> idsAsync() {
        return statusesAsync().thenApply(statuses -> statuses != null
                ? statuses.entrySet().stream().collect(toMap(Entry::getKey, entry -> entry.getValue().id()))
                : null);
    }

    CompletableFuture<Void> cancelAsync();
}
```
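The projection performed by the `idsAsync()` default method can be demonstrated with a self-contained sketch. `IdsSketch`, `Node`, and `Status` are hypothetical stand-ins for the proposed interface, ClusterNode, and JobStatus, kept just detailed enough to show how statuses are mapped to job ids.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import static java.util.stream.Collectors.toMap;

public class IdsSketch {
    // Minimal stand-ins for ClusterNode and JobStatus.
    record Node(String name) {}
    record Status(UUID id, String state) {}

    // Same shape as idsAsync(): project each node's status to its job id,
    // preserving the null-propagation behavior of the default method.
    static CompletableFuture<Map<Node, UUID>> idsAsync(CompletableFuture<Map<Node, Status>> statuses) {
        return statuses.thenApply(m -> m == null ? null
                : m.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> e.getValue().id())));
    }

    public static void main(String[] args) {
        UUID jobId = UUID.randomUUID();
        var statuses = CompletableFuture.completedFuture(Map.of(new Node("n1"), new Status(jobId, "EXECUTING")));
        System.out.println(jobId.equals(idsAsync(statuses).join().get(new Node("n1")))); // prints true
    }
}
```

Because the method is a default one derived from `statusesAsync()`, implementations get job ids for free once they report statuses.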
All of the mechanisms above will be applicable to the generated jobs as well.
No risks identified.