IDIEP-121

Author:
Sponsor:
Created:
Status: DRAFT

Motivation

Data streamer (IEP-102: Data Streamer) needs more flexibility:

  • Transform data on the servers
  • Update more than one table

Use Cases

  • Process the data before updating the table. Raw data requires CPU-intensive parsing and validation, which is better done on the server than on the client.
  • Stream denormalized data into multiple tables: e.g. a stream of Orders should populate Order, Customer, Address tables, which are colocated by CustomerId. Instead of populating those tables separately from the client, we can stream raw data to the servers. Every raw entry will result in 3 table updates.
  • Distributed stream processing. For example, we need to resize and watermark a lot of images and push them to external storage. Here we don't update any tables, but use server-side CPUs and streamer batching functionality to distribute the load.

Description

Requirements

  • Stream arbitrary data into the cluster
    • Data items do not need to conform to any schema of any table
  • Process the data on the servers with a user-defined receiver
    • Receiver can do anything. It may update zero or more tables, use transactions, invoke any other Ignite APIs.
  • Return receiver invocation results back to the caller (optionally, one value per data item). Can be used to report processing errors.
  • Partition awareness: items should be partitioned using the result of a user-provided key function and sent in batches to the corresponding nodes
    • Improves table update performance
    • Performs load balancing when there are no table updates
  • Receiver invocation guarantees: at-least-once. In case of connection issues the receiver can be invoked more than once for the same batch.
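Because of the at-least-once guarantee, a receiver that must not apply the same item twice has to deduplicate on its own. A minimal, hypothetical sketch of that pattern, using an in-memory set of already-seen item ids (the `DedupProcessor` class and the id-based scheme are illustrative assumptions, not part of the proposed API; a real receiver might instead rely on idempotent table operations such as upsert):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DedupProcessor {
    // Ids already processed. A real receiver might track these in a table keyed by id.
    private final Set<Long> seen = ConcurrentHashMap.newKeySet();

    /** Processes a page of item ids, skipping redelivered items; returns per-item "applied" flags. */
    List<Boolean> process(List<Long> pageIds) {
        List<Boolean> applied = new ArrayList<>();
        for (Long id : pageIds) {
            // Set.add returns false if the id was already present, i.e. a redelivery.
            applied.add(seen.add(id));
        }
        return applied;
    }
}
```

On a retried batch, the duplicate items report `false` and are skipped, while new items are still applied.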

API

  • Keep the existing basic streamer API as is
  • Add another overload that accepts a receiver


DataStreamerTarget.java
public interface DataStreamerTarget<T> {
    ...

    /**
     * Streams data with a receiver. The receiver is responsible for processing the data and updating zero or more tables.
     *
     * @param publisher Publisher that produces the data items.
     * @param options Options (can be null).
     * @param keyFunc Key function. The key is only used locally for colocation.
     * @param payloadFunc Payload function. The payload is sent to the receiver.
     * @param resultSubscriber Optional subscriber for the receiver results.
     * @param deploymentUnits Target deployment units. Can be empty.
     * @param receiverClassName Receiver class name.
     * @param receiverArgs Receiver arguments.
     * @param <E> Producer item type.
     * @param <V> Payload type.
     * @param <R> Result type.
     * @return Future that will be completed when the stream is finished.
     */
    <E, V, R> CompletableFuture<Void> streamData(
            Flow.Publisher<E> publisher,
            @Nullable DataStreamerOptions options,
            Function<E, T> keyFunc,
            Function<E, V> payloadFunc,
            @Nullable Flow.Subscriber<R> resultSubscriber,
            List<DeploymentUnit> deploymentUnits,
            String receiverClassName,
            Object... receiverArgs);
}

Where receiver is:

DataStreamerReceiver.java
public interface DataStreamerReceiver<V, R> {
    CompletableFuture<List<R>> receive(
            List<V> page,
            DataStreamerReceiverContext ctx,
            Object... args);
}
  • The receiver is invoked with the entire page (batch) of data as sent by the streamer, giving the user an opportunity for more efficient batch processing.
  • The full Ignite API is available to the receiver via DataStreamerReceiverContext.
  • When resultSubscriber is not null, it is invoked for every result of a receive method invocation.
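Since resultSubscriber is a standard java.util.concurrent.Flow.Subscriber, any reactive-streams-style implementation works. A minimal sketch of a subscriber that collects receiver results into a list (the class name and the unbounded demand are illustrative choices, not prescribed by this proposal):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class CollectingSubscriber<R> implements Flow.Subscriber<R> {
    final List<R> results = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Unbounded demand; a production subscriber would request in chunks for backpressure.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(R item) {
        results.add(item); // One result per streamed data item.
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace(); // Surface receiver-side failures.
    }

    @Override
    public void onComplete() {
        // Stream finished; results list is complete.
    }
}
```

An instance of such a subscriber could be passed as the `resultSubscriber` argument of `streamData` to observe, for example, one Boolean per item from the DemoReceiver below.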

Usage Example


RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;

try (var publisher = new SubmissionPublisher<CustomData>()) {
    streamerFut = view.<CustomData, String, Boolean>streamData(
            publisher,
            null,
            item -> Tuple.create().set("id", item.getId()),
            item -> item.serializeToString(),
            null,
            List.of(new DeploymentUnit("test", "1.0.0")),
            "org.foo.bar.DemoReceiver",
            "receiverArg1");

    publisher.submit(new CustomData(1, "x"));
}


class DemoReceiver implements DataStreamerReceiver<String, Boolean> {
    @Override
    public CompletableFuture<List<Boolean>> receive(List<String> page, DataStreamerReceiverContext ctx, Object... args) {
        RecordView<Tuple> view = ctx.ignite().tables().table(TABLE_NAME).recordView();
        List<Boolean> res = new ArrayList<>();

        ctx.ignite().transactions().runInTransaction(tx -> {
            for (String item : page) {
                CustomData data = CustomData.deserializeFromString(item);

                boolean success = view.insert(tx, processData(data, args));
                res.add(success);
            }
        });

        return CompletableFuture.completedFuture(res);
    }

    private Tuple processData(CustomData data, Object... args) {
        // Intensive processing goes here; the resulting record is returned.
        return Tuple.create().set("id", data.getId());
    }
} 

Data Serialization

Only a limited set of basic types is supported for receiver payload, arguments, and invocation result (same approach as Compute arguments).

The user is responsible for conversion (e.g. serialize to String or byte[]). payloadFunc is a good place to perform the serialization.
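As an illustration, a payload class could encode itself to UTF-8 bytes in payloadFunc and decode on the receiver side. The CSV-style format below is an illustrative assumption; any encoding both sides understand will do:

```java
import java.nio.charset.StandardCharsets;

class CustomData {
    final int id;
    final String name;

    CustomData(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Encodes the record as "id,name" bytes; suitable for use from payloadFunc. */
    byte[] serialize() {
        return (id + "," + name).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the "id,name" bytes on the receiver side. */
    static CustomData deserialize(byte[] payload) {
        String[] parts = new String(payload, StandardCharsets.UTF_8).split(",", 2);
        return new CustomData(Integer.parseInt(parts[0]), parts[1]);
    }
}
```

The round trip is symmetric: the streamer sends only the `byte[]`, and the receiver reconstructs the object before processing.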

Receiver Deployment

The receiver class should be deployed to the cluster prior to data streamer usage using IEP-103: Code Deployment.

Partition Awareness

  • Partition awareness is based on keyFunc and the underlying table partition distribution
  • Best-effort approach (no partition pinning): we try to invoke the receiver on the node where the target partition is located
  • The underlying table does not have to be updated by the receiver; we only use it for partitioning
    • Extreme case: a dummy empty table with a single int64 column, where keyFunc returns AtomicLong#incrementAndGet values for load balancing
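The extreme case above can be sketched as a key function that ignores the item entirely and returns a monotonically increasing counter, so successive items land on successive partitions. The `RoundRobinKeys` class is a hypothetical illustration; in the real API, keyFunc would wrap the counter value in the table's key type (e.g. a Tuple with the int64 column):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class RoundRobinKeys {
    private static final AtomicLong COUNTER = new AtomicLong();

    /**
     * Key function that ignores item content; successive items get successive
     * counter values, spreading batches round-robin across partitions.
     */
    static <E> Function<E, Long> keyFunc() {
        return item -> COUNTER.incrementAndGet();
    }
}
```

Note that consecutive counter values do not guarantee distinct partitions (partition is derived by hashing the key), but over many items the load evens out.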


Streaming Without Table

While it is possible to use a dummy table as a workaround, it would be great to have a cleaner table-less streaming API.

This will be possible when PartitionManager is available as a result of the IEP-120: MapReduce API proposal. With the help of partitionFromKey we’ll be able to partition arbitrary data.

Out of scope currently.

Error Handling

  • An unhandled exception in the receiver stops the streamer; the exception is rethrown to the client.
  • A receiver implementation can handle exceptions itself and return a success/failure status to the resultSubscriber.
  • Connection errors are retried automatically.
  • If the request has already been sent, we don't assume an idempotent receiver. Retry behavior is up to the user-defined RetryPolicy.
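The per-item error handling mentioned above can be sketched as a try/catch around each item that converts failures into result values, so one bad item does not fail the whole page. The `SafeProcessing` class and its parse step are illustrative assumptions; the same pattern would sit inside a DataStreamerReceiver.receive implementation:

```java
import java.util.ArrayList;
import java.util.List;

class SafeProcessing {
    /** Processes each item of a page, reporting per-item success instead of failing the batch. */
    static List<Boolean> processPage(List<String> page) {
        List<Boolean> results = new ArrayList<>();
        for (String item : page) {
            try {
                Integer.parseInt(item); // Stand-in for real parsing/validation logic.
                results.add(true);
            } catch (NumberFormatException e) {
                // Reported back via the resultSubscriber; the streamer keeps going.
                results.add(false);
            }
        }
        return results;
    }
}
```

With this pattern, only truly unexpected exceptions escape the receiver and stop the streamer; expected data errors flow back as results.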

Risks and Assumptions

None.

Discussion Links

Reference Links

Tickets

