
Status:

Current state: "Under Discussion"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html

JIRA: FLINK-4460

Released: TBD, follow JIRA comments

Motivation

Side outputs (a.k.a. multi-outputs) are one of the most requested features in high-fidelity stream processing use cases. With this feature, Flink can:

  • Side-output corrupted input data, so that a job does not fall into a “fail -> restart -> fail” cycle.
  • Side-output sparsely received late-arriving events while issuing aggressive watermarks in window computations.

Public Interfaces

  • Add org.apache.flink.util.RichCollector.

  • Offer a RichCollector from the RuntimeContext instance of a RichFunction.

Proposed Changes

We want to introduce OutputTag and let operators collect records of arbitrary types under defined output tags. The prototype demonstrates how this works in a raw, hacky form.

Users may declare multiple output tags and obtain an output stream from any previously defined output tag.

final OutputTag<String> sideOutput1 = new OutputTag<String>() {}; 
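The trailing {} in the declaration above creates an anonymous subclass, which is the usual Java way to keep a generic type argument available at runtime despite erasure. The following is a minimal sketch of that mechanism only; the OutputTag class shown here is a simplified stand-in written for illustration, and getElementType() is a hypothetical accessor, not the proposed Flink API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for the proposed OutputTag; not the actual Flink class.
abstract class OutputTag<T> {
    private final Type elementType;

    protected OutputTag() {
        // For an anonymous subclass such as new OutputTag<String>() {},
        // the generic superclass still carries the type argument String.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create the tag as: new OutputTag<X>() {}");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    // Hypothetical accessor, used here only to inspect the captured type.
    Type getElementType() {
        return elementType;
    }
}

class OutputTagDemo {
    static Type describe() {
        OutputTag<String> sideOutput1 = new OutputTag<String>() {};
        return sideOutput1.getElementType();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because the stand-in class is abstract, a tag cannot be created without the braces, which is one way to make sure the element type is always captured.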
 

Update user functions to use RichCollector (or MultiCollector). The rationale is that Collector is used in many places other than streams and transformations, so adding the method directly to the Collector interface would introduce many empty methods in those classes.

public interface RichCollector<T> extends Collector<T>{
 <S> void collect(OutputTag<S> tag, S value);
}

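FlatMapFunction as an example:

public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  for (String token : value.split("\\s+")) {
    out.collect(new Tuple2<String, Integer>(token, 1));
  }
  // In the prototype, the collector must be cast to RichCollector to reach a side output.
  ((RichCollector<Tuple2<String, Integer>>) out).collect(sideOutput1, "sideout");
}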
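To make the cast in the example concrete, here is a minimal, self-contained sketch of a collector that buffers main and side output separately. Collector, OutputTag and RichCollector are simplified stand-ins for the interfaces discussed above; BufferingRichCollector and the name field on OutputTag are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the interfaces discussed above; not the Flink classes.
interface Collector<T> {
    void collect(T record);
}

class OutputTag<T> {
    final String name; // hypothetical field, only to make the demo readable
    OutputTag(String name) { this.name = name; }
}

interface RichCollector<T> extends Collector<T> {
    <S> void collect(OutputTag<S> tag, S value);
}

// Hypothetical collector that buffers main-output and side-output records separately.
class BufferingRichCollector<T> implements RichCollector<T> {
    final List<T> mainOutput = new ArrayList<>();
    final Map<OutputTag<?>, List<Object>> sideOutputs = new HashMap<>();

    @Override
    public void collect(T record) {
        mainOutput.add(record);
    }

    @Override
    public <S> void collect(OutputTag<S> tag, S value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }
}

class RichCollectorDemo {
    static final OutputTag<String> SIDE_OUTPUT_1 = new OutputTag<>("sideOutput1");

    // Mirrors the flatMap example: the function only sees a Collector and must cast.
    static void flatMap(String value, Collector<String> out) {
        for (String token : value.split("\\s+")) {
            out.collect(token);
        }
        ((RichCollector<String>) out).collect(SIDE_OUTPUT_1, "sideout");
    }

    public static void main(String[] args) {
        BufferingRichCollector<String> collector = new BufferingRichCollector<>();
        flatMap("to be or not", collector);
        System.out.println(collector.mainOutput);
        System.out.println(collector.sideOutputs.get(SIDE_OUTPUT_1));
    }
}
```

The cast fails with a ClassCastException if the runtime hands the function a plain Collector, which is one reason the text goes on to discuss obtaining the RichCollector from the runtime context instead.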

WindowFunction

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable{
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

 

In a RichFunction, the user can call getRuntimeContext().getCollector(), which returns a RichCollector.

An alternative is to:

  • Add a <S> void collect(OutputTag<S> tag, S value) method to the Collector interface.

In this way, all existing functions get side output support without compatibility issues. The drawback is that a large number of classes implementing the Collector interface will carry an empty implementation of this method.

The user may pass an output tag defined earlier and get the corresponding output stream. More than one output tag can share the same type.

flatMap(..).getSideOutput(sideOutput1) 

The record writer can use this information: it compares each stream record with the outputType (the implementation in the prototype) or the OutputTag (a better implementation) of each Output<OUT> and writes only the matching stream records to the channel.

This gives good compatibility without a drastic change to the current “single-typed” output model.

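public class SideOutputTransformation<T> extends StreamTransformation<T> {
  // Upstream transformation whose side output is selected.
  private final StreamTransformation<?> input;

  public SideOutputTransformation(StreamTransformation<?> input, OutputTag<T> tag) {
     super("SideOutput", tag.getTypeInformation(), input.getParallelism());
     this.input = input;
  }
  // Remaining StreamTransformation methods omitted.
}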

Instead of assuming that all output stream record values have the same type, the operator needs to check each output's type against the stream record's type and emit a record only to outputs with the same output tag type information.
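The matching step can be sketched as follows. This is an illustration under stated assumptions, not the prototype's code: TaggedOutput and TagRouter are hypothetical names, a null tag stands for the main output, and tags are compared by identity within a single JVM (a real runtime would need a comparison that survives serialization). Matching on the tag rather than on the type alone keeps two tags of the same type apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in; not the Flink class.
class OutputTag<T> {
    final String id; // hypothetical field, for readability only
    OutputTag(String id) { this.id = id; }
}

// Hypothetical downstream edge. A null tag marks the main (untagged) output.
class TaggedOutput {
    final OutputTag<?> tag;
    final List<Object> written = new ArrayList<>();
    TaggedOutput(OutputTag<?> tag) { this.tag = tag; }
}

// Hypothetical router: writes a record only to outputs registered under the same tag.
class TagRouter {
    private final List<TaggedOutput> outputs;

    TagRouter(List<TaggedOutput> outputs) { this.outputs = outputs; }

    void emit(OutputTag<?> recordTag, Object value) {
        for (TaggedOutput output : outputs) {
            // Identity comparison is an assumption that only holds inside one JVM.
            if (output.tag == recordTag) {
                output.written.add(value);
            }
        }
    }
}

class TagRouterDemo {
    public static void main(String[] args) {
        OutputTag<String> late = new OutputTag<>("late");
        OutputTag<String> corrupted = new OutputTag<>("corrupted"); // same type, different tag
        TaggedOutput mainOut = new TaggedOutput(null);
        TaggedOutput lateOut = new TaggedOutput(late);
        TaggedOutput corruptedOut = new TaggedOutput(corrupted);
        TagRouter router = new TagRouter(Arrays.asList(mainOut, lateOut, corruptedOut));

        router.emit(null, 42);
        router.emit(late, "late event");
        router.emit(corrupted, "bad bytes");

        System.out.println(mainOut.written);
        System.out.println(lateOut.written);
        System.out.println(corruptedOut.written);
    }
}
```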

To some extent, getOutput works like splitting a stream and then selecting by a specific output tag. It might be good to embed an "outputselector" and reuse the select virtual node. The problem is that split and select only take lists of strings. Flink might consider allowing split to assign tags and select to choose tags, so that there is no need to add another type of virtual node that does almost exactly the same thing as select.

In the current form, many functions assume a single OUT type.

@Public
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
  OUT join(IN1 first, IN2 second) throws Exception;
}
...
public void join(IN1 left, IN2 right, Collector<OUT> out) throws Exception {
  out.collect(this.wrappedFunction.join(left, right));
}

To make this work for multiple outputs, such functions must be able to emit multiple output types. Simply changing the join implementation to use a RichCollector would not expose the flexible collector.collect(tag, value) call to the user. A possible approach is to expose a composite return type instead of OUT: the user may return either an OUT-typed instance or an <OutputTag, value> tuple, or always a unified <OutputTag, value> tuple, and the framework internally builds the output record from the user's return value. The framework allocates a default OutputTag for default OUT records.
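One way to picture the unified <OutputTag, value> variant is the sketch below. It is only an illustration of the idea: Tagged, TaggedJoinFunction and the MAIN default tag are hypothetical names introduced here, and the run method stands in for whatever wrapper the framework would use to unpack the user's return value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in; not the Flink class.
class OutputTag<T> {
    final String id; // hypothetical field, for readability only
    OutputTag(String id) { this.id = id; }
}

// Hypothetical composite return value: a value plus the tag of the output it belongs to.
final class Tagged<T> {
    final OutputTag<T> tag;
    final T value;
    Tagged(OutputTag<T> tag, T value) {
        this.tag = tag;
        this.value = value;
    }
}

// Hypothetical join variant that returns a tagged value instead of a bare OUT.
interface TaggedJoinFunction<IN1, IN2> {
    Tagged<?> join(IN1 first, IN2 second);
}

class TaggedJoinDemo {
    // Default tag that the framework would allocate for regular OUT records.
    static final OutputTag<Integer> MAIN = new OutputTag<>("main");
    static final OutputTag<String> REJECTED = new OutputTag<>("rejected");

    // Stand-in for the framework wrapper: unpacks each result and routes it by tag.
    static Map<OutputTag<?>, List<Object>> run(
            TaggedJoinFunction<Integer, Integer> fn, int[][] pairs) {
        Map<OutputTag<?>, List<Object>> outputs = new HashMap<>();
        for (int[] pair : pairs) {
            Tagged<?> result = fn.join(pair[0], pair[1]);
            outputs.computeIfAbsent(result.tag, t -> new ArrayList<>()).add(result.value);
        }
        return outputs;
    }

    // User code: the regular result goes to MAIN, a bad pair goes to REJECTED.
    static Tagged<?> divide(Integer left, Integer right) {
        if (right == 0) {
            return new Tagged<>(REJECTED, left + "/0");
        }
        return new Tagged<>(MAIN, left / right);
    }

    public static void main(String[] args) {
        Map<OutputTag<?>, List<Object>> out = run(TaggedJoinDemo::divide, new int[][] {{6, 3}, {1, 0}});
        System.out.println(out.get(MAIN));
        System.out.println(out.get(REJECTED));
    }
}
```

The trade-off in this shape is that the return type loses the static OUT parameter, so type checking of the main output moves from the function signature to the tag.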


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? We would like to take a two-phase approach. Step 1 would be done in a backward-compatible manner. Apart from the RichCollector change, which can affect the binary compatibility of some existing user functions, the code-level upgrade effort is very small: change the collector type from Collector to RichCollector.
  • Moving forward, code refactoring would have a bigger impact on the framework's backward compatibility. We would like to revisit and discuss this once we better understand the balance between benefit and impact.

Test Plan

Phase 1 will follow the same process as adding any feature to the Flink API: adding unit tests and running mock tests in a local environment.
