Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The end-goal is to allow a DataStream API user (for example Blink) to:
- Efficiently implement a multi broadcast join - to have a single operator chain, where the probe table (source) is read locally (inside the task that is actually doing the join) and then joined with multiple other broadcasted tables.
- Assuming there are 2 or more sources that are pre-partitioned on the same key, perform all of the table reading and the join inside a single Task.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
- DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
- Classes marked with the @Public annotation
- On-disk binary formats, such as checkpoints/savepoints
- User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
- Configuration settings
- Exposed monitoring information
Proposed Changes
In order to address the motivation above, we propose the following plan:
- Implement the N-Ary Stream Operator as proposed above, however with support for input selection.
- Initially it can be exposed just via the `StreamTransformation`, without direct access from the `DataStream API`.
- Allow it to be chained with sources (implemented using the FLIP-27 SourceReader).
An optional follow-up step would be to think about whether we need to support more complex chaining. Without it, the motivating examples (I and II) could still be implemented if all of the joins/filterings/mappings are compiled/composed into a single N-Ary Stream Operator (which could be chained with some other single-input operators at the tail). We could also think about supporting chaining a tree of, for example, TwoInputStreamOperators inside a single Task. However, I’m leaving this as a follow-up, since in that case it’s not so easy to handle the `InputSelection` of multiple operators inside the tree.
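To make the "input selection" idea above more concrete, here is a minimal, self-contained sketch of how an operator could tell the runtime which input it wants to read next. All names here (`InputSelectable`, `nextSelection`, `endInput`, `TwoPhaseOperator`) are illustrative assumptions for this sketch, not the final API:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of input selection: the operator reports which input
// indexes it is currently willing to read from, and the task polls only those.
public class InputSelectionDemo {

    interface InputSelectable {
        // Returns the set of input indexes the operator wants data from next.
        Set<Integer> nextSelection();
    }

    // Toy two-phase, join-like operator: first drain the build side
    // (input 1), then switch to the probe side (input 0).
    static class TwoPhaseOperator implements InputSelectable {
        private boolean buildFinished = false;

        @Override
        public Set<Integer> nextSelection() {
            return Collections.singleton(buildFinished ? 0 : 1);
        }

        // Would be called by the runtime when an input reaches its end.
        void endInput(int inputIndex) {
            if (inputIndex == 1) {
                buildFinished = true;
            }
        }
    }

    public static void main(String[] args) {
        TwoPhaseOperator op = new TwoPhaseOperator();
        System.out.println("before build end: " + op.nextSelection());
        op.endInput(1);
        System.out.println("after build end: " + op.nextSelection());
    }
}
```

This also hints at why a tree of chained multi-input operators is harder: each operator in the tree would report its own selection, and the task would have to reconcile all of them.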
A New Hierarchy of Stream Operators
We propose a new operator StreamOperatorNG (name yet to be decided):
abstract class StreamOperatorNG&lt;OUT&gt; {
    Collection&lt;Input&lt;?&gt;&gt; getInputs();

    // as well as all the other methods of existing StreamOperator
    // and AbstractStreamOperator:
    // setup()/open()/close()/snapshot()/restore()
    ...
}

abstract class Input&lt;T&gt; {
    // for determining whether two inputs are equal
    private final UUID uuid = UUID.randomUUID();

    public abstract void processElement(StreamRecord&lt;T&gt; element) throws Exception;

    public abstract void processWatermark(Watermark watermark);
}
An operator can have any number of inputs, and when constructing the stream topology, input streams have to be attached to those inputs.
Operator definition
private static class MyOperator extends StreamOperatorNG&lt;String&gt; {

    Input&lt;String&gt; input1 = new Input&lt;String&gt;() {
        @Override
        public void processElement(StreamRecord&lt;String&gt; element) throws Exception {
            System.out.println("GOT (ON INPUT 1): " + element.getValue());
            output.collect(new StreamRecord&lt;&gt;(element.getValue()));
        }

        @Override
        public void processWatermark(Watermark watermark) { }
    };

    Input&lt;String&gt; input2 = new Input&lt;String&gt;() {
        @Override
        public void processElement(StreamRecord&lt;String&gt; element) throws Exception {
            System.out.println("GOT (ON INPUT 2): " + element.getValue());
            output.collect(new StreamRecord&lt;&gt;(element.getValue()));
        }

        @Override
        public void processWatermark(Watermark watermark) { }
    };

    Input&lt;String&gt; input3 = new Input&lt;String&gt;() {
        @Override
        public void processElement(StreamRecord&lt;String&gt; element) throws Exception {
            System.out.println("GOT (ON INPUT 3): " + element.getValue());
            output.collect(new StreamRecord&lt;&gt;(element.getValue()));
        }

        @Override
        public void processWatermark(Watermark watermark) { }
    };

    @Override
    public Collection&lt;Input&lt;?&gt;&gt; getInputs() {
        List&lt;Input&lt;?&gt;&gt; result = new LinkedList&lt;&gt;();
        result.add(input1);
        result.add(input2);
        result.add(input3);
        return result;
    }
}
Usage
DataStream&lt;String&gt; source1 = ...;
DataStream&lt;String&gt; source2 = ...;
DataStream&lt;String&gt; source3 = ...;

MyOperator myOperator = new MyOperator();

OperatorTransformation&lt;String&gt; transform = new OperatorTransformation&lt;&gt;(
    "My Operator",
    myOperator,
    BasicTypeInfo.STRING_TYPE_INFO,
    env.getParallelism());

transform.setInput(myOperator.input1, source1.getTransformation());
transform.setInput(myOperator.input2, source2.getTransformation());
transform.setInput(myOperator.input3, source3.getTransformation());

env.addOperator(transform);
Compatibility, Deprecation, and Migration Plan
- There will be no immediate impact on any of the existing users.
- A follow-up step might be to deprecate the existing TwoInputStreamOperator.
Rejected Alternatives
The rejected alternative was an N-ary stream operator that would work based on arrays and indexes, for example with the following main element-processing method signature:
void processElement(int inputIndex, T element);
This was tested to have similar performance, but with a less clean API.
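To illustrate why the index-based variant reads less cleanly, here is a simplified, self-contained model of it. The class names (`IndexBasedDemo`, `IndexBasedOperator`) are illustrative assumptions for this sketch and are not part of Flink:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the rejected index-based API: a single entry point for
// all inputs, so the operator itself must dispatch on the numeric input index.
public class IndexBasedDemo {

    abstract static class IndexBasedOperator<T> {
        abstract void processElement(int inputIndex, T element);
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        IndexBasedOperator<String> op = new IndexBasedOperator<String>() {
            @Override
            void processElement(int inputIndex, String element) {
                // The per-input logic ends up in a switch over indexes,
                // instead of living in a dedicated Input object per input.
                switch (inputIndex) {
                    case 0: seen.add("probe:" + element); break;
                    case 1: seen.add("build:" + element); break;
                    default: throw new IllegalArgumentException("unknown input " + inputIndex);
                }
            }
        };
        op.processElement(1, "b");
        op.processElement(0, "a");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the proposed `Input`-object approach, each branch of that switch becomes its own `processElement` implementation, and attaching streams to inputs replaces keeping the index mapping consistent by hand.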