...

Code Block
languagejava
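// Splits the task into jobs: returns one runner per job to execute.
List<ComputeJobRunner> split(
        TaskExecutionContext context,
        @Nullable Object[] args
);

public interface TaskExecutionContext {
    // Gives the task access to the Ignite API, e.g. to look up tables and their partitions.
    Ignite ignite();
}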


As we can see, the map function should return a list of job runners, each of which pairs a compute job class name with its execution options. A runner is represented by the following class:

...

Code Block
languagejava
public class ComputeJobRunner {
    // Nodes to execute on: one random node is chosen for execution, the others are used for failover.
    private Set<ClusterNode> nodes;

    // Job execution options (priority, maxRetry).
    private JobExecutionOptions options = JobExecutionOptions.DEFAULT;

    // Deployment units for the job.
    private List<DeploymentUnit> units = Collections.emptyList();

    // Class name of the job to execute.
    private String jobClassName;

    // Job arguments.
    @Nullable
    private Object[] args;
}
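To make the `nodes` comment concrete, here is a minimal sketch of how one execution node and its failover candidates could be picked from a runner's node set. It is an assumption, not the proposed implementation: `NodeSelection` and `select` are hypothetical names, and plain `String` node names stand in for `ClusterNode`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical helper: splits a runner's candidate node set into one randomly
// chosen execution node and the remaining failover candidates.
final class NodeSelection {
    final String primary;          // node the job is sent to first
    final List<String> failover;   // nodes to try if the primary fails

    private NodeSelection(String primary, List<String> failover) {
        this.primary = primary;
        this.failover = failover;
    }

    static NodeSelection select(Set<String> nodes, Random random) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("At least one node is required");
        }
        List<String> candidates = new ArrayList<>(nodes);
        // Remove a random element: it becomes the primary, the rest stay as failover.
        String primary = candidates.remove(random.nextInt(candidates.size()));
        return new NodeSelection(primary, candidates);
    }
}
```

Passing the `Random` in, rather than creating it inside, keeps the choice reproducible in tests by using a fixed seed.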


Reduce

The reduce function will have the following signature:

...

Code Block
languagejava
public interface Table {
    ...

    /**
     * Gets a partition manager of a table.
     *
     * @return Partition manager.
     */
    PartitionManager<HashPartition> partitions();

    ...
}



Code Block
languagejava
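import java.io.Serializable;

public class HashPartition implements Serializable {
    private final int partitionId;

    public HashPartition(int partitionId) {
        this.partitionId = partitionId;
    }

    public int partitionId() {
        return partitionId;
    }

    // Value equality, so that partitions work as map keys (e.g. in allPartitions()).
    @Override
    public boolean equals(Object o) {
        return o instanceof HashPartition && ((HashPartition) o).partitionId == partitionId;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(partitionId);
    }
}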


Code Block
languagejava
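public interface PartitionManager<T> {
    // Returns the node that currently hosts the given partition.
    ClusterNode partitionLocation(T partition);

    // Returns every partition of the table together with the node that hosts it.
    Map<T, ClusterNode> allPartitions();

    // Returns the partition a key belongs to, using the given mapper to convert the key.
    <K> T partitionFromKey(K key, Mapper<K> mapper);

    // Returns the partition a key tuple belongs to.
    T partitionFromKey(Tuple key);
}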
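The key-to-partition and partition-to-node lookups of `PartitionManager` can be illustrated with a small in-memory model. This is a hypothetical sketch only: `ToyPartitionManager` and `Partition` are illustrative stand-ins, keys are assigned by hash code modulo the partition count, and partitions are spread round-robin over nodes. In Ignite the real assignment is maintained by the cluster and is not modeled here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for HashPartition; a record provides equals/hashCode for map keys.
record Partition(int partitionId) {}

// Hypothetical in-memory partition manager: keys map to partitions by
// hash modulo partition count, partitions map to nodes round-robin.
final class ToyPartitionManager {
    private final int partitionCount;
    private final List<String> nodes;

    ToyPartitionManager(int partitionCount, List<String> nodes) {
        this.partitionCount = partitionCount;
        this.nodes = List.copyOf(nodes);
    }

    // Analogue of partitionFromKey: floorMod keeps the index non-negative
    // even for negative hash codes.
    Partition partitionFromKey(Object key) {
        return new Partition(Math.floorMod(key.hashCode(), partitionCount));
    }

    // Analogue of partitionLocation.
    String partitionLocation(Partition partition) {
        return nodes.get(partition.partitionId() % nodes.size());
    }

    // Analogue of allPartitions: every partition with its current node.
    Map<Partition, String> allPartitions() {
        Map<Partition, String> result = new LinkedHashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            Partition p = new Partition(i);
            result.put(p, partitionLocation(p));
        }
        return result;
    }
}
```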


Using this API, a user can, for example, execute a job on each partition of a table and reduce the job results into the task result:

...


Code Block
languagejava
public class Task implements ComputeTask<Integer> {
    @Override
    public List<JobExecutionParameters> map(JobExecutionContext taskContext, @Nullable Object[] args) {
        // Create one job per table partition, targeted at the node that currently hosts it;
        // the partition itself is passed to the job as its argument.
        return taskContext.ignite()
                .tables()
                .table("tableName")
                .partitions()
                .allPartitions()
                .entrySet()
                .stream()
                .map(e -> JobExecutionParameters.builder()
                        .node(e.getValue())
                        .jobClassName("className")
                        .args(e.getKey())
                        .build()
                ).collect(Collectors.toList());
    }

    @Override
    public Integer reduce(Map<UUID, ?> results) {
        // Sum the integer results of all jobs; 0 if there are none.
        return results.values().stream()
                .map(Integer.class::cast)
                .reduce(Integer::sum)
                .orElse(0);
    }
}
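The data flow of this task (one job per partition, then a sum in `reduce`) can be simulated locally without a cluster. In this sketch `LocalMapReduce`, `runJobs` and the job function are hypothetical; jobs run sequentially in the calling thread, and each result is stored under a random `UUID` standing in for the job ID. Only `reduce` mirrors the task's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.IntFunction;

// Hypothetical local driver that mimics the task without a cluster:
// one job per partition, each result keyed by a fresh job ID, then a reduce.
final class LocalMapReduce {
    static Map<UUID, Object> runJobs(int partitionCount, IntFunction<Object> job) {
        Map<UUID, Object> results = new LinkedHashMap<>();
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            // The job receives the partition ID, as the task passes the partition as the argument.
            results.put(UUID.randomUUID(), job.apply(partitionId));
        }
        return results;
    }

    // Same reduce logic as Task.reduce: cast each result to Integer and sum.
    static Integer reduce(Map<UUID, ?> results) {
        return results.values().stream()
                .map(Integer.class::cast)
                .reduce(Integer::sum)
                .orElse(0);
    }
}
```

For example, with three partitions and a job that returns `(partitionId + 1) * 10`, `reduce` yields 10 + 20 + 30.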


Note that we do not guarantee that the partition is still colocated with the node when the job executes there: the partition can move to another node in the meantime, and in that case no action is taken. In other words, colocation in this scenario is best effort.

...

Code Block
languagejava
public interface IgniteCompute {
    ...

    <R> TaskExecution<R> mapReduceAsync(
            List<DeploymentUnit> units,
            String taskClassName,
            Object... args
    );

    ...
}


Code Block
languagejava
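import static java.util.stream.Collectors.toMap;

import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public interface TaskExecution<R> {
    // Result of the task, i.e. the value returned by reduce.
    CompletableFuture<R> resultAsync();

    // Statuses of the jobs spawned by the task, keyed by the node each job runs on.
    CompletableFuture<Map<ClusterNode, JobStatus>> statusesAsync();

    // Job IDs derived from the job statuses; null if the statuses are unavailable.
    default CompletableFuture<Map<ClusterNode, UUID>> idsAsync() {
        return statusesAsync().thenApply(statuses ->
                statuses != null
                        ? statuses.entrySet().stream().collect(toMap(Entry::getKey, entry -> entry.getValue().id()))
                        : null
        );
    }

    // Cancels the task.
    CompletableFuture<Void> cancelAsync();
}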
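To show how `idsAsync()` is derived from `statusesAsync()`, the following sketch implements the same shape of interface for an execution that has already finished. It is self-contained under stated assumptions: `SimpleTaskExecution`, `CompletedTaskExecution` and the `JobStatus` record are hypothetical stand-ins, and a `String` node name replaces `ClusterNode`.

```java
import static java.util.stream.Collectors.toMap;

import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for JobStatus; only the id() accessor matters here.
record JobStatus(UUID id, String state) {}

// Mirrors TaskExecution with String node names instead of ClusterNode.
interface SimpleTaskExecution<R> {
    CompletableFuture<R> resultAsync();

    CompletableFuture<Map<String, JobStatus>> statusesAsync();

    // Same derivation as TaskExecution.idsAsync: keep the node key, project each status to its job ID.
    default CompletableFuture<Map<String, UUID>> idsAsync() {
        return statusesAsync().thenApply(statuses ->
                statuses != null
                        ? statuses.entrySet().stream().collect(toMap(Entry::getKey, entry -> entry.getValue().id()))
                        : null
        );
    }

    CompletableFuture<Void> cancelAsync();
}

// An execution whose jobs have all finished; every future is already complete.
final class CompletedTaskExecution<R> implements SimpleTaskExecution<R> {
    private final R result;
    private final Map<String, JobStatus> statuses;

    CompletedTaskExecution(R result, Map<String, JobStatus> statuses) {
        this.result = result;
        this.statuses = statuses;
    }

    @Override
    public CompletableFuture<R> resultAsync() {
        return CompletableFuture.completedFuture(result);
    }

    @Override
    public CompletableFuture<Map<String, JobStatus>> statusesAsync() {
        return CompletableFuture.completedFuture(statuses);
    }

    // Nothing is left to cancel once every job has finished.
    @Override
    public CompletableFuture<Void> cancelAsync() {
        return CompletableFuture.completedFuture(null);
    }
}
```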


Queues, priority, failover, cancellation

All mechanisms from IEP-118 (Compute: queues, priority, failover, cancellation) will be applicable to the generated jobs.
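Because the generated jobs are ordinary compute jobs, they would share a node's job queue with other jobs. The sketch below models such a queue, assuming that jobs with a higher priority value run first and that jobs of equal priority run in submission order; `JobQueueSketch` and its methods are hypothetical and do not reproduce the IEP-118 implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical node-local job queue: higher priority first,
// FIFO among jobs of equal priority (both are assumptions of this sketch).
final class JobQueueSketch {
    record QueuedJob(String name, int priority, long seq) {}

    private final PriorityQueue<QueuedJob> queue = new PriorityQueue<>((a, b) -> {
        int byPriority = Integer.compare(b.priority(), a.priority()); // higher priority first
        return byPriority != 0 ? byPriority : Long.compare(a.seq(), b.seq()); // then submission order
    });
    private long nextSeq;

    void submit(String name, int priority) {
        queue.add(new QueuedJob(name, priority, nextSeq++));
    }

    // Drains the queue and returns job names in execution order.
    List<String> drainInOrder() {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name());
        }
        return order;
    }
}
```

The sequence number is needed because `PriorityQueue` does not preserve insertion order among elements that compare as equal.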

...

There are no known risks.

Reference Links

IEP-118: Compute: queues, priority, failover, cancellation (Apache Ignite wiki)

Tickets

IGNITE-22044: Ignite 3 MapReduce (ASF JIRA)