Current state: [ UNDER DISCUSSION ]
Discussion thread: <link to mailing list DISCUSS thread>
JIRA: SAMZA-TBD
Released:
Exactly-once semantics in stream processing aims to ensure that every event in a stream is processed exactly once, with or without failures.
While this seems like a simple goal, it is much more complicated than it looks. There are two main challenges:
To some extent, exactly-once semantics is mostly about solving the above two challenges.
For Samza exactly-once semantics, we have the following design goals:
What is not included in the design goals:
There are many different solutions for exactly-once processing. We only examine Kafka Streams and Flink here, because the solutions of most other products are either not quite applicable (Spark Streaming) or suffer a large performance degradation (Storm) when exactly-once is enabled.
Kafka Streams solves exactly-once by delegating everything to Kafka's transactional support. This essentially attacks the issue by reducing the number of systems involved in the streaming pipeline. In short, Kafka Streams delegates all of the following operations to Kafka:
For more details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
Flink uses a variant of the Chandy-Lamport algorithm to take a distributed snapshot of the entire system at a given logical clock.
For more details:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html
To some extent, Samza is a combination of Flink and Kafka Streams, but not exactly the same. Here we compare Samza, Kafka Streams, and Flink.
Samza is similar to Kafka Streams in that both use local state and write a local-state changelog back to Kafka. Unlike Kafka Streams, Samza makes no assumption about the input and output systems (although it works best with Kafka); the only assumption is a partition/offset model. So it is difficult to always assume the messaging layer has transaction support.
Flink is more similar to Samza in that Flink tasks also run in a cluster, and the checkpoint coordination is driven by the JobManager (checkpoint coordinator). From that standpoint, Samza exactly-once can use the same mechanism as Flink.
However, a critical difference between Flink and Samza is that Samza has the shared channel problem while Flink does not. Also, Samza has a standalone mode without a centralized YARN AM, so a separate solution is needed to address that.
If we view a stream processing engine as a state machine driven by an infinite event sequence, exactly-once can be thought of as the following:
The state of a stream processing system consists of the following parts:
We can further label the states by their mutability from the stream processing engine's point of view.
State | Mutability |
---|---|
[Stream, Pos] | Y |
[S_SYS] | Y |
[S_OUT] | N |
Conclusion 1: For any state that has been emitted, the stream processing engine cannot revert it by itself; it needs coordination from the downstream system, e.g. an abort/revert command.
A stream processing system snapshot is represented by {[Source, Pos], S_SYS, S_OUT} at a given time.
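As a concrete illustration, the snapshot triple can be modeled as a small data structure. This is a hedged sketch; the names are illustrative, not Samza APIs, and S_OUT is stored as a tuple to reflect its immutability (Conclusion 1).

```python
from dataclasses import dataclass

# Minimal model of the snapshot triple {[Source, Pos], S_SYS, S_OUT}.
@dataclass
class Snapshot:
    positions: dict   # [Source, Pos]: consumed position per source stream
    s_sys: dict       # S_SYS: internal state, e.g. local store contents
    s_out: tuple      # S_OUT: already-emitted output; immutable once emitted

snap = Snapshot(positions={"S1": 1000, "S2": 100},
                s_sys={"count": 42},
                s_out=("m1", "m2"))
```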
Because a stream processing system usually has a topology pipeline, the snapshot can be either global (at the job level) or local (at the task level).
One interesting question is: for exactly-once delivery, are local snapshots good enough? To answer this, imagine the following topology:
In the topology, each node has two local snapshots. Snapshots of the same color are in the same global snapshot. Now suppose processing nodes (tasks) n1, n2 and n3 load snapshot_n1_1, snapshot_n2_2 and snapshot_n3_2 respectively. The state of n3 would then correspond to the original source positions {(S1,1500), (S2,200)}. In this case, n1 will reprocess [1001-1500] from S1 and emit some output as S_OUT; assume those outputs are [40-45] in S3. Notice that due to unrepeatability, the regenerated output is not necessarily the same as [21-30] in S3, which was the output from the previous run.
Now S3 contains two sets of messages that are both the processing result of [1001-1500] from S1. Messages [21-30] in S3 are the S_OUT of n1; according to Conclusion 1, n1 itself cannot revert those states but has to rely on n3 to do so. To ensure exactly-once processing, n3 should process either [21-30] or [40-45], but not both. In this particular case, [21-30] needs to be reverted after processing. To do that, n3 must know where it should revert its state to: here the state must be reverted to Snapshot_n3_1, because that is the state corresponding to source position (S1,1000). Note that in this particular case deduplication will not work, because [21-30] and [40-45] are not duplicates of each other.
Also note that in this particular case, n2 does not need to be reset to Snapshot_n2_1, because its input stream S2 does not contain any duplicated S_OUT.
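The failure scenario above can be reproduced with a toy simulation. Everything here (the counting state, the exact offsets) is assumed for illustration; the point is that restoring mismatched local snapshots double-counts the re-run input.

```python
# Toy model of the example above: n3's S_SYS is just a count of the S3
# messages it has processed.
def run_n3(restored_count, s3_messages):
    count = restored_count
    for _ in s3_messages:
        count += 1
    return count

old_output = list(range(21, 31))  # first run's output of S1 [1001-1500]
new_output = list(range(40, 46))  # non-deterministic re-run of the same input

# Mismatched local snapshots: n3 restores snapshot_n3_2 (which already
# reflects old_output) and then also consumes new_output -> double counting.
mismatched = run_n3(restored_count=len(old_output), s3_messages=new_output)

# Matching global snapshot: n3 restores snapshot_n3_1 (taken before
# old_output was processed) and consumes only new_output.
consistent = run_n3(restored_count=0, s3_messages=new_output)

print(mismatched, consistent)  # 16 6 -> the re-run input is counted twice
```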
Conclusion 2: Exactly once delivery requires a global snapshot for all the nodes in the same processing tree.
In Fig.1, there is a dedicated channel between two processing nodes, which means that inside each channel the boundary of the snapshots is controlled by a single upstream processing node. The boundary can be indicated by a marker in the stream: any message before the marker is included in the previous snapshot, and anything after the marker is included in the next snapshot. Therefore there is a clear lineage of snapshot versions in the channel, divided by the markers. However, if the communication channel between two nodes is shared, the problem becomes much more complicated. Let's take a look at Fig.2:
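For a dedicated channel, the marker logic can be sketched as follows. The message and marker shapes are assumed for illustration; this is not a Samza API.

```python
# In a dedicated channel, markers from the single upstream node cleanly
# partition messages into snapshot versions.
def assign_versions(channel):
    """Tag each data message with the snapshot version it belongs to."""
    version = 1
    tagged = []
    for msg in channel:
        if isinstance(msg, tuple) and msg[0] == "marker":
            version = msg[1] + 1   # everything after marker V is snapshot V+1
        else:
            tagged.append((msg, version))
    return tagged

channel = ["a", "b", ("marker", 1), "c", ("marker", 2), "d"]
print(assign_versions(channel))  # [('a', 1), ('b', 1), ('c', 2), ('d', 3)]
```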
For ease of explanation, we give the following definitions:
With the above definitions, we state the following sufficient and necessary conditions for a valid snapshot:
Condition 1: For all messages produced by processing node N before the marker for snapshot version V from N, the state changes introduced by those messages should all be included in snapshot version V.
Condition 2: Any state included in a snapshot of version V should be, and should only be, the result of messages whose snapshot version is less than or equal to V.
In Fig.2, the messages in S3 and S4 include those from both processing nodes n1 and n2. Take S3 as an example: by the time processing node n3 receives the snapshot v1 commit markers from both n1 and n2, it has already processed the message at pos=24, which should not go into snapshot v1 because its snapshot version is v2. Similarly, in S4, the message at pos=54 should not go into snapshot v1 either. Based on Conclusion 1, node n3 has to deal with this somehow.
This problem is essentially caused by the early arrival of messages with the next snapshot version.
Here we list four solutions to the shared channel problem, from different angles. The comparison is in the next section.
Synchronize on the snapshot commit marker generation, i.e. at the same stage, no node produces messages with snapshot version V+1 until all the nodes have sent a snapshot marker V downstream.
Skip over all the messages which do not belong to the current snapshot, and seek back later to re-consume those messages for processing after the current snapshot is taken.
Buffer the messages (whether in memory or on disk) whose version is higher than the current snapshot version.
To solve the shared channel problem, we introduce the following conclusion, which is an inference from the aforementioned non-determinism (unrepeatability):
Conclusion 3: For a given set of messages, the order of processing does not matter as long as each message is processed only once.
The meaning of conclusion 3 can also be expressed in the following way:
Take Fig.2 as an example, the processing sequence would be the following:
Here we compare all the solutions by their pros and cons:
| Solution | Pros | Cons | Performance Impact |
|---|---|---|---|
| Solution 1: Synchronize on commit marker | | | |
| Solution 2: Skip, seek back and reconsume | | | |
| Solution 3: Buffer locally and process later | | | |
| Solution 4: Process on site and fix later | | | |
Based on the above comparison, Solutions 1 and 3 seem better. Given that Samza already has a local disk, I think Solution 3 is the best solution due to its simplicity (most importantly, no additional synchronization, which is critical for standalone mode) and small performance impact.
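A minimal sketch of Solution 3's buffering logic follows. The shapes here are assumptions, not Samza APIs: each message is a (producer, checkpoint version, payload) tuple, and a producer's snapshot commit marker carries the payload "MARKER". Early arrivals are buffered in memory here, whereas the actual design buffers them on disk.

```python
from collections import deque

def consume(messages, producers):
    current = 1                      # snapshot version being accumulated
    pending = set(producers)         # producers whose marker is still missing
    buffered = deque()               # early arrivals (version > current)
    processed = []                   # (payload, snapshot it was included in)

    def process(payload):
        processed.append((payload, current))

    for producer, version, payload in messages:
        if payload == "MARKER":
            pending.discard(producer)
            if not pending:          # snapshot `current` is now complete
                current += 1
                pending = set(producers)
                while buffered:      # drain the deferred early arrivals
                    process(buffered.popleft())
        elif version > current:
            buffered.append(payload)  # early arrival: defer, don't process
        else:
            process(payload)
    return processed

# Fig.2-style interleaving: n1's v2 message arrives before n2's v1 marker.
msgs = [("n1", 1, "m23"), ("n1", 1, "MARKER"), ("n1", 2, "m24"),
        ("n2", 1, "m53"), ("n2", 1, "MARKER"), ("n2", 2, "m54")]
print(consume(msgs, producers={"n1", "n2"}))
# [('m23', 1), ('m53', 1), ('m24', 2), ('m54', 2)]
```

Note that m24 is never included in snapshot v1 even though it arrived before n2's v1 marker, which is exactly the property Conditions 1 and 2 require.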
As mentioned, a valid snapshot requires the entire processing pipeline to have a consistent global snapshot. One question is: what if there is a node failure? With Solution 3 above, the downstream processing nodes will not be able to make progress, which is not ideal.
To make progress in this case, one solution is to extend Solution 2 to do finer-grained reordering. The details are discussed in these slides.
The system local state S_SYS for Samza contains two parts:
RocksDB is a key-value store, and the changelog topic is the journal of the RocksDB tables.
For Samza, the changelog topic should be the source of truth for S_SYS, because the local RocksDB state can always be recovered from the changelog, but not vice versa.
As mentioned above, in case of failure recovery, we have to revert downstream processing nodes to a previous global snapshot that matches the upstream node state. In RocksDB, there are potentially two ways to do this:
Instead of using a snapshot to revert the state (see more in the alternative approach), we use the second approach: buffer writes in a WriteBatch so that no uncommitted state is ever written to RocksDB.
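The idea behind this approach can be modeled with a plain-Python stand-in. This is not the RocksDB API; it only mirrors the read-through-batch behavior of `WriteBatchWithIndex`, where uncommitted writes live in an overlay and reach the store only on commit.

```python
class BatchedStore:
    def __init__(self):
        self.committed = {}   # the durable store (RocksDB in Samza)
        self.batch = {}       # uncommitted writes since the last checkpoint

    def put(self, key, value):
        self.batch[key] = value

    def get(self, key):
        # Reads check the batch first, then the committed store, mirroring
        # the read-your-writes behavior of WriteBatchWithIndex.
        return self.batch.get(key, self.committed.get(key))

    def commit(self):
        # On checkpoint: apply the whole batch to the store atomically.
        self.committed.update(self.batch)
        self.batch.clear()

    def abort(self):
        # On restore: uncommitted state simply vanishes; the store itself
        # never held it, so there is nothing to revert in RocksDB.
        self.batch.clear()

store = BatchedStore()
store.put("k", 1)
print(store.get("k"), store.committed)  # 1 {} -> visible but not durable yet
```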
Each processing node will have a node generation ID. The node generation ID will be checkpointed to the changelog topic and a RocksDB table.
The node generation will be bumped up only when the processing node restarts.
The global snapshot version. The processing node has to wait until the Checkpoint Command Messages of a snapshot version have been received from each source stream produced by source tasks before making a checkpoint of that snapshot version.
Each message will have a checkpoint version indicating which checkpoint this message belongs to.
The version of the checkpoint that will be checkpointed next time.
The on-disk files storing the early-arrival messages whose checkpoint version is higher than the current checkpoint version. See the Shared Channel Problem.
A source node is a processing node that does not have any upstream processing node, so it does not have S_OUT as its input.
In a processing pipeline, the source node assumes there are no duplicate messages in its source stream. If duplicates exist, a UUID-based deduplication may be needed.
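A sketch of such UUID-based deduplication, with an assumed (uuid, payload) message shape:

```python
# Only the first occurrence of each UUID is processed; later duplicates
# from the source stream are dropped.
def dedupe(messages, seen=None):
    seen = set() if seen is None else seen
    out = []
    for uuid, payload in messages:
        if uuid in seen:
            continue          # duplicate delivery: drop it
        seen.add(uuid)
        out.append(payload)
    return out

msgs = [("u1", "a"), ("u2", "b"), ("u1", "a"), ("u3", "c")]
print(dedupe(msgs))  # ['a', 'b', 'c']
```

In practice the seen-UUID set would have to be bounded (e.g. by a retention window) and persisted with the rest of S_SYS so the deduplication survives restarts.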
A sink node is a processing node that does not have any downstream processing node, so it does not have to output the markers.
In the processing pipeline, the S_OUT of a sink node is no longer controlled by the stream processing framework, so exactly-once semantics need support from the receiving system (e.g. Kafka with KIP-98).
Related implementation pieces: `WriteBatchWithIndex` (more detail), `MessageChooser`, and the RocksDB `WriteBatchWithIndex` if needed.

The sink node output is essentially S_OUT that has left the stream processing framework, so exactly-once delivery depends on the system that receives S_OUT. If that system supports transactions (e.g. Kafka with KIP-98), the output should be produced within a transaction between two checkpoints. Otherwise, there is no exactly-once delivery guarantee for S_OUT at the sink nodes.
```
MessageType ProducerId Generation CheckpointVersion Seq
  MessageType => INT8
  ProducerId => INT32
  Generation => INT32
  CheckpointVersion => INT64
  Seq => INT64
```
MessageType: Indicates the type of this message: Normal, Checkpoint, or Restore.
ProducerId: The producer id for this processing node, assuming each processor has only one producer.
Generation: The generation of the processing node that produced this message.
CheckpointVersion: The checkpoint version this message belongs to. Note that this is the version on the processing node that produced the message; it is not necessarily the same as the downstream processing node's checkpoint version if a failure has ever occurred.
Seq: A sequence id per processing node and partition.
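For illustration, the header could be serialized with fixed-width big-endian fields matching the widths above. This is a hypothetical encoding (the SEP does not specify byte order or type codes); the numeric type constants are assumptions.

```python
import struct

# INT8, INT32, INT32, INT64, INT64, big-endian, no padding: 25 bytes total.
HEADER = struct.Struct(">biiqq")   # MessageType, ProducerId, Generation,
                                   # CheckpointVersion, Seq

NORMAL, CHECKPOINT, RESTORE = 0, 1, 2   # assumed type codes

def encode(msg_type, producer_id, generation, ckpt_version, seq):
    return HEADER.pack(msg_type, producer_id, generation, ckpt_version, seq)

def decode(buf):
    return HEADER.unpack(buf)

buf = encode(NORMAL, producer_id=7, generation=2, ckpt_version=41, seq=1000)
print(len(buf), decode(buf))  # 25 (0, 7, 2, 41, 1000)
```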
When the source task finishes a checkpoint, it will send a checkpoint command message to the stream. The checkpoint message format is as follows:
```
ProducerId Generation CheckpointVersion Seq
  ProducerId => INT32
  Generation => INT32
  CheckpointVersion => INT64
  Seq => INT64
```
The generation is the generation of the current processing node. The version is a monotonically increasing long.
A restore checkpoint command is sent to the downstream when a processing node has reset its state to a previous checkpoint.
```
Generation CheckpointVersion
  Generation => INT32
  CheckpointVersion => INT64 (not strictly needed, because the target checkpoint version is always the last checkpoint version)
```
The generation is the generation of the processing node which sends this Restore Checkpoint Command.
The version is the checkpoint version to restore.
The messages written to the change log will be in the following format.
```
Generation Key Value
  Generation => INT32
  Key => bytes
  Value => bytes
```
The generation ID in the changelog message is the generation ID of the processing node at the time it wrote this changelog message.
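As a sketch, the changelog record could be framed as a fixed INT32 generation followed by length-prefixed key and value bytes. The length prefixes are an assumption for illustration; the SEP does not specify the framing.

```python
import struct

def encode_changelog(generation, key, value):
    # Generation, then INT32-length-prefixed key and value.
    return (struct.pack(">i", generation)
            + struct.pack(">i", len(key)) + key
            + struct.pack(">i", len(value)) + value)

def decode_changelog(buf):
    generation, klen = struct.unpack_from(">ii", buf, 0)
    key = buf[8:8 + klen]
    (vlen,) = struct.unpack_from(">i", buf, 8 + klen)
    value = buf[12 + klen:12 + klen + vlen]
    return generation, key, value

buf = encode_changelog(3, b"user42", b"\x00\x01")
print(decode_changelog(buf))  # (3, b'user42', b'\x00\x01')
```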
When a processing node sees a Checkpoint Command Message, it will start the checkpoint process. As part of that process, a changelog commit message will be written to the changelog to indicate that the changelog messages since the last checkpoint have been committed.
```
CheckpointMetadata [TopicPartitionMetadata]
  CheckpointMetadata => Generation [CheckpointOutputSeq]
    Generation => INT32
    CheckpointOutputSeq => Topic [PartitionAndSeq]
      PartitionAndSeq => Partition Seq
        Partition => INT32
        Seq => INT64
  TopicPartitionMetadata => Topic [PartitionMetadata]
    Topic => String
    PartitionMetadata => PartitionId Offset [ParentCheckpointMetadata]
      PartitionId => INT32
      Offset => INT64
      ParentCheckpointMetadata => ProducerId Generation CheckpointVersion Seq RecoveryOffset
        ProducerId => INT32
        Generation => INT32
        CheckpointVersion => INT64
        Seq => INT64
        RecoveryOffset => INT64
```
CheckpointMetadata is the information about the current processing node.
TopicPartitionMetadata is the metadata of the source streams this processing node has been consuming from. It includes:
When a processing node resets its state to a previous checkpoint, it needs to abort the already written change log messages.
```
Generation
  Generation => INT32
```
The change log abort message only contains a generation which is the generation of the processing node that sends this abort message.
During the checkpoint, the processing node will write a checkpoint data value into RocksDB. This is essentially a pointer from the current RocksDB state to the changelog. The data format is the following:
```
Offset Generation CheckpointVersion
  Offset => INT64
  Generation => INT32
  CheckpointVersion => INT64
```
The offset is the pointer to the position in the changelog partition that the current RocksDB state corresponds to.
The generation is the current processing node generation.
The version is the checkpoint version of this checkpoint.
TODO...
One alternative approach to reverting RocksDB state is to use snapshots. RocksDB allows the user to take a snapshot of the database and iterate over it. The cost of taking a snapshot is small, as all subsequent writes are copy-on-write. However, there are a few issues with this approach: