Status:

Current stateReleased

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html

JIRA:  

Released: Flink 1.3

Motivation

Side outputs(a.k.a Multi-outputs)  is one of highly requested features in high fidelity stream processing use cases. With this feature, Flink can

Public Interfaces

Proposed Changes

We want to introduce outputTag and support operator collect arbitrary types of records with defined output Tags. In this prototype, it demonstrated how things works in raw/hacky form.

API Changes

User may declare multiple output tags and use get output stream with one previously defined outputtag.

final OutputTag<S> sideOutput1 = new OutputTag<S>(SValue) {}; 

Update userFunctions using RichCollector(Or MultiCollector) the rationale behind is Collector has been used in lot of places other than stream and transformation, adding direct to Collector interface will introduce many empty methods in those classes.

public interface RichCollector<T> extends Collector<T>{
  <S> void collect(OutputTag<S> tag, S value);
}

FlatMapFunction as example

flatMap(String value, Collector<Tuple2<String, Integer>> out){
           CollectorWrapper wrapper = new CollectorWrapper<>(out);
    //out.collect(new Tuple2<String, Integer>(token, 1));

                 wrapper.collect(new Tuple2<String, Integer>(token, 1));

        wrapper.collect(sideOutput1, "sideout");
}

User may pass outputtag defined eearlier and get a corresponding outputstream.

There can be more than one outputtag share same type, however, getSideOutput only returns collected record with exams same outputtag type and value.

    flatMap(..).getSideOutput(sideOutput1) 
    StreamGraph add virtualOutputNodes each map to single outputtag from upstream node
    Stream Record and StreamEdge both add outputTag, StreamConfig stores all type of side output serializers 

Record writer can utilize this information and compare with outputType( impl in prototype) or OutputTag (better implementation) or each Output<OUT> and just write matched stream record to channel

Runtime changes

It allows good compatibility without drastic change of current “single typed” output model.

    public class SideOutputTransformation<T> extends StreamTransformation<T> {
      public SideOutputTransformation(StreamTransformation input, OutputTag<T> tag) {
       super("SideOutput", tag.getTypeInformation(), input.getParallelism());
       this.input = input;
      }

        }

Instead of assuming output stream record values are same type, it will need to check output type and stream record type, only output with same outputtag typeinfo.


Compatibility, Deprecation, and Migration Plan

Test Plan

Phase 1 will follow same as adding feature to Flink API, adding unit tests and run local env mock tests