Status

Discussion thread
Vote thread
JIRA
Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is a joint work between Zhipeng Zhang and Yun Gao].


Motivation

When developing machine learning algorithms on the DataStream API, we found that it lacks the withBroadcast functionality that is widely used in machine learning. A common example of withBroadcast using the DataSet API is as follows:


DataSet<?> d1 = ...;
DataSet<?> d2 = ...;
DataSet<?> d3 = ...;

d1.map(new RichMapFunction<Object, Object>() {
        @Override
        public Object map(Object value) throws Exception {
            // The complete contents of d2 and d3 are available here.
            List<?> d2Elements = getRuntimeContext().getBroadcastVariable("d2");
            List<?> d3Elements = getRuntimeContext().getBroadcastVariable("d3");

            ...;
        }
    })
    .withBroadcastSet(d2, "d2")
    .withBroadcastSet(d3, "d3");


When executing the above user-defined map function, users can access all elements of d2 and d3. This means that no element of d1 can be consumed before all elements of d2 and d3 have been consumed. A common solution (adopted in the DataSet API) is to require that, before the map function is invoked, all elements of d2 and d3 are collected and cached on each parallel instance of the MapOperator.


Currently, the DataStream API does not support withBroadcast. Given that Flink aims to deprecate the DataSet API, we want to support withBroadcast on the DataStream API. We summarize the requirements for supporting withBroadcast as follows:

  • Support accessing multiple broadcast inputs.
  • Support accessing broadcast inputs in the user-defined function of a stream operator, e.g., OneInputStreamOperator, TwoInputStreamOperator, MultiInputStreamOperator.
  • Avoid the possible deadlocks caused by priority-based data consumption.
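To make the last requirement concrete, the following is a self-contained, plain-Java sketch (not Flink code; the class name, buffer capacity, and single-threaded simulation are invented for illustration) of how priority-based consumption can deadlock: an upstream task emits each record to both a broadcast channel and a non-broadcast channel, each backed by a bounded buffer, while the downstream operator drains only the broadcast channel. Once the non-broadcast buffer fills up, the upstream task can no longer emit anything, so the broadcast side never finishes.

```java
import java.util.ArrayDeque;

public class DeadlockSketch {

    // Simulates one upstream task feeding two bounded channels while the
    // downstream operator consumes only the broadcast channel. Returns the
    // number of records emitted before the upstream task blocks.
    static int simulate() {
        final int capacity = 2; // hypothetical buffer capacity
        ArrayDeque<Integer> broadcastBuf = new ArrayDeque<>();
        ArrayDeque<Integer> nonBroadcastBuf = new ArrayDeque<>();

        int emitted = 0;
        // Upstream emits record i to both channels; it blocks as soon as
        // either buffer is full.
        while (broadcastBuf.size() < capacity && nonBroadcastBuf.size() < capacity) {
            broadcastBuf.add(emitted);
            nonBroadcastBuf.add(emitted);
            emitted++;
            // Downstream drains only the broadcast channel (priority-based
            // consumption); nothing ever reads the non-broadcast buffer.
            broadcastBuf.clear();
        }
        // The non-broadcast buffer is full, so upstream is blocked even
        // though the downstream operator still waits for broadcast records.
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("upstream blocked after " + simulate() + " records");
    }
}
```

In Flink terms, the bounded buffers play the role of the network buffers between tasks: when they fill up, backpressure blocks the upstream task, and the downstream operator keeps waiting for broadcast records that can no longer arrive.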

Public Interfaces

We propose to make the following API changes to support the withBroadcast functionality described above.


1) Add the HasBroadcastVariable interface.

A stream operator that needs to access broadcast variables should implement the following interface.

@PublicEvolving
public interface HasBroadcastVariable {

    /**
     * Sets a broadcast variable.
     *
     * @param name name of the broadcast stream.
     * @param broadcastVariable list of elements contained in this broadcast stream.
     */
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}
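As a sketch of how an operator might store and use the variables passed in through this interface, consider the following plain-Java example (the interface is reproduced locally so the snippet compiles on its own; ThresholdFilter is a hypothetical example, not part of the proposal). The runtime is expected to call setBroadcastVariable(...) once per broadcast input before any regular element is processed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Reproduced locally for a self-contained snippet; the real interface
// lives in Flink.
interface HasBroadcastVariable {
    void setBroadcastVariable(String name, List<?> broadcastVariable);
}

// A hypothetical operator-like class: it stores each broadcast variable by
// name and reads the stored list when processing regular elements.
class ThresholdFilter implements HasBroadcastVariable {
    private final Map<String, List<?>> broadcastVariables = new HashMap<>();

    @Override
    public void setBroadcastVariable(String name, List<?> broadcastVariable) {
        broadcastVariables.put(name, broadcastVariable);
    }

    // Keeps an element only if it exceeds every broadcast threshold.
    boolean accept(int element) {
        @SuppressWarnings("unchecked")
        List<Integer> thresholds = (List<Integer>) broadcastVariables.get("thresholds");
        return thresholds.stream().allMatch(t -> element > t);
    }
}

public class HasBroadcastVariableSketch {
    public static void main(String[] args) {
        ThresholdFilter filter = new ThresholdFilter();
        filter.setBroadcastVariable("thresholds", Arrays.asList(3, 5));
        System.out.println(filter.accept(10)); // true: 10 exceeds both thresholds
        System.out.println(filter.accept(4));  // false: 4 does not exceed 5
    }
}
```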



2) Add BroadcastUtils class.

We propose to add the following utility class to support the withBroadcast functionality.

public class BroadcastUtils {
    /**
     * Supports withBroadcast in the DataStream API. The broadcast data streams are made available
     * to all parallel instances of an operator that implements {@link HasBroadcastVariable}.
     *
     * <p>In detail, the broadcast input data streams are consumed first and then passed to the
     * operator via {@code HasBroadcastVariable#setBroadcastVariable(...)}. For now, the
     * non-broadcast inputs are cached by default to avoid possible deadlocks.
     *
     * @param inputList the list of non-broadcast inputs.
     * @param bcStreams map of the broadcast data streams, where the key is the name and the value
     *     is the corresponding data stream.
     * @param userDefinedFunction the user-defined logic in which users can access the broadcast
     *     data streams and produce the output data stream. Note that users can add only one
     *     operator in this function; otherwise an exception is raised.
     * @return the output data stream.
     */
    @PublicEvolving
    public static <OUT> DataStream<OUT> withBroadcastStream(
            List<DataStream<?>> inputList,
            Map<String, DataStream<?>> bcStreams,
            Function<List<DataStream<?>>, DataStream<OUT>> userDefinedFunction) throws Exception {...}
}

Proposed Changes

In this section, we discuss a few design choices related to the implementation and usage of the proposed APIs. Basically, we only need to cache the broadcast inputs if there is no deadlock. If there is a deadlock, we also need to cache the non-broadcast inputs that incur the deadlock.
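The intended consumption order can be sketched in plain Java (a single-threaded simplification of the runtime behavior; the record encoding, marker string, and method names are invented for illustration): broadcast records are fully collected first, non-broadcast records that arrive in the meantime are cached, and the cache is replayed once the broadcast side completes.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumptionOrderSketch {

    // Simulates one operator instance receiving an interleaved arrival
    // order of broadcast records ("bc:..."), non-broadcast records, and an
    // end-of-broadcast marker ("BC_END"). Returns the order in which
    // non-broadcast records are actually processed.
    public static List<String> run(List<String> arrivalOrder) {
        List<String> broadcastVariable = new ArrayList<>();
        List<String> cache = new ArrayList<>();
        List<String> processed = new ArrayList<>();
        boolean broadcastDone = false;

        for (String record : arrivalOrder) {
            if (record.equals("BC_END")) {
                // Broadcast side finished: replay the cached records.
                broadcastDone = true;
                processed.addAll(cache);
                cache.clear();
            } else if (record.startsWith("bc:")) {
                broadcastVariable.add(record); // collect broadcast elements
            } else if (!broadcastDone) {
                cache.add(record);             // too early: cache it
            } else {
                processed.add(record);         // broadcast complete: process directly
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> out = run(List.of("a", "bc:1", "b", "bc:2", "BC_END", "c"));
        System.out.println(out); // prints [a, b, c]: a and b were cached until BC_END
    }
}
```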


To support withBroadcast in DataStream, there are several possible solutions.

  • Wrap all of the broadcast and non-broadcast inputs in a MultiInputStreamOperator and cache all of them in memory/disk to avoid the possible deadlock.
    • We prefer not to adopt this option because it incurs an expensive caching cost.
  • Wrap all of the broadcast and non-broadcast inputs in a MultiInputStreamOperator and use InputSelectable to decide the order in which records are consumed from the different inputs. In this case, we only need to cache the broadcast inputs if there is no deadlock.
    • We cannot use this option because currently Flink does not support checkpointing a MultiInputStreamOperator that implements InputSelectable.
  • Wrap the broadcast inputs in a MultiInputStreamOperator and co-locate it with the operator consuming the non-broadcast inputs. In this case, we can use a global static variable to share the cached inputs between these two operators. Besides, we can analyze the JobGraph and decide whether each non-broadcast input needs to be cached.
    • We plan to choose this option since we can (1) support checkpointing and (2) control whether to cache each of the inputs.
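The static-variable sharing idea in the chosen option can be sketched as follows (a hypothetical class and key scheme; a real implementation would also handle cleanup, per-job scoping, and synchronization between the producing and consuming operators). Because the two operators are co-located in the same task manager JVM, a static map lets one operator publish the materialized broadcast list and the other read it.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry shared between the co-located operators via a
// static field. The key combines a unique id (e.g. derived from the job
// and the wrapped operator) with the broadcast variable name.
public class BroadcastVariableRegistry {
    private static final Map<String, List<?>> CACHE = new ConcurrentHashMap<>();

    // Called by the operator that consumes the broadcast inputs once a
    // broadcast stream has been fully materialized.
    public static void put(String uniqueId, String name, List<?> elements) {
        CACHE.put(uniqueId + "#" + name, elements);
    }

    // Called by the co-located operator that processes the non-broadcast
    // inputs; returns null if the variable is not yet available.
    public static List<?> get(String uniqueId, String name) {
        return CACHE.get(uniqueId + "#" + name);
    }

    public static void main(String[] args) {
        put("job-1-op-0", "d2", List.of(1, 2, 3));
        System.out.println(get("job-1-op-0", "d2")); // prints [1, 2, 3]
    }
}
```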

Example Usage

This section shows how to use the proposed API to support operators that use broadcast variables.


public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = ...;
	// create the non-broadcast inputs
	DataStream<?> input1 = env.addSource(...);
	DataStream<?> input2 = env.addSource(...);

	// create the inputs that need to be broadcast to other operators and put them in a map
	DataStream<?> broadcastInput1 = env.addSource(...);
	DataStream<?> broadcastInput2 = env.addSource(...);
	Map<String, DataStream<?>> broadcastMap = new HashMap<>();
	broadcastMap.put("broadcastInput1", broadcastInput1);
	broadcastMap.put("broadcastInput2", broadcastInput2);

	// call withBroadcastStream
	DataStream<?> output = BroadcastUtils.withBroadcastStream(
		Arrays.asList(input1, input2),
		broadcastMap,
		dataStreams -> {
			DataStream<?> in1 = dataStreams.get(0);
			DataStream<?> in2 = dataStreams.get(1);
			return in1.connect(in2)
				.transform(
					"two-input",
					TypeInformation.of(...),
					new MyTwoInputOp())
				.name("broadcast");
		});

	output.addSink(...);
	env.execute();
}

/**
 * A two-input StreamOperator that implements the HasBroadcastVariable interface.
 */
private static class MyTwoInputOp extends AbstractStreamOperator<?>
	implements TwoInputStreamOperator<?, ?, ?>, HasBroadcastVariable {

	// a map used to store the broadcast variables.
	private final Map<String, List<?>> broadcastVariables = new HashMap<>();

	@Override
	public void setBroadcastVariable(String name, List<?> broadcastVariable) {
		broadcastVariables.put(name, broadcastVariable);
	}

	@Override
	public void processElement1(StreamRecord<?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process the element using the broadcast inputs
		// ...
	}

	@Override
	public void processElement2(StreamRecord<?> streamRecord) throws Exception {
		List<?> broadcastInput1 = broadcastVariables.get("broadcastInput1");
		List<?> broadcastInput2 = broadcastVariables.get("broadcastInput2");
		// process the element using the broadcast inputs
		// ...
	}
}


Compatibility, Deprecation, and Migration Plan

  • Code written with DataSet#withBroadcastSet in the DataSet API can migrate to BroadcastUtils#withBroadcastStream.
  • The DataStream API does not support withBroadcast() for now, so there are no compatibility issues.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

  • Option-1: Another possible solution to support `withBroadcast` is to reuse StreamingRuntimeContext#getBroadcastVariable(...). However, we cannot assume that every stream operator contains a StreamingRuntimeContext.