IDIEP-119
Author
Sponsor
Created

 

Status

DRAFT

Motivation

Current version of the Compute API supports the following types of objects to be passed as arguments to jobs and to be returned as result from jobs: 

  1. Primitive types: Boolean, Byte, Short, Integer, Long, Float, Double.
  2. Complex build-in types: String, BigDecimal, UUID, BitSet, LocalDate, LocalTime, LocalDateTime, Instant, BigInteger, Duration, Period
  3. byte[].

Looking at the Key-Value Table API or at the RecordView API, it is clear that the Compute API should support user-defined objects as well. At least Pojo mapping should be supported in the Compute API as it is supported in the Key-Value Table API.

Definitions

Serialization -- the process of packing any type in any supported language into byte[]. "object" -> serialization -> byte[]. Synonym for Marshalling.

Deserialization -- the process of unpacking byte[] into the object instance in any supported language. byte[] -> deserialization -> "object". Synonym for Unmarshaling.

Mapping -- Ignite-specific process of projecting column names into pojo names. "Ignite table row" <-> Mapper <-> "object". 

Client-side marshaling -- serialization and deserialization defined on the client. It means that only client code does serialization/deserialization but for server-side this is just byte[]. Server might not have client types.

Server-side marshaling -- serialization and deserialization defined in the server. In the context of compute API, this is how job arguments and return types are converted into/from byte[]. Server-side serializers and deserializers are deployed as a part of the deployment unit.

Ignite 3 types definition

PrimitiveType := boolean, tinyint, smallint, int, bigint, real, double, varchar, varbinary, time, timestamp, date, decimal, uuid

PrimitiveType defines every type that is supported by SQL. Each platform has a PrimitiveType equivalent. Ignite can automatically convert to/from PrimitiveType. See ColumnType in catalog.

Tuple := [(String, PrimitiveType)...]

Pojo := Plain Java Object that has only PrimitiveType that can be automatically converted to/from Tuple.

UserType := any object on any platform.

KeyValue API and Record API are working with PrimitiveType and Tuple OOTB. For UserType they require the type to be Pojo with exactly the same field-column mapping (automap) or the user has to provide a mapper (give me mapping for class fields and column names, so I will convert your object into table row).

Compute API supports PrimitiveType OOTB. Compute API does not have any schema. It is wider than it. It can work with any UserType. It means that “mapping” is not the case for Compute. The user has to define “marshallers” for its UserType on both client and server side.

Design

Compute

Low level “compute job” is defined as a primitive unit of an abstract “execution” that accepts a single user-defined argument and returns a single user-defined result. The job is defined by the Java class that must implement the ComputeJob<T, R> interface. The only way to ship the job class into an Ignite 3 cluster is to assemble it with a deployment unit and send it to the cluster via a REST API. The job can be referenced by deployment unit + fqn. 

Here is the ComputeJob interface definition:

interface ComputeJob<T, R> {
  CompletableFuture<R> executeAsync(JobExecutionContext ctx, T input);
  
  default Marshaller<T, ?> inputMarshaller() {
    return null;
  }
  default Marshaller<R, ?> resultMarhaller() {
    return null;
  }
}


Both input and output types can be a UserType, Tuple, or PrimitiveType. Ignite 3 supports PrimitiveTypes (1) and Tuples (2) OOTB meaning that the user should be able to declare compute jobs with those types without any additional marshallers. 

class MyPrimitiveComputeJob implements ComputeJob<byte[], Long> { // (1)
  CompletableFuture<Long> executeAsync(JobExecutionContext ctx, byte[] data) {
    var kv = ctx.ignite().tables().table("test").kvView();
    var tup = toTuple(data); // user code
    return count(kv.get(null, tup)); // user code
  } 
}

class MyTupleComputeJob implements ComputeJob<Tuple, Tuple> { // (2)
  CompletableFuture<Tuple> executeAsync(JobExecutionContext ctx, Tuple key) {
    return ctx.ignite().tables().table("test").kvView().get(null, key);
  }
}


For UserTypes we apply the User Type Marshaling strategy: 

if user has defined the marshaller for the type in the job implementation then use it (3), 

  else if UserType is Pojo then BinaryTupleMarshalling is applied (1), 

    if UserType has nested Pojos, then apply BinaryTupleMarshalling recursively (2). 


class Pojo {
  int i, j;
  getI(); getJ();
}
// No user marshaller is defined, use default field-by-field binary tuple marshalling 
class MyPojoComputeJob implements ComputeJob<Pojo, Pojo> { // (1)
  CompletableFuture<Pojo> executeAsync(JobExecutionContext ctx, Pojo data) {
    return future(data) ;
  }
}

class MyNestedPojo {
  long l;
  Pojo p;
  getL(); getP();
}
// Default nested field-by-field marshalling is applied
class MyNestedPojoComputeJob implements ComputeJob<MyNestedPojo, MyNestedPojo> { // (2)
  CompletableFuture<MyNestedPojo> executeAsync(JobExecutionContext ctx, MyNestedPojo data) {
    return future(data);
  }
}

