Status

Discussion thread: https://lists.apache.org/thread/x3xdvkkjys55p21ldokoo31sv4w7dt57
Vote thread
JIRA

FLINK-6131

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

There was a first side input design document here: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#. A second attempt to solve the problem, based on the first one, was started here: https://docs.google.com/document/d/1hqUmrLY_wPTeS5bqG36Qq9P8LeDjZ_db61ky7OQy1hw/edit#heading=h.z6bw9wg0jj23. The original mailing list discussion is available here: https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E.

Motivation

Side inputs are an extension of the idea of broadcast sets from the DataSet API to the DataStream API. The name side input (inspired by a similar feature in Apache Beam) is preliminary but we chose to diverge from the name broadcast set because 1) it is not necessarily broadcast, as described below and 2) it is not a set. A side input is an additional input to an operation that itself can result from a streaming computation. We propose adding support for side inputs in two stages. First, we would add support for non-windowed side inputs and then we would add support for windowed side inputs.

Several use cases would be enabled by or can benefit from native support for side inputs. In general, most of these follow the pattern of joining a main stream of high throughput with one or several inputs of slowly changing or static data:

  • Join stream with static data: In this use case we have a high-throughput main input and a static set of data that we want to use to filter or enrich elements of the main stream. We would first load the static data into internal structures and then stream the main input past it. Since we are in a streaming environment, we have to buffer arriving elements on the main input until we know that we have completely read the side input. In many cases the main input is keyed and we also want to key the side input so that we don’t have to ship all side-input elements to all parallel instances of an operation. (This is one reason for not calling the feature broadcast stream/set.)
  • Join stream with slowly evolving data: This is very similar to the above case, but the side input that we use for enriching evolves over time. This can be done by waiting for some initial data to be available before processing the main input and then continuously ingesting new data into the internal side-input structure as it arrives.
  • Evolving or static Filter/Enriching: In this case we would learn some global piece of data or a model that should be used to filter or otherwise affect processing of all elements. This piece of information would be broadcast to all parallel operator instances. For example, the broadcast value could be the continuously changing average word length, and a downstream operation filters out words that are below or above this average. In general, this goes in the direction of streaming machine learning and dynamically updating models. In case of dynamic changes we again need some sort of triggering and continuous ingestion of new data into the side-input data structures.
  • Window-Based Joins: In this example we would have two keyed streams that we want to join on possibly different (time-)windows. We need to map the windows of the main input to the correct window of the side input and also buffer elements of the main input if the data in the corresponding side input is not yet available.

Public Interfaces

The API needs two parts:

  • a way to specify additional inputs to operations as side inputs and
  • a way to get the side-input data in the operator/user function at runtime.

The proposed API relies on wrappers that wrap a DataStream and specify what kind of side input we want. The wrapper would then also be used at runtime to retrieve the contents of the side input. The examples below don’t cover all permutations of the use cases mentioned above, but they should suffice to bring across the concept.

These examples show what the concepts would look like in actual code.

Global (non-windowed) Side Input
DataStream<String> mainStream = …
DataStream<String> sideStream = …

SingletonSideInput<String> filterString =
  new SingletonSideInput<>(sideStream);

mainStream
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      // getSideInput(...) is the proposed accessor for side-input contents
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  }).withSideInput(filterString);


Windowed Side Input
// we have to specify windows/triggers on both of them...
WindowedStream<> mainStream = …
WindowedStream<T> sideStream = …

WindowedKeyedSingletonInput<> enrichInput =
  new WindowedKeyedSingletonInput<>(sideStream);

mainStream
  .apply( (key, window, values, out) -> {
    // this is scoped to the correct key and window
    T sideValue = getSideInput(enrichInput);
    for (var value : values) {
      out.collect(value + sideValue);
    }
  })
  .withSideInput(enrichInput);

Side Input Semantics

We have to decide some things:

  • What side input does the user function "see" when processing a main input?
  • Do we wait for a side input to be available before processing a main-input element?
  • If yes, when do we consider a side input ready?

There are all sorts of tricky semantics that we could try and figure out but we propose to go with the minimal viable solution:

Side Input Visibility

Here we have to look at two cases which, if you think about it, boil down to one case in the end: non-windowed side input and windowed side input. For windowed main-input, the side input that the user function can access should be the side input for the same window or, for differing window functions, a "matching window". The case of non-windowed streams is covered by considering (theoretically) everything to be in the GlobalWindow.

Side Input Readiness

We propose that processing of a main input (in a given window) waits for the side input to be available for that window. We consider a side input ready as soon as the first data arrives for that side input. If side-input data is updated by more incoming data, successive main-input elements see the updated side input.
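The readiness rule above can be illustrated with a small standalone sketch. It is not Flink code: the class ReadinessGate and its method names are hypothetical, state is held in plain heap maps, and the side input uses the "replace the previous value" behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Hypothetical illustration (not a Flink class): holds back main-input
 * elements per window until the first side-input element for that window
 * arrives, then processes elements against the latest side-input value.
 */
class ReadinessGate<W, IN, SIDE, OUT> {
  private final Map<W, SIDE> sideInput = new HashMap<>();
  private final Map<W, List<IN>> pending = new HashMap<>();
  private final BiFunction<IN, SIDE, OUT> userFunction;
  private final List<OUT> output = new ArrayList<>();

  ReadinessGate(BiFunction<IN, SIDE, OUT> userFunction) {
    this.userFunction = userFunction;
  }

  /** Main input: process immediately if the side input is ready, else buffer. */
  void onMainElement(W window, IN element) {
    if (sideInput.containsKey(window)) {
      output.add(userFunction.apply(element, sideInput.get(window)));
    } else {
      pending.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }
  }

  /** Side input: store (replace) the value, then flush what was buffered. */
  void onSideElement(W window, SIDE value) {
    sideInput.put(window, value);
    List<IN> buffered = pending.remove(window);
    if (buffered != null) {
      for (IN element : buffered) {
        output.add(userFunction.apply(element, value));
      }
    }
  }

  /** Results emitted so far, in emission order. */
  List<OUT> output() {
    return output;
  }
}
```

In this sketch, elements buffered before the first side-input element are processed against that first value, while elements arriving after an update see the updated value.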

Side Input Types

We propose several different types of side inputs (these are heavily influenced by the types of side input that are available in Apache Beam):

  • List/Iterable side input: The interface that a user function sees is a List/Bag. New side-input data that arrives is appended to the existing elements.
  • Singleton Side Input: The interface that a user function sees is a single value of type T. We can either not allow several elements to arrive on the side input (for a given window) or replace the previous value when new data comes in.
  • Map Side Input: The interface that a user function sees is a Map<K, V> where the user function can iterate over all entries or access by a given key (this is not to be mistaken with keyed state). If new data for a key arrives we either replace the existing value or don't allow updates. This should be in line with singleton side input.
  • Multimap Side Input: The interface that a user function sees is a Map<K, Iterable<V>>. This is the combination of Map and List/Bag side input.
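
As a rough illustration of how the four types differ when new data arrives, the following sketch models each one as a small heap-backed accumulator. All class names are hypothetical and not part of the proposed API; for the singleton and map types the sketch assumes the "replace the existing value" option rather than rejecting updates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical list/bag view: new elements are appended. */
class ListSideInputView<T> {
  private final List<T> elements = new ArrayList<>();
  void add(T element) { elements.add(element); }
  Iterable<T> get() { return elements; }
}

/** Hypothetical singleton view: a new element replaces the previous one. */
class SingletonSideInputView<T> {
  private T value;
  void add(T element) { value = element; }
  T get() { return value; }
}

/** Hypothetical map view: a new value replaces the value for the same key. */
class MapSideInputView<K, V> {
  private final Map<K, V> byKey = new HashMap<>();
  void add(K key, V value) { byKey.put(key, value); }
  V get(K key) { return byKey.get(key); }
  Iterable<Map.Entry<K, V>> entries() { return byKey.entrySet(); }
}

/** Hypothetical multimap view: values are appended per key. */
class MultimapSideInputView<K, V> {
  private final Map<K, List<V>> byKey = new HashMap<>();
  void add(K key, V value) {
    byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }
  Iterable<V> get(K key) {
    return byKey.getOrDefault(key, Collections.emptyList());
  }
}
```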

Proposed Changes

We will first go into the underlying changes before again describing the proposed API in more detail.

Buffering Elements

All the side input cases mentioned above involve some kind of waiting: we have to buffer main-input data until a side input is available. The main input itself can be either keyed or un-keyed, and a side input can be either keyed (only if the main input is also keyed) or broadcast. A keyed side input must use the same key as the main-input stream. If the side input is not keyed it has to be broadcast because otherwise results would depend on which parallel instance of an operation receives a given side input element.

While waiting for a side input to become available we must buffer the elements from the main input. Depending on whether the main-input is keyed or not we need to store the buffered elements in either operator state or in key-group state. We cannot store them in keyed state because we need to iterate over all buffered elements when we determine that a side input is ready. We also think that element buffering should be scoped to windows; when we know that we have a side input ready for a given window we only need to process the buffered elements that were buffered for that window.

We propose to add a new Operator-level service (similar to the timer service and state backends) for buffering records with this interface:

RecordBuffer
/**
 * @param <T> type of records in the buffer
 * @param <N> type of the namespace used for scoping
 */
interface RecordBuffer<T, N> {
  /** Adds a record to the buffer for the given namespace. */
  void addRecord(N namespace, T record);
  
  /** Returns an iterator over the elements stored for the given namespace. */
  Iterator<T> getRecords(N namespace);
 
  /** Removes all elements for the given namespace from the buffer. */
  void clear(N namespace);
}

Depending on whether the operator is keyed or not, it has to provide a RecordBuffer that stores its data either in key-group state or in operator state when checkpointing. The key-group version of the RecordBuffer will have to be provided by the KeyedStateBackend while the non-keyed version can be provided by the StateBackend.
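
To make the contract of the interface concrete, here is a minimal heap-only sketch. The class HeapRecordBuffer is hypothetical; it keeps everything in a HashMap and does not checkpoint anything, whereas a real implementation would write its contents to operator state or key-group state. The interface is repeated so that the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** The interface proposed above, repeated so the sketch is self-contained. */
interface RecordBuffer<T, N> {
  void addRecord(N namespace, T record);
  Iterator<T> getRecords(N namespace);
  void clear(N namespace);
}

/**
 * Hypothetical heap-only implementation for illustration. The namespace
 * would typically be a window, so that only the records buffered for a
 * window are replayed once the side input for that window is ready.
 */
class HeapRecordBuffer<T, N> implements RecordBuffer<T, N> {
  private final Map<N, List<T>> records = new HashMap<>();

  @Override
  public void addRecord(N namespace, T record) {
    records.computeIfAbsent(namespace, ns -> new ArrayList<>()).add(record);
  }

  @Override
  public Iterator<T> getRecords(N namespace) {
    List<T> stored = records.get(namespace);
    if (stored == null) {
      return Collections.<T>emptyIterator();
    }
    return stored.iterator();
  }

  @Override
  public void clear(N namespace) {
    records.remove(namespace);
  }
}
```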

Storing Side-Input Data

For keyed operators with keyed side input we can simply store the side input in the KeyedStateBackend.

For non-keyed side input we need to store the side input in broadcast state. If we only allow broadcast state to be modified depending on the incoming (broadcast) side input data then this is state that is guaranteed to be the same on all parallel operator instances when checkpointing. Since side-input data can become somewhat big we should add a BroadcastStateBackend that has exactly the same interface as KeyedStateBackend but does not store data per key. When checkpointing, only one of the parallel instances of the operator needs to checkpoint its broadcast state. When restoring, all operators can be restored from that data.
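
The checkpointing argument can be sketched as follows. This is a toy model, not the proposed BroadcastStateBackend: the class BroadcastStateInstance is hypothetical, the state is a plain map of strings, and picking subtask 0 as the one instance that writes the snapshot is an assumption made only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for one parallel operator instance. */
class BroadcastStateInstance {
  private final int subtaskIndex;
  private final Map<String, String> state = new HashMap<>();

  BroadcastStateInstance(int subtaskIndex) {
    this.subtaskIndex = subtaskIndex;
  }

  /** State changes only in response to broadcast side-input elements. */
  void onBroadcastElement(String key, String value) {
    state.put(key, value);
  }

  /** Only subtask 0 writes a snapshot; the other copies are identical. */
  Optional<Map<String, String>> snapshot() {
    if (subtaskIndex != 0) {
      return Optional.empty();
    }
    Map<String, String> copy = new HashMap<>(state);
    return Optional.of(copy);
  }

  /** Every instance restores from the same single snapshot. */
  void restore(Map<String, String> snapshot) {
    state.clear();
    state.putAll(snapshot);
  }

  String get(String key) {
    return state.get(key);
  }
}
```

Because every instance restores from the same snapshot, the parallelism after a restore does not have to match the parallelism at checkpoint time in this model.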

Getting Side-Input Data into an Operator

There needs to be a way of getting the actual side-input data into an operator. Right now, there are OneInputStreamOperator and TwoInputStreamOperator, which have, respectively, one or two inputs. There can be zero or more side inputs. We now propose some ways in which this could be achieved:

Using a tagged union input on the second input of a TwoInputStreamOperator

In this scenario, we would only allow side inputs for one-input operations. When we know that an operation is going to have side inputs we instantiate a special operator that reads the normal input on the first input and the (multiplexed) side input streams on the second input. For example, in addition to the normal StreamMap operator we would also have a StreamMapWithSideInputs operator that takes care of side inputs. For this scenario we would need to know what side inputs an operation has before instantiating the operator. This would (with the current API) lead to an API pattern like this:

Side Inputs before operation
DataStream<String> mainStream = …
DataStream<String> sideStream = …

BroadcastSingletonInput<String> filterString =
  new BroadcastSingletonInput<>(sideStream);

mainStream
  .map(new DoSomething())
  .withSideInputs(filterString) // this sets side input for following operation
  .filter(new RichFilterFunction<String>() {
    @Override
    public boolean filter(String in) {
      String sideValue =
        getRuntimeContext().getSideInput(filterString);
      return in.contains(sideValue);
    }
  });


Having to set the side input before the operation can seem somewhat counterintuitive.
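
The multiplexing on the second input could look roughly like the sketch below. It is a standalone illustration, not an existing Flink operator: TaggedSideElement and FilterWithSideInputs are hypothetical names, the tag is assumed to be a plain integer id per side input, and the buffering of main-input elements while a side input is not yet ready is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical element type for the multiplexed second input. */
final class TaggedSideElement {
  final int sideInputId; // identifies which side input the value belongs to
  final Object value;

  TaggedSideElement(int sideInputId, Object value) {
    this.sideInputId = sideInputId;
    this.value = value;
  }
}

/** Hypothetical two-input filter: input 1 is the main stream, input 2 the union. */
class FilterWithSideInputs {
  private final Map<Integer, Object> latestBySideInput = new HashMap<>();

  /** Second input: demultiplex by tag and keep the latest value per side input. */
  void processElement2(TaggedSideElement element) {
    latestBySideInput.put(element.sideInputId, element.value);
  }

  /**
   * First input: keep the element if it contains the value of side input 0.
   * Returns false while that side input has not received any data yet.
   */
  boolean processElement1(String in) {
    Object sideValue = latestBySideInput.get(0);
    return sideValue != null && in.contains((String) sideValue);
  }
}
```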

Managing side inputs at the level of StreamTask

We could handle both the buffering of elements and of side input data itself at the StreamTask instead of at the operator. The StreamTask would just provide an interface to the operator that would allow it to query the side input. In this scenario, the StreamTask would need to implement the record buffering (either per key-group or in operator state) described above. The advantage of this solution is that the operator gets a clearly defined interface and doesn't have to do much else. The disadvantage is that we would not have a more primitive interface that users could use if they have use cases that go beyond the capabilities offered by side inputs. If we decided to put the side input functionality at the operator level users would be able to override it to fit their behaviour.

Allowing a stream operator to be extended with arbitrary inputs

This would roughly follow the ideas laid out here: https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit#heading=h.pqg5z6g0mjm7. If we can arbitrarily add inputs to an operator we can also add (keyed or broadcast) inputs to an operator for dealing with side inputs. This would have the advantage that we also provide a general n-ary operator interface that users can use. The disadvantage is that we put more (somewhat complex) logic into the stream operator.

Compatibility, Deprecation, and Migration Plan

There are no backwards compatibility/migration/deprecation concerns since this only adds new API.

Test Plan

The new feature will be tested with unit tests and integration tests.

Rejected Alternatives

We list some implementation alternatives above. Once we decide on a final solution we will move the rejected alternatives here for history purposes.