Current state: ACCEPTED
Discussion thread: N/A
Released: Samza 1.0
This document outlines a proposal for extending Samza’s Execution Planner to verify that the stream(s) backing Tables agree in partition count with the other streams participating in Stream-Table Joins, in applications written using the Samza High-Level API.
Motivating Example: Stream-Stream Join
In a typical application using the Samza high-level API, the user defines a sequence of operations to be performed on a stream of messages using a set of methods defined on the org.apache.samza.operators.MessageStream interface.
For instance, to perform the operations illustrated in Fig. 1 on a stream of messages, a user can write the Samza app shown in listing 1.
Fig. 1 — A logical workflow of stream processing operations
Fig. 2 — An illustration of the OperatorSpec graph of objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.
This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an OperatorSpec object, a logical abstraction that describes the operation specified by the app author. Some of these OperatorSpecs represent message streams, which can be thought of as channels propagating data into and out of the application, e.g.
InputOperatorSpecs represent input data streams, e.g. S1 and S2, from which input messages are read.
OutputOperatorSpecs represent output data streams, e.g. S3, to which processed data is produced.
A common example of such message streams is Kafka topics which can be used to carry data consumed or produced by Samza applications. But the fact that Kafka topics are typically sharded into different partitions presents an interesting challenge to some stream operations. In the application above, for instance, it is specified that a Join operation is to be performed between a filtered version of data stream S1 and data stream S2. But in the case where these data streams are Kafka topics, Samza has to ensure that these Kafka topics are — at the very least — sharded into the same number of partitions, or else the Join operation would be invalid.
Responsibilities of Samza’s Execution Planner
Verifying Compatibility of Joined Input Streams
The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpecs produced by the Samza High-Level API to verify compliance with this requirement among all such sets of streams.
Fig. 3 — Two example cases of Stream-Stream Joins. After considering the partition counts of the joined input streams, Samza’s Execution Planner accepts the one on the left but rejects the one on the right.
Inferring Partition Counts of Intermediate Streams
Another closely related responsibility of Samza’s Execution Planner is inferring the partition counts of all intermediate streams present in the OperatorSpec graph. Such streams are introduced into the OperatorSpec graph whenever the Partition-By operation is used, and are represented by the same type of OperatorSpec used to represent input streams, i.e. InputOperatorSpec. Unlike input streams, however, intermediate streams have no defined partition counts by default. It is the Execution Planner that decides the partition count of every intermediate stream after traversing the OperatorSpec graph, according to the following rules:
Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.
Any intermediate stream not covered by the first rule is assigned the partition count specified by the Samza config property job.intermediate.stream.partitions.
If no value is specified for job.intermediate.stream.partitions, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a hard-coded maximum of 256.
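The second and third rules amount to a simple fallback computation. The following is a minimal sketch of that logic, not Samza’s actual implementation; the class and method names are assumed for illustration:

```java
import java.util.Collection;

// Sketch of the fallback rules for intermediate-stream partition counts.
// Names and structure are assumed; this is not Samza's actual code.
public class IntermediatePartitionFallback {
    // Hard-coded upper bound mentioned in the rules above.
    static final int MAX_INFERRED_PARTITIONS = 256;

    /**
     * Returns the partition count for an intermediate stream that is not
     * joined with any input stream. configuredCount models the value of
     * job.intermediate.stream.partitions (null when the property is unset).
     */
    static int partitionsFor(Integer configuredCount, Collection<Integer> inputOutputCounts) {
        if (configuredCount != null) {
            return configuredCount; // the config property wins when set
        }
        // Otherwise: maximum partition count among all input/output streams, capped at 256.
        int max = inputOutputCounts.stream().mapToInt(Integer::intValue).max().orElse(1);
        return Math.min(max, MAX_INFERRED_PARTITIONS);
    }
}
```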
Fig. 4 — The OperatorSpec graph of an example high-level Samza application that employs the Partition-By operation. The Execution Planner decides to assign the partition count value 16 to intermediate stream S2′, the same value as input stream S1, since they are joined together.
It is important to realize that there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with; in such cases, the Execution Planner signals an error and rejects the whole application. Fig. 5 illustrates one such case.
Fig. 5 — The OperatorSpec graph of an example application in which the Execution Planner cannot enforce agreement between an intermediate stream and the input streams it is joined with, causing it to reject the application.
Tables and Stream-Table Joins
A recent addition to Samza is the introduction of Table, a key-value abstraction that facilitates accessing remotely stored data. With this addition, it also became possible to perform Join operations between Tables and streams.
The code sample below demonstrates how this can be achieved using Samza High-Level API.
Fig. 6 — A diagram illustrating the logical data flow in the example Samza application in listing 2. Stream S1 is partitioned then sent to table T which is then joined with stream S2.
But since Tables can be populated with data flowing from input streams (aka local tables), it is still important to ensure that the stream used to populate the table has the same number of partitions as the stream the table is joined with. Failing to do so exposes Stream-Table Joins to the same class of problems Stream-Stream Joins could run into if Samza were to allow joining 2 streams with different partition counts, i.e. invalid Joins.
Another recent addition to Samza that is related to Tables is Side-Input Streams. Simply put, this feature allows Samza application authors to specify that a table should be populated — and constantly updated — with data from one or more input streams. Such streams have been given the name Side-Input Streams or Side-Inputs for short.
As far as Samza’s Execution Planner is concerned, tables with side-inputs are largely similar to tables populated from non-side-input streams. There is, however, one key difference that requires treating them specially: unlike all other streams, side-input streams are not represented in the OperatorSpec graph. At the time of writing, no InputOperatorSpecs — or any OperatorSpecs for that matter — are created to represent side-input streams in the OperatorSpec graph; only the names of side-input streams are specified in the configuration of the tables they populate. And since Samza’s Execution Planner currently relies on traversing the OperatorSpec graph to identify groups of input streams related through Joins, accounting for tables with side-inputs will require some special handling.
At the time of writing, Samza’s Execution Planner does not account for Tables, which means it is simply incapable of fulfilling its responsibilities with Samza High-Level applications performing Stream-Table Joins.
In particular, assuming that we have:
A table T
An input stream S1, with partition count P1, used to populate table T
An input stream S2, with partition count P2, joined with table T
The table below enumerates a number of cases in which Samza’s current Execution Planner does not enforce the necessary constraints on P1 and P2 to ensure the validity of the Stream-Table Join between table T and stream S2.
Case #1: S1 and S2 are both input streams. P1 must be equal to P2.
Case #2: S1 is an input stream and S2 is an intermediate stream. P2 must be set to P1.
Case #3: S1 is an intermediate stream and S2 is an input stream. P1 must be set to P2.
Case #4: S1 and S2 are both intermediate streams. If the result of joining S1 and S2 is subsequently joined with an input stream S3 with partition count P3, P1 and P2 must be set to P3.
Cases #1 and #2 apply equally well if S1 is a side-input stream.
In all these cases, Samza application authors have no defense against essentially invalid Stream-Table Joins.
As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner traverses the OperatorSpec graph to identify groups of input and/or intermediate streams involved in Join operations and to verify agreement among their partition counts. To understand why this does not work in the case of Stream-Table Joins, we examine the OperatorSpec graph generated for the Samza application in the code sample below.
Fig. 7 — A graph representing the OperatorSpec graph generated by Samza for the application in listing 3. As usual, OperatorSpecs associated with input/output streams are highlighted in yellow.
It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:
A new type of OperatorSpec, StreamTableJoinOperatorSpec, is used to represent Stream-Table Join operations.
A new terminal OperatorSpec, SendToTableOperatorSpec, is used to represent the operation of producing data to a table.
The StreamTableJoinOperatorSpec is not connected to the SendToTableOperatorSpec; it only has a reference to the table (TableSpec) participating in the Stream-Table Join operation.
The disconnect between the two OperatorSpecs is attributed to the way the OperatorSpec graph gets constructed, whereby it is modeled exactly after the StreamApplication defined by the user, i.e.:
sendTo(Table) results in an InputOperatorSpec connected to a FilterOperatorSpec, which is in turn connected to a SendToTableOperatorSpec.
sendTo(Stream) results in a StreamTableJoinOperatorSpec connected to an OutputOperatorSpec.
And since no further operations can be chained to sendTo(Table) according to the current Samza High-Level APIs, a SendToTableOperatorSpec is always a terminal vertex in the OperatorSpec graph, and all subsequent operations on a table will have their corresponding OperatorSpecs in disconnected components of the graph. Comparing the OperatorSpec graph generated in this scenario with that of the Stream-Stream Join application in listing 1 confirms this causation.
To extend Samza’s ExecutionPlanner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec and all the StreamTableJoinOperatorSpecs that reference the same table (TableSpec) in the entire OperatorSpec graph.
Fig. 8 — A graph representing the OperatorSpec graph in Fig. 7 after assuming a virtual connection between the SendToTableOperatorSpec and the StreamTableJoinOperatorSpec that reference the same TableSpec.
Modularizing Partition Count Calculation
I propose to break down the code for verifying the validity of Join operations, in ExecutionPlanner.calculatePartitions(), into 3 different pieces:
OperatorSpec Graph Analysis
Conversion to StreamEdges
Processing Joined Streams
Details of each one of these steps are outlined below.
OperatorSpec Graph Analysis
In this phase, we perform all the necessary traversals on the OperatorSpec graph. Specifically, we need 2 traversals of the entire graph starting from its InputOperatorSpecs:
The first traversal is a preprocessing step for the second, where we create an association between every SendToTableOperatorSpec and the StreamTableJoinOperatorSpecs sharing the same TableSpec.
The second traversal creates an association between InputOperatorSpecs and all relevant Join OperatorSpecs. Stream-Table Joins are also covered in this traversal by customizing the OperatorSpec graph traversal, through the association created in step 1 above, to assume virtual connections.
Both traversals of the OperatorSpec graph are very similar. In essence, we are just visiting every OperatorSpec and recording some observations upon encountering certain types of OperatorSpecs. This motivates the introduction of the graph traversal utility discussed later in Unifying Graph Traversal.
Operations in this step tackle OperatorSpecs exclusively. The outcome of this phase will be the association created in step 2 above, where InputOperatorSpecs are grouped by the Join OperatorSpecs they participate in.
Conversion to StreamEdges
In this phase, we convert every entry in the association produced by the previous step into a StreamEdge set by simply dropping the key and replacing every InputOperatorSpec with its corresponding StreamEdge.
To support tables with side-input streams, we can also examine every key (Join OperatorSpec) in the association produced by the previous step, retrieve the relevant TableSpec if the key is a StreamTableJoinOperatorSpec, and add a new input StreamEdge corresponding to the side-input stream to the appropriate set of StreamEdges.
The output of this step is a collection of StreamEdge sets, each of which represents a group of StreamEdges participating in a particular Join. The number of such sets should typically be equal to the number of Joins in the application.
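The conversion, including the side-input handling, can be sketched as follows. Plain strings stand in for the Join OperatorSpec keys and the StreamEdges, and all names are assumed; this is not Samza’s actual code:

```java
import java.util.*;

// Sketch of the conversion to StreamEdge sets, with String standing in for
// both the Join OperatorSpec keys and the StreamEdges; names are assumed.
public class StreamEdgeConversion {
    /**
     * joinedInputs: Join OperatorSpec -> input streams participating in it.
     * sideInputs:   Join OperatorSpec -> side-input streams of its table, if any.
     * Returns one StreamEdge set per Join, with side-input edges folded in.
     */
    static Collection<Set<String>> toStreamEdgeSets(Map<String, Set<String>> joinedInputs,
                                                    Map<String, Set<String>> sideInputs) {
        List<Set<String>> result = new ArrayList<>();
        joinedInputs.forEach((joinOp, inputs) -> {
            Set<String> edges = new HashSet<>(inputs); // drop the key, keep the streams
            // add an input StreamEdge for each side-input stream of the joined table
            edges.addAll(sideInputs.getOrDefault(joinOp, Set.of()));
            result.add(edges);
        });
        return result;
    }
}
```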
Processing Joined Streams
In this step, we process every group of StreamEdges produced by the previous step. Each group can be composed of:
(a) StreamEdges with defined partition counts only (i.e. input streams), or
(b) StreamEdges with defined/undefined partition counts (i.e. input/intermediate streams), or
(c) StreamEdges with undefined partition counts only (i.e. intermediate streams)
Groups of type (b) are processed such that all StreamEdges with undefined partition counts in a group get assigned the same partition count as the other stream(s) in the same group whose partition counts are defined. It is worth noting that this assignment can change the type of other group(s) from (c) to (b), because one intermediate StreamEdge can appear in multiple groups. This entails performing the assignment iteratively and incrementally, in a way that accounts for the interdependencies between the different groups. Finally, StreamEdges in groups of type (c) are assigned the maximum partition count among all input and output StreamEdges, capped at a maximum hard-coded value of 256.
At the end of this process, StreamEdges in every group must have the same partition count, or else the Execution Planner will reject the application.
Operations in this step tackle StreamEdges exclusively. No further associations with other entities are necessary.
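The iterative assignment and final agreement check described above can be modeled in a few lines. In this sketch, stream names stand in for StreamEdges and -1 marks an undefined partition count; names are assumed and this is not Samza’s actual code:

```java
import java.util.*;

// Simplified model of partition-count propagation across Join groups;
// stream names stand in for StreamEdges, -1 marks an undefined count.
public class JoinGroupPartitioner {
    static final int UNDEFINED = -1;
    static final int MAX_PARTITIONS = 256;

    static void assignPartitions(Map<String, Integer> counts,
                                 List<Set<String>> joinGroups,
                                 int maxInputOutputCount) {
        // Propagate defined counts iteratively: an assignment in one group can
        // turn another group from type (c) into type (b), so repeat until
        // no further change occurs.
        boolean changed = true;
        while (changed) {
            changed = false;
            for (Set<String> group : joinGroups) {
                int defined = group.stream().map(counts::get)
                        .filter(c -> c != UNDEFINED).findFirst().orElse(UNDEFINED);
                if (defined == UNDEFINED) continue;
                for (String stream : group) {
                    if (counts.get(stream) == UNDEFINED) {
                        counts.put(stream, defined);
                        changed = true;
                    }
                }
            }
        }
        // Remaining type (c) streams: fall back to the capped maximum count.
        int fallback = Math.min(maxInputOutputCount, MAX_PARTITIONS);
        counts.replaceAll((stream, c) -> c == UNDEFINED ? fallback : c);
        // Final check: every group must agree on a single partition count.
        for (Set<String> group : joinGroups) {
            if (group.stream().map(counts::get).distinct().count() > 1) {
                throw new IllegalStateException("Joined streams disagree in partition count: " + group);
            }
        }
    }
}
```

Note how the S2′/S3′-style interdependency is handled: once one group assigns a count to a shared intermediate stream, the next iteration propagates it to the other group.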
Unifying Graph Traversal
I propose to introduce the following general stateless graph traversal utility for use in the OperatorSpec Graph Analysis step.
The following code snippet demonstrates how this utility can be used to traverse and print the entire OperatorSpec graph starting from its InputOperatorSpecs.
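A minimal sketch of what such a stateless utility could look like is shown below. The names getNextVertexes and Consumer&lt;T&gt; follow the text, but the exact signatures are assumed:

```java
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;

// Sketch of a general, stateless graph-traversal utility; the visitor and
// the getNextVertexes function are supplied by the caller.
public class GraphTraversal {
    /**
     * Breadth-first traversal from the given start vertices. Every reachable
     * vertex is visited exactly once; getNextVertexes customizes how the
     * traversal progresses from one vertex to the next.
     */
    public static <T> void traverse(Collection<T> startVertices,
                                    Consumer<T> visitor,
                                    Function<T, Collection<T>> getNextVertexes) {
        Set<T> visited = new HashSet<>();
        Deque<T> queue = new ArrayDeque<>(startVertices);
        while (!queue.isEmpty()) {
            T vertex = queue.poll();
            if (!visited.add(vertex)) {
                continue; // already visited through another path
            }
            visitor.accept(vertex);
            queue.addAll(getNextVertexes.apply(vertex));
        }
    }
}
```

Printing the whole graph then reduces to passing System.out::println as the visitor and a function returning each OperatorSpec’s registered next operators as getNextVertexes.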
Introducing such a utility has several advantages:
It spares us having to write redundant code for the 2 traversals needed during the OperatorSpec Graph Analysis step.
It makes it easy to carry out more traversals of any part of the OperatorSpec graph should the need arise.
It allows us to write modular visitors, i.e. implementations of Consumer<T>, that observe different aspects of the graph while keeping their code exclusively focused on collecting, maintaining, and updating metadata about the traversed graph, in isolation from the traversal code itself.
It allows us to customize the traversal progression so that virtual connections can be assumed between otherwise disconnected parts of the OperatorSpec graph, as in the second traversal in OperatorSpec Graph Analysis. This way, we can keep the OperatorSpec graph representation as close as possible to the way it was defined by the user without being limited by it.
This utility can be used to carry out the 2 traversals in the OperatorSpec Graph Analysis step as shown below.
The first traversal can be done as follows:
SendToTableVisitor is a type dedicated to observing the OperatorSpec graph and building the association between SendToTableOperatorSpecs and the StreamTableJoinOperatorSpecs that share the same TableSpec.
The second traversal can be similarly achieved, with a custom implementation of the getNextVertexes() function that relies on the association created in the previous step.
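One way to implement such a custom getNextVertexes() is to wrap the ordinary successor function with the map of virtual edges built in the first traversal. The sketch below is generic over the vertex type and uses assumed names; it is illustrative only:

```java
import java.util.*;
import java.util.function.Function;

// Sketch: augmenting a successor function with virtual edges, e.g. from a
// SendToTableOperatorSpec to the StreamTableJoinOperatorSpecs sharing its
// TableSpec. Generic over the vertex type; names are assumed.
public class VirtualEdges {
    static <T> Function<T, Collection<T>> withVirtualEdges(
            Function<T, Collection<T>> base, Map<T, Collection<T>> virtualEdges) {
        return vertex -> {
            List<T> next = new ArrayList<>(base.apply(vertex));
            // assume the connections recorded during the first traversal
            next.addAll(virtualEdges.getOrDefault(vertex, List.of()));
            return next;
        };
    }
}
```

Passing the wrapped function to the traversal utility lets the second traversal cross from a SendToTableOperatorSpec into the otherwise disconnected component containing the Stream-Table Join, without changing the OperatorSpec graph itself.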