// User-defined marshallers are used
class MyComplexComputeJob implements ComputeJob<UserType1, UserType2> { // (3)
  CompletableFuture<UserType2> executeAsync(JobExecutionContext ctx, UserType1 data) {
    return future(new UserType2(...));
  }
  Marshaller<byte[], UserType1> inputMarshaller() {
    return new MyKryoMarshaller<>();
  }
  Marshaller<byte[], UserType2> resultMarshaller() {
    return new MyKryoMarshaller<>();
  }
}


Marshallers

Marshaller is defined by the following interface that is a part of the public API.

interface Marshaller<T, R> {
  R marshal(T object);
  T unmarshal(R raw)
}

interface TupleMarshaller<T> extends Marshaller<T, Tuple> {
  default Tuple marshal(T object) {
    return BinaryTupleMarshallilng.marshal(object);
  }
  default T unmarshal(Tuple raw) {
    return BinaryTupleMarshalling.unmarshal(raw);
  }
}

interface ByteArrayMarshaller<T> extends Marshaller<T, byte[]> {
  default byte[] marshal(T object) {
    return JavaSerializationMarshalling.marshal(object);
  }
  default T unmarshal(byte[] raw) {
    return JavaSerializationMarshalling.unmarshal(raw);
  }
}

// a.k.a Mapper - or maybe PojoMapper
class FieldByFieldTupleMarshaller<T> implements TupleMarshaller<T> {
  static FieldByFieldTupleMarshaller<T> of(Class<T> clazz) { ... } // todo
}


Binary Tuple With Schema serialization

BinaryTuple with schema (BT) is the universal serialization format for Ignite 3. It can be used in both KV and Compute APIs. BT implements Tuple interface. BT is used as the default Tuple serialization format in Compute API. Here is the scheme for compute call message: BT Header has a schema. The schema can be either inline or global. Global schema is referenced by id in catalog and resolved lazily. Inline schema is packed into header and transferred as a part of BT.

BinaryTupleWithSchema := |schema, payload|

schema := |inlineSchema or schemaId|
schemaId := |long|
inlineSchema := |col, [col]|
col := |colName, colTypeId|
colName := |string|
colTypeId := |int|

payload := |byte[]|


Given tuple object:

PlainObject {
  int x,
  long y,
  Sting s,
  byte[] b
}

var tup = Tuple.create();
tup.set("x", 1);
tup.set("y", 2L);
tup.set("s", "Hi");
tup.set("b", new byte[1]);

The inline schema should look like this:

BinaryTupleWithSchema := |inlineSchema, payload|

inlineSchema := |"x", <intId>, "y", <longId>, "s", <stringId>, "b", <byteId>|

payload := |1, 2, 2, 'H', 'i', 0|



The main advantage of this new format is that it allows sending binary tuples that can be projected on the server side and vice versa. For example, a binary tuple sent as an argument to the Compute job can be copied directly from the incoming byte array. Only when the job accesses the value from the tuple is it "materialized." Mutable operations are supported through an additional tuple instance that holds "updates" to the original binary tuple. This class is inspired by MutableTupleBinaryTupleAdapter.


class TupleProjection implements Tuple {
    private final BinaryTuple data;
    private final Schema schema;
    private final Tuple mutableTuple;
    // ...
}


TupleProjection is passed into the Compute job as a Tuple instance.

We also say that any Pojo can be marshaled with BinaryTuple protocol through inline schema (automap analog). 

Client-side mapping

The client should have an API for defining a marshaller for its pojos that are used as arguments and/or return values in the compute API.

Proposed API:

var remoteMethod = client.compute(
    JobDescriptor.builder()
            .units(deploymentUnit)
            .jobClassName("MyJob")
            .argumentMarshaller(
                    // Mapper here is an instance of TupleMarshaller<ClientPojo> 
                    Mapper.builder(ClientPojo.class) 
                            .map("s", "s")
                            .map("JJJ", "j")
                            .build()
            )
            .resultMarshaller(new MyJsonMarshaller<>())
            .build()
);


Nested BT

The BT format does not define what kind of types it is supposed to have. It is obvious that PrimitiveTypes are supported. But we also support nested BT with recursive serialization. It means that we support nested pojos as well.

KV API changes

The support for Tuples in the Compute API was inspired by KV API. During the design we understood that the Compute API needs more than just Tuple but byte[]. So the Marshaller was introduced. But here is the question. Why can’t we use marshallers for KV API also? Byte[], byte[] KV with marshallers defined on the client side is possible. This is the topic for the future designs.

Opened tickets

Key Summary T Created Updated Due Assignee Reporter P Status Resolution
Loading...
Refresh


Closed tickets

Key Summary T Created Updated Due Assignee Reporter Priority Priority Priority Priority P Status Resolution
Loading...
Refresh

  • No labels