ID | IEP-121 |
Author | |
Sponsor | |
Created | |
Status | ACTIVE |
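The data streamer (IEP-102: Data Streamer) needs more flexibility: it should be able to pass streamed data to a user-defined receiver that processes it and updates zero or more tables.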
```java
public interface DataStreamerTarget<T> {
    // ...

    /**
     * Streams data with receiver. The receiver is responsible for processing the data and updating zero or more tables.
     *
     * @param publisher Producer.
     * @param options Options (can be null).
     * @param keyFunc Key function. The key is only used locally for colocation.
     * @param payloadFunc Payload function. The payload is sent to the receiver.
     * @param resultSubscriber Optional subscriber for the receiver results.
     * @param deploymentUnits Target deployment units. Can be empty.
     * @param receiverClassName Receiver class name.
     * @param receiverArgs Receiver arguments.
     * @return Future that will be completed when the stream is finished.
     * @param <E> Producer item type.
     * @param <V> Payload type.
     * @param <R> Result type.
     */
    <E, V, R> CompletableFuture<Void> streamData(
            Flow.Publisher<E> publisher,
            @Nullable DataStreamerOptions options,
            Function<E, T> keyFunc,
            Function<E, V> payloadFunc,
            @Nullable Flow.Subscriber<R> resultSubscriber,
            List<DeploymentUnit> deploymentUnits,
            String receiverClassName,
            Object... receiverArgs);
}
```
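The Javadoc separates the key (used only locally for colocation) from the payload (the only part sent to the receiver). The following is a simplified, hypothetical model of that split: the `ColocationSketch` class, the `groupPayloadsByPartition` method, and the `hashCode()`-modulo partition choice are illustrative assumptions, not how Ignite actually maps keys to partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of how keyFunc and payloadFunc could be used on the client side.
// Assumption: partition = floorMod(key.hashCode(), partitions); Ignite's real mapping differs.
class ColocationSketch {
    static <E, K, V> Map<Integer, List<V>> groupPayloadsByPartition(
            List<E> items, Function<E, K> keyFunc, Function<E, V> payloadFunc, int partitions) {
        Map<Integer, List<V>> pages = new LinkedHashMap<>();
        for (E item : items) {
            // The key never leaves the client: it only selects the target partition.
            int partition = Math.floorMod(keyFunc.apply(item).hashCode(), partitions);
            // Only the payload goes into the page that would be sent to the receiver.
            pages.computeIfAbsent(partition, p -> new ArrayList<>()).add(payloadFunc.apply(item));
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5);
        Map<Integer, List<String>> pages = groupPayloadsByPartition(ids, id -> id, id -> "item-" + id, 2);
        System.out.println(pages); // {1=[item-1, item-3, item-5], 0=[item-2, item-4]}
    }
}
```

In this model, a key of any type can drive the grouping while the payload type stays independent, which is why the two functions are separate parameters.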
The receiver is defined by the following interface:

```java
public interface DataStreamerReceiver<V, R> {
    CompletableFuture<List<R>> receive(
            List<V> page,
            DataStreamerReceiverContext ctx,
            Object... args);
}
```
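A minimal receiver sketch, assuming the interface above. To stay self-contained, it declares a local copy of `DataStreamerReceiver` and an empty stand-in for `DataStreamerReceiverContext` (whose real members are not described here). The `StreamReceiver` class and its `<id>:<value>` payload format are hypothetical. It does not update any tables; it only validates each payload and, as a design choice of this sketch, returns one `Boolean` per item, matching the `String` payload and `Boolean` result types used in the usage example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ReceiverSketch {
    public static void main(String[] args) {
        var receiver = new StreamReceiver();
        // Simulate one page of payloads arriving at the receiver.
        List<Boolean> results = receiver.receive(List.of("1:x", "oops", "3:z"), null).join();
        System.out.println(results); // [true, false, true]
    }
}

// Stand-in for the Ignite context type; its real methods are not shown in this IEP.
interface DataStreamerReceiverContext {
}

// Local copy of the proposed interface so that the sketch compiles on its own.
interface DataStreamerReceiver<V, R> {
    CompletableFuture<List<R>> receive(List<V> page, DataStreamerReceiverContext ctx, Object... args);
}

// Hypothetical receiver: reports whether each payload looks like "<id>:<value>".
class StreamReceiver implements DataStreamerReceiver<String, Boolean> {
    @Override
    public CompletableFuture<List<Boolean>> receive(
            List<String> page, DataStreamerReceiverContext ctx, Object... args) {
        List<Boolean> results = new ArrayList<>(page.size());
        for (String payload : page) {
            int sep = payload.indexOf(':');
            // A real receiver would also update zero or more tables here.
            results.add(sep > 0 && isNumeric(payload.substring(0, sep)));
        }
        // The work is synchronous here, so the returned future is already completed.
        return CompletableFuture.completedFuture(results);
    }

    private static boolean isNumeric(String s) {
        try {
            Long.parseLong(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```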
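Usage example:

```java
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;

try (var publisher = new SubmissionPublisher<CustomData>()) {
    streamerFut = view.<CustomData, String, Boolean>streamData(
            publisher,
            null, // Default options.
            item -> Tuple.create().set("id", item.getId()), // Key: used only locally for colocation.
            item -> item.serializeToString(), // Payload: sent to the receiver as a String.
            null, // No result subscriber.
            List.of(new DeploymentUnit("test", "1.0.0")), // Unit that contains the receiver class.
            "org.foo.bar.StreamReceiver",
            "receiverArg1"); // Receiver arguments.

    publisher.submit(new CustomData(1, "x"));
} // Closing the publisher finishes the stream.

// Wait until the stream is finished.
streamerFut.join();
```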
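The example passes `null` as `resultSubscriber`. To consume receiver results instead, any `java.util.concurrent.Flow.Subscriber<R>` can be supplied. Below is a sketch of a hypothetical `CountingSubscriber` for `Boolean` results. Since it cannot call a real cluster, a JDK `SubmissionPublisher<Boolean>` stands in for the streamer's result stream; in real use the subscriber would be passed as the `resultSubscriber` argument of `streamData`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class ResultSubscriberSketch {
    public static void main(String[] args) {
        var subscriber = new CountingSubscriber();

        // Stand-in for the receiver results that the streamer would publish.
        try (var results = new SubmissionPublisher<Boolean>()) {
            results.subscribe(subscriber);
            results.submit(true);
            results.submit(false);
            results.submit(true);
        } // close() signals onComplete after the submitted items are delivered.

        subscriber.done().join();
        System.out.println(subscriber.succeeded() + "/" + subscriber.total()); // 2/3
    }
}

// Hypothetical subscriber that counts Boolean receiver results.
class CountingSubscriber implements Flow.Subscriber<Boolean> {
    private final AtomicInteger total = new AtomicInteger();
    private final AtomicInteger succeeded = new AtomicInteger();
    private final CompletableFuture<Void> done = new CompletableFuture<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request everything up front; a real application may request in batches instead.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Boolean item) {
        total.incrementAndGet();
        if (item) {
            succeeded.incrementAndGet();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        done.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        done.complete(null);
    }

    CompletableFuture<Void> done() { return done; }
    int total() { return total.get(); }
    int succeeded() { return succeeded.get(); }
}
```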
Only a limited set of basic types is supported for the receiver payload, arguments, and invocation result (the same approach as for Compute arguments).
The user is responsible for the conversion (e.g. serializing custom objects to String or byte[]); payloadFunc is a good place to perform the serialization.
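A sketch of this conversion for the `CustomData` type from the usage example. The class body, the `getValue()` and `parse` methods, and the `<id>:<value>` string format are assumptions made for illustration; the receiver would apply the inverse (`parse`) to each payload in its page.

```java
import java.util.function.Function;

class PayloadSerializationSketch {
    public static void main(String[] args) {
        // Same shape as the payloadFunc in the usage example.
        Function<CustomData, String> payloadFunc = CustomData::serializeToString;
        String payload = payloadFunc.apply(new CustomData(1, "x"));
        System.out.println(payload); // 1:x

        // What the receiver would do with each String in its page.
        CustomData restored = CustomData.parse(payload);
        System.out.println(restored.getId() + " " + restored.getValue()); // 1 x
    }
}

// Hypothetical user type; the "<id>:<value>" encoding is an assumption.
final class CustomData {
    private final int id;
    private final String value;

    CustomData(int id, String value) {
        this.id = id;
        this.value = value;
    }

    int getId() { return id; }

    String getValue() { return value; }

    // Encode as a String, one of the basic types suitable for the receiver payload.
    String serializeToString() {
        return id + ":" + value;
    }

    // Inverse of serializeToString(); splits on the first ':' so the value may contain ':'.
    static CustomData parse(String payload) {
        int sep = payload.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Not a CustomData payload: " + payload);
        }
        return new CustomData(Integer.parseInt(payload.substring(0, sep)), payload.substring(sep + 1));
    }
}
```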
The receiver class should be deployed to the cluster before the data streamer is used, using IEP-103: Code Deployment.
While a dummy table can be used as a workaround, a cleaner table-less streaming API would be preferable.
This will become possible once PartitionManager is available as a result of the IEP-120: MapReduce API proposal: with the help of partitionFromKey, we’ll be able to partition arbitrary data.
Out of scope currently.
TBD