
SEP-10 Exactly-once Processing in Samza

Status

Current state: [ UNDER DISCUSSION ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-TBD

Released: 

Overview

Exactly-once semantics in stream processing aims to ensure that every event in a stream is processed exactly once, whether or not failures occur.

Although this looks like a simple goal, it is much harder to achieve than it seems. There are two main challenges:

  1. Non-determinism (unrepeatability).
    Unlike batch processing, stream processing is inherently non-deterministic. More specifically:
    1. The order in which events are processed is not deterministic. For example, for a stream processing task consuming from two different topics, there is no guarantee on the relative arrival order of messages from the two topics. If the task is run multiple times, there is no guarantee that the messages will be processed in the same order in any two runs.
    2. The state at the point when an event is processed is also non-deterministic. A simple example is a query to an external store during stream processing: the store can be updated asynchronously.
    Because of this non-determinism, finding the right point to resume from after a failure is tricky. Many systems therefore simply choose either to reprocess the entire stream (if possible) or to bootstrap the state from a snapshot.
  2. Involvement of multiple systems.
    Stream processing usually involves different sources, state stores and sinks, so the state of the entire stream processing job is widely distributed. This makes traditional solutions for exactly-once semantics, such as transactions, either very expensive or outright impossible.

To a large extent, exactly-once semantics is about solving the above two challenges.

Design Goals

For exactly-once semantics in Samza, we have the following design goals:

  1. Each message is processed exactly once by a Samza job.
  2. When a container fails and the task is restarted on the same host (due to host affinity), no change log bootstrap should be required.
  3. Performance with exactly-once semantics enabled should be almost the same as with at-least-once delivery.
  4. No dependency on transaction support in the underlying messaging system. Any messaging system with an at-least-once delivery guarantee should be sufficient for Samza to achieve exactly-once processing.
  5. The same design for both YARN-controlled mode and standalone mode.

The following are not design goals:

  1. Guarantees on the state transition order.
  2. Guarantees on result reproducibility.

Existing Solutions And Comparison

There are many different solutions for exactly-once processing. We only discuss Kafka Streams and Flink here because the solutions in most other products are either not quite applicable (Spark Streaming) or suffer a large performance degradation (Storm) when exactly-once is enabled.

Kafka Streams

Kafka Streams solves exactly-once by delegating everything to Kafka's transactional support. This essentially attacks the problem by reducing the number of systems involved in the streaming pipeline. In short, Kafka Streams delegates all of the following operations to Kafka:

  1. Reading input from one or more input topics
  2. Making changes to local state, which, however it is laid out on disk or in memory, can be journaled to a compacted Kafka topic
  3. Producing output to one or more output topics
  4. Updating the consumer offset to mark the input as processed

For more details:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

Flink

Flink uses a variant of the Chandy-Lamport algorithm to take a distributed snapshot of the entire system at a given logical clock.

For more details:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/stream_checkpointing.html

Comparison

To some extent, Samza is a combination of Flink and Kafka Streams, but it is not exactly the same as either. Here we compare Samza with Kafka Streams and Flink.

Samza vs. Kafka Streams

Samza is similar to Kafka Streams in that both use local state and write a change log of that local state back to Kafka. Unlike Kafka Streams, Samza makes no assumptions about the input and output systems (although it works best with Kafka). The only assumption is a partition/offset model, so Samza cannot assume that the messaging layer has transaction support.

Samza vs. Flink

Flink is more similar to Samza in that Flink tasks also run in a cluster and checkpoint coordination is driven by the JobManager (the checkpoint coordinator). From that standpoint, Samza exactly-once can use the same mechanism as Flink.

However, a critical difference between Flink and Samza is that Samza has the shared channel problem while Flink does not. Also, Samza has a standalone mode which does not have a centralized YARN AM, so a separate solution is needed for that mode.

Before the Design: Some Reasoning About Exactly-Once Semantics

If we view a stream processing engine as a state machine driven by an infinite event sequence, exactly-once can be reasoned about as follows.

State

The state of a stream processing job consists of the following parts:

  1. For each event source (event sequence) Source, the last processed position Pos. For a system with multiple inputs, this can be represented as [Source, Pos]
  2. The stream processing engine internal state S_SYS
  3. The state that has already been emitted: S_OUT

We can further label each part of the state according to whether the stream processing engine can mutate it.

State            Mutability
[Source, Pos]    Y
[S_SYS]          Y
[S_OUT]          N

Conclusion 1: For any state that has already been emitted, the stream processing engine cannot revert it by itself; it needs coordination from the downstream system, e.g. an abort / revert command.

Snapshot

A stream processing system snapshot is represented by {[Source, Pos], S_SYS, S_OUT} at a given time.

Because a stream processing system usually has a pipeline topology, a snapshot can be either global (at job level) or local (at task level).

One interesting question is: for exactly-once delivery, would local snapshots be good enough? To answer this, imagine the following topology (Fig.1):

In the topology, each node has two local snapshots. Snapshots of the same color are in the same global snapshot. Now suppose processing nodes (tasks) n1, n2 and n3 load snapshot_n1_1, snapshot_n2_2 and snapshot_n3_2 respectively. The state of n3 would then correspond to the original source positions {(S1,1500), (S2,200)}. In this case, n1 will reprocess [1001-1500] from S1 and emit some output as S_OUT. Assume that output is [40-45] in S3. Notice that due to unrepeatability, the generated output is not necessarily the same as [21-30] in S3, which was the output from the previous run.

Now S3 contains two sets of messages that are both the result of processing [1001-1500] from S1. Messages [21-30] in S3 are the S_OUT of n1; according to conclusion 1, n1 itself cannot revert that state but has to rely on n3 to do so. To ensure exactly-once processing, n3 should process either [20-30] or [40-45], but not both. Clearly, in this particular case, the effect of [20-30], which n3 has already processed, needs to be reverted. To do that, n3 must know which state to revert to. Here, the state must be reverted to snapshot_n3_1 because that is the state corresponding to source position (S1,1000). Note that deduplication will not work in this case because [20-30] and [40-45] are not duplicates of each other.

Also note that in this particular case, n2 does not need to be reset to snapshot_n2_1 because its input stream S2 does not contain any duplicate S_OUT.

Conclusion 2: Exactly once delivery requires a global snapshot for all the nodes in the same processing tree.

Shared Channel Problem

In Fig.1, there is a dedicated channel between two processing nodes. This means that inside each channel, the boundary between snapshots is controlled by a single upstream processing node. The boundary can be indicated by a marker in the stream: any message before the marker is included in the previous snapshot, and any message after the marker is included in the next snapshot. There is therefore a clear lineage of snapshot versions in the channel, divided by the markers. However, if the communication channel between two nodes is shared, the problem becomes much more complicated. Let's take a look at Fig.2:

For ease of explanation, we give the following definitions:

  1. We give each snapshot a version number. The snapshot version is the version of its commit markers. 
  2. We label each message with a snapshot version, which indicates the snapshot that the processing result of that message should belong to. The snapshot version of a message is V if the message lies between snapshot commit markers V-1 and V.
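
The labeling rule in definition 2 can be sketched in a few lines of Python. This is only an illustration for a channel written by a single producer: the tuple encoding of messages and markers, the function name label_snapshot_versions and the starting version of 1 are assumptions made here, not part of the design.

```python
def label_snapshot_versions(channel, first_version=1):
    """Label each message in a single-producer channel with its snapshot version.

    `channel` is a list of ("msg", payload) or ("marker", version) tuples
    (a hypothetical encoding used only for this sketch).
    """
    labeled = []
    current = first_version
    for kind, value in channel:
        if kind == "marker":
            # Marker V closes snapshot V; later messages belong to V + 1.
            current = value + 1
        else:
            labeled.append((value, current))
    return labeled


channel = [("msg", "a"), ("msg", "b"), ("marker", 1),
           ("msg", "c"), ("marker", 2), ("msg", "d")]
print(label_snapshot_versions(channel))
```

With a shared channel, the same rule has to be applied per producer, which is why messages of version V+1 from one producer can appear before the marker V of another.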

With the above definitions, we state the following necessary and sufficient conditions for a valid snapshot:

Condition 1: For all messages that are produced by processing node N and precede N's marker for snapshot version V, the state changes introduced by those messages must all be included in snapshot version V.

Condition 2: Any state included in a snapshot of version V must be the result of, and only of, messages whose snapshot version is less than or equal to V.

In Fig.2, the messages in S3 and S4 come from both processing nodes n1 and n2. Take S3 as an example: by the time processing node n3 has received the snapshot v1 commit markers from both n1 and n2, it has already processed the message at pos=24, which should not go into snapshot v1 because its snapshot version is v2. Similarly, in S4, the message at pos=54 should not go into snapshot v1 either. Based on conclusion 1, node n3 has to deal with this somehow.

This problem is essentially caused by the early arrival of messages that belong to the next snapshot version.

Possible solutions to the shared channel problem

Here we list four solutions that approach the shared channel problem from different angles. They are compared in the next section.

Solution 1: a simple and absolutely working solution

Synchronize on snapshot commit marker generation, i.e. within the same stage, no node produces a message with snapshot version V+1 until all nodes have sent the snapshot marker V downstream.

Solution 2: another simple and working solution

Skip over all messages that do not belong to the current snapshot, then seek back and re-consume those messages after the current snapshot is taken.

Solution 3: yet another simple and working solution

Buffer the messages (in memory or on disk) whose version is higher than the current snapshot version.

Solution 4: a potentially more efficient but definitely more complicated solution

To solve the shared channel problem, we introduce the following conclusion, which follows from the aforementioned non-determinism (unrepeatability):

Conclusion 3: For a given set of messages, the processing order does not matter as long as each message is processed exactly once.

The meaning of conclusion 3 can also be expressed in the following way:

  • The state in snapshot version V does not have to start accumulating only after snapshot version V-1 has been fully taken, or
  • It is not necessary to process all messages with snapshot version V-1 before processing messages with snapshot version V

Take Fig.2 as an example. The processing sequence would be the following:

  1. n3 is at position {(S3, 23), (S4, 53)}. So far everything that has been read belongs to snapshot V1 (we can think of the results as going into a mutable K-V store snapshot_v1_temp).
  2. n3 is at position {(S3, 24), (S4, 53)}. Because S3 pos=24 is labeled as snapshot V2, its result cannot go into snapshot V1.
    1. n3 creates a temp K-V store and marks it as snapshot_v2_temp. It is essentially an incremental addition to snapshot V1.
    2. n3 puts the result K-V of (S3, 24) only into snapshot_v2_temp.
  3. n3 is at position {(S3, 25), (S4, 53)}. Because S3 pos=25 is labeled as snapshot V1, its result should go into both snapshot_v1_temp and snapshot_v2_temp. The following two computations are needed in this case:
    1. When calculating the result for snapshot_v1_temp, snapshot_v2_temp is ignored, i.e. the current local state excludes it. Say the result is (K_v1, V_v1). This result is put into snapshot_v1_temp.
    2. When calculating the result for snapshot_v2_temp, the local state includes both snapshot_v1_temp and snapshot_v2_temp. Say the result is (K_v2, V_v2). This result is put into snapshot_v2_temp.
    3. Note that both (K_v1, V_v1) and (K_v2, V_v2) are processing results of the same message (S3, 25). Both are correct; they are just based on different processing orders (non-determinism and unrepeatability).
      However, from an exactly-once standpoint, only one of them should exist in the eventual state. Because snapshot_v2_temp will be applied later, when n3 commits (K_v2, V_v2) to its state, it should also remove (K_v1, V_v1) from the state. If K_v1 and K_v2 are the same, this is simply an overwrite.
  4. n3 is at position {(S3, 26), (S4, 53)}. n3 cannot close snapshot_v1_temp because the commit marker has not been received from S4.
  5. n3 is at position {(S3, 27), (S4, 53)}. Because S3 pos=27 is labeled as snapshot V2, its result should only go into snapshot_v2_temp.
  6. n3 is at position {(S3, 27), (S4, 54)}. Same as step 5.
  7. n3 is at position {(S3, 27), (S4, 55)}. All the commit markers for snapshot V1 have been collected, so n3 now flushes snapshot_v1_temp to the local state. At this point, the local state includes exactly the processing results of messages labeled with snapshot version V1, because all the results of messages with snapshot version V2 went into snapshot_v2_temp.
  8. Later on, when snapshot_v2_temp is committed to the state, the local state will include exactly the processing results of messages with snapshot versions V1 and V2.

Comparison of all the solutions

To handle the shared channel problem (i.e. early arrival of the messages with next snapshot version), there are a few basic ways:

  1. Avoid such early arrival from the beginning (Prevention: solution 1)
  2. Delay the processing of those messages. (Delay/Reorder: solution 2 and solution 3)
  3. Process the message on sight and fix the snapshot later. (Fix: solution 4)

Here we compare the pros and cons of all the solutions.

Solution 1: Synchronize on commit marker
  Pros:
    • Simple.
    • No duplicate consumption.
  Cons:
    • Synchronization is needed across all the tasks in the same stage.
  Performance impact:
    • Depends on the synchronization cost. Typically tens of ms.

Solution 2: Skip, seek back and reconsume
  Pros:
    • Simple.
    • No synchronization across tasks in the same stage.
  Cons:
    • Reconsumption is required, which invalidates prefetch.
  Performance impact:
    • Depends on how far we have to seek back. Typically the overhead is hundreds of ms.

Solution 3: Buffer locally and process later
  Pros:
    • Simple.
    • No duplicate consumption.
    • No synchronization across tasks in the same stage.
  Cons:
    • Local disk is involved.
  Performance impact:
    • Depends on the local disk speed. Typically a few ms to tens of ms.

Solution 4: Process on sight and fix later
  Pros:
    • No buffering or reordering is needed.
    • No duplicate consumption.
    • No synchronization across tasks in the same stage.
  Cons:
    • Complicated.
    • One message needs to be processed multiple times.
    • The performance impact is linear in the processing time and the number of early arrivals.
  Performance impact:
    • Depends on the processing time and the number of early arrivals. The longer the processing time and the more early arrivals there are, the bigger the impact.

Based on the above comparison, solution 1 and solution 3 seem better. Given that Samza already has a local disk available, I think solution 3 is the best choice due to its simplicity (most importantly, it needs no additional synchronization, which is critical for standalone mode) and its small performance impact.

Progress on processing node failure

As mentioned, a valid snapshot requires the entire processing pipeline to have a consistent global snapshot. One question is: what happens if a node fails? With solution 3 above, the downstream processing nodes will not be able to make progress, which is not ideal.

To make progress in this case, one option is to extend solution 2 to do finer-grained reordering. The details are discussed in these slides.

System local state for Samza

The system local state S_SYS for Samza consists of two parts:

  1. RocksDB
  2. The Kafka change log topic

RocksDB is a K-V store and the change log topic is the journal of the RocksDB tables.

For Samza, the change log topic should be the source of truth for S_SYS, because the local RocksDB state can always be recovered from the change log, but not vice versa.

Reverting RocksDB State

As mentioned above, in case of failure recovery, we have to revert downstream processing nodes to a previous global snapshot that matches the upstream node state. In RocksDB, there are potentially two ways to do this:

  1. RocksDB snapshot.
  2. WriteBatch.

Instead of using a snapshot to revert the state (see Alternative Approaches), we use the second approach: uncommitted state is kept in a WriteBatch, so that no uncommitted state is written to RocksDB before the checkpoint commits.
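
A minimal sketch of the WriteBatch idea, using plain Python dictionaries instead of RocksDB. The class BatchedStore and its method names are hypothetical. The sketch only mimics the read-your-own-writes behaviour that the flow below relies on through WriteBatchWithIndex: reads see uncommitted writes, the committed state changes only on commit, and a revert is just a discard.

```python
_TOMBSTONE = object()  # marks a key deleted inside the open batch


class BatchedStore:
    """Dictionary-based stand-in for a K-V store plus an indexed write batch."""

    def __init__(self):
        self.committed = {}  # state as of the last checkpoint
        self.batch = {}      # uncommitted writes since the last checkpoint

    def get(self, key, default=None):
        # Uncommitted writes shadow the committed state.
        if key in self.batch:
            value = self.batch[key]
            return default if value is _TOMBSTONE else value
        return self.committed.get(key, default)

    def put(self, key, value):
        self.batch[key] = value

    def delete(self, key):
        self.batch[key] = _TOMBSTONE

    def commit(self):
        # Apply the whole batch at checkpoint time.
        for key, value in self.batch.items():
            if value is _TOMBSTONE:
                self.committed.pop(key, None)
            else:
                self.committed[key] = value
        self.batch = {}

    def discard(self):
        # Reverting to the last checkpoint only drops the open batch.
        self.batch = {}
```

Because nothing uncommitted ever reaches the committed state, reverting to the last checkpoint does not need a copy of the old state.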

Detailed Design

High-Level Principles

  • The global state checkpoint is driven by the source nodes (the nodes in the first stage of the entire pipeline).
  • No synchronization between processing nodes in the same processing stage.

Terminology

Node Generation

Each processing node has a node generation ID. The node generation ID is checkpointed to the change log topic and to a RocksDB table.

The node generation is bumped up only when the processing node restarts.

Checkpoint Version

The global snapshot version. Before making a checkpoint of a given snapshot version, a processing node has to wait until the Checkpoint Command Messages of that version have been received from each source stream produced by the source tasks.

Message Checkpoint Version

Each message has a checkpoint version indicating which checkpoint the message belongs to.

Current Checkpoint Version

The version of the checkpoint that will be taken next.

Message Disk Buffer

The on-disk files that store early-arriving messages whose checkpoint version is higher than the current checkpoint version. See Shared Channel Problem.

Source Node

A source node is a processing node that does not have any upstream processing node, so it does not have S_OUT as its input.

In a processing pipeline, the source node assumes there are no duplicate messages in its source stream. If duplicates exist, UUID-based deduplication may be needed.

Sink Node

A sink node is a processing node that does not have any downstream processing node, so it does not have to output markers.

In the processing pipeline, the S_OUT of a sink node is no longer controlled by the stream processing framework, so exactly-once semantics needs support from the system receiving the state (e.g. Kafka with KIP-98).

Flow Overview

Processing Node Start (1.x)

  1. Check local RocksDB to see if there is already checkpoint data
    1. If there is checkpoint data, use the pointer in the checkpoint data to read until the end of the change log partition.
    2. If there is no checkpoint data, bootstrap from change log partition.
  2. Restore the state from the last available checkpoint data.
  3. Check if the last message in the change log partition was a Change Log Commit Message
    1. If yes, send a Checkpoint Command Message with that checkpoint version to all the downstream partitions.
    2. If no, append a Change Log Abort Message to the change log partition
  4. Send a Restore Checkpoint Command with the last checkpoint version to all the downstream partitions.

Get Messages (2.2)

  1. Check if there is a Message Disk Buffer with the current checkpoint version.
    1. If such a message disk buffer exists and is not empty, return the first message in the buffer.
  2. Read a message from the source stream.
  3. If the message checkpoint version is the current checkpoint version, return this message.
  4. If the message checkpoint version is above the current checkpoint version
    1. Append the message to the message disk buffer of the message's checkpoint version. (Create the message disk buffer if needed)
  5. If the message checkpoint version is below the current checkpoint version, skip that message.
  6. Go back to step 2.
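
The Get Messages steps above can be sketched as follows. This is an illustration only: the class name BufferingReader is hypothetical, messages are modelled as (checkpoint_version, payload) tuples, and in-memory deques stand in for the on-disk Message Disk Buffer.

```python
from collections import defaultdict, deque


class BufferingReader:
    """Returns only messages of the current checkpoint version."""

    def __init__(self, source, current_version):
        self.source = iter(source)            # yields (version, payload)
        self.current_version = current_version
        self.buffers = defaultdict(deque)     # version -> early arrivals

    def get_message(self):
        # Step 1: drain the buffer of the current version first.
        buffered = self.buffers.get(self.current_version)
        if buffered:
            return buffered.popleft()
        # Steps 2-6: read until a message of the current version shows up.
        for version, payload in self.source:
            if version == self.current_version:
                return (version, payload)
            if version > self.current_version:
                # Early arrival: park it for a later checkpoint version.
                self.buffers[version].append((version, payload))
            # version < current_version: skip the message.
        return None  # source exhausted (a real stream would block instead)
```

In this sketch the caller bumps current_version after a checkpoint, and the next call to get_message starts with the messages that arrived early for that version.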

Message Processing (2.x)

  1. Create a RocksDB WriteBatchWithIndex (more detail).
  2. Read a message from MessageChooser.
  3. Process the message
    1. Read by using the WriteBatchWithIndex and RocksDB
    2. Process the message
    3. Write to the change log partition if needed
    4. Write to the WriteBatchWithIndex if needed
    5. Output to the downstream
  4. If the node is a source task node and a checkpoint is required after step 3, execute the checkpoint process.
  5. Bump up the current checkpoint version.

Sink Node Output

The sink node output is essentially S_OUT that has left the stream processing framework, so exactly-once delivery depends on the system that receives the S_OUT. If that system supports transactions (e.g. Kafka with KIP-98), the output produced between two checkpoints should be written within a transaction. Otherwise, there is no exactly-once delivery guarantee for the S_OUT of the sink nodes.

Checkpoint

Checkpoint Process (3.x)

  1. Receive a Checkpoint Command Message from a stream.
  2. Keep consuming from the other streams until the Checkpoint Command Messages of the same version have been received from all the streams. (synchronization barrier)
  3. Flush all the output messages to the downstream. (Flush S_OUT)
  4. Flush all the Change Log Messages.
  5. Flush the Change Log Commit Message and get the offset of that message.
    1. The offsets in the Change Log Commit Message should be the offset of the first message in the message disk buffer.
  6. Write the RocksDB checkpoint data to the currently open WriteBatch.
  7. Close the WriteBatch and write the batch to RocksDB.
  8. Send the Checkpoint Command Message with its own generation id to the downstream.
  9. Delete the corresponding Message Disk Buffer.
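
Step 2, the synchronization barrier, can be sketched as a small tracker that records which input channels have delivered the Checkpoint Command Message of each version. The class BarrierTracker and its method are hypothetical names. What counts as a channel is also an assumption: the sketch uses stream names, while a shared channel may need one entry per upstream producer.

```python
class BarrierTracker:
    """Tracks Checkpoint Command Messages until a version is complete."""

    def __init__(self, channels):
        self.channels = frozenset(channels)
        self.seen = {}  # checkpoint version -> channels that sent the marker

    def on_marker(self, channel, version):
        """Record a marker; return True when the checkpoint can start."""
        if channel not in self.channels:
            raise ValueError(f"unknown channel: {channel!r}")
        arrived = self.seen.setdefault(version, set())
        arrived.add(channel)
        if arrived == self.channels:
            # Barrier complete: steps 3-9 of the checkpoint process can run.
            del self.seen[version]
            return True
        return False


tracker = BarrierTracker(["S3", "S4"])
print(tracker.on_marker("S3", 1))  # marker from S4 still missing
print(tracker.on_marker("S4", 1))  # all markers for version 1 collected
```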

Restoring Checkpoint Process (4.x)

  1. Receive a Restore Checkpoint Command.
  2. If the checkpoint version to restore is not the last checkpoint version, raise an exception. Otherwise continue.
  3. Discard the current WriteBatch if it is not empty.
  4. Write the Change Log Abort Message with the current processing node's generation id to the change log partition.
  5. Forward the Restore Checkpoint Command to the downstream.
  6. Reset the offsets to those in the last checkpoint and continue reading.

Protocol Design

Message Protocol

Checkpoint Command Message
MessageType ProducerId Generation CheckpointVersion Seq
	MessageType => INT8
	ProducerId => INT32
	Generation => INT32
	CheckpointVersion => INT64
	Seq => INT64

MessageType: Indicates which type of message this is: Normal, Checkpoint or Restore.

ProducerId: The producer id of this processing node, assuming each processor has only one producer.

Generation: The generation of the processing node that produced this message.

CheckpointVersion: The checkpoint version the message belongs to. Note that this is the version on the processing node that produced the message. It is not necessarily the same as the checkpoint version of the downstream processing node if a failure has ever occurred.

Seq: A sequence id per processing node and partition.
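
As an illustration of the field layout above, the header can be packed with Python's struct module. The byte order (big-endian) and the numeric codes for the message types are assumptions made for this sketch, as are the function names; the proposal does not specify them.

```python
import struct
from enum import IntEnum


class MessageType(IntEnum):
    # Numeric codes are assumed for illustration only.
    NORMAL = 0
    CHECKPOINT = 1
    RESTORE = 2


# INT8 MessageType, INT32 ProducerId, INT32 Generation,
# INT64 CheckpointVersion, INT64 Seq; ">" means big-endian, no padding.
_HEADER = struct.Struct(">biiqq")


def encode_header(message_type, producer_id, generation, checkpoint_version, seq):
    return _HEADER.pack(int(message_type), producer_id, generation,
                        checkpoint_version, seq)


def decode_header(data):
    message_type, producer_id, generation, checkpoint_version, seq = \
        _HEADER.unpack_from(data)
    return (MessageType(message_type), producer_id, generation,
            checkpoint_version, seq)


header = encode_header(MessageType.CHECKPOINT, 7, 2, 10, 123)
print(len(header), decode_header(header))
```

With this layout the header is a fixed 1 + 4 + 4 + 8 + 8 = 25 bytes.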

Checkpoint Protocol

Checkpoint Command Message

When the source task finishes a checkpoint, it sends a checkpoint command message to the stream. The checkpoint message format is as follows:

Checkpoint Command Message
ProducerId Generation CheckpointVersion Seq
	ProducerId => INT32
	Generation => INT32
	CheckpointVersion => INT64
	Seq => INT64

The generation is the generation of the current processing node. The version is a monotonically increasing long.

Restore Checkpoint Command

A restore checkpoint command is sent to the downstream when a processing node has reset its state to a previous checkpoint.

RestoreCheckpointCommand
Generation CheckpointVersion
	Generation => INT32
	CheckpointVersion => INT64 (Not necessarily needed because theoretically the target checkpoint version is always the last checkpoint version)

The generation is the generation of the processing node which sends this Restore Checkpoint Command.

The version is the checkpoint version to restore.

Change Log Message Format

The messages written to the change log will be in the following format.

Change Log Topic Message Format
Generation Key Value
	Generation => INT32
	Key => bytes
	Value => bytes

The generation id in the change log message is the generation id of the processing node at the time it wrote this change log message.

Change Log Commit Message Format

When a processing node sees a Checkpoint Command Message, it starts the checkpoint process. As part of the process, a change log commit message is written to the change log to indicate that the change log messages written since the last checkpoint have been committed.

Change Log Commit Message Format
CheckpointMetadata [TopicPartitionMetadata]
	CheckpointMetadata => Generation [CheckpointOutputSeq]
		Generation => INT32
		CheckpointOutputSeq => Topic [PartitionAndSeq]
			PartitionAndSeq => Partition Seq
				Partition => INT32
				Seq => INT64
	TopicPartitionMetadata => Topic [PartitionMetadata]
		Topic => String
		PartitionMetadata => PartitionId Offset [ParentCheckpointMetadata]
			PartitionId => INT32
			Offset => INT64
			ParentCheckpointMetadata => ProducerId Generation CheckpointVersion Seq ResumeOffset
				ProducerId => INT32
				Generation => INT32
				CheckpointVersion => INT64
				Seq => INT64
				ResumeOffset => INT64

CheckpointMetadata is the information about current processing node.

TopicPartitionMetadata is the metadata of the source streams this processing node has been consuming from. It includes:

  • Partition offset to resume consumption if the state is reset to this checkpoint.
  • The checkpoint version in each parent node corresponding to this checkpoint.
  • The generation of each parent node when this checkpoint is taken.
  • The checkpointed output sequence number from each parent node.
  • The resume offset if there is a false positive on parent node failure. See more on the slides.

Change Log Abort Message Format

When a processing node resets its state to a previous checkpoint, it needs to abort the already written change log messages.

ChangeLogAbortMessage
Generation
	Generation => INT32

The change log abort message only contains a generation which is the generation of the processing node that sends this abort message.
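
The interplay of change log messages, commit messages and abort messages during state recovery can be sketched as a replay loop. This is a simplified illustration rather than the proposed implementation: records are modelled as tuples, the generation field is left out, and the function name replay_change_log is hypothetical.

```python
def replay_change_log(records):
    """Rebuild K-V state from a change log, applying only committed writes.

    `records` is an iterable of ("put", key, value), ("commit",) or
    ("abort",) tuples (an encoding assumed for this sketch).
    """
    state = {}
    pending = []  # writes seen since the last commit or abort
    for record in records:
        kind = record[0]
        if kind == "put":
            pending.append((record[1], record[2]))
        elif kind == "commit":
            # A commit message makes all writes since the last checkpoint durable.
            for key, value in pending:
                state[key] = value
            pending = []
        elif kind == "abort":
            # An abort message invalidates the writes since the last checkpoint.
            pending = []
        else:
            raise ValueError(f"unknown record type: {kind!r}")
    # Trailing writes without a commit are not part of any checkpoint.
    return state
```

Writes that are followed by an abort message, or by nothing at all, never become part of the recovered state.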

RocksDB Checkpoint Data Format

During the checkpoint, the processing node writes a checkpoint data value into RocksDB. This is essentially a pointer from the current RocksDB state to the change log. The data format is the following:

Offset Generation CheckpointVersion
	Offset => INT64
	Generation => INT32
	CheckpointVersion => INT64

The offset is the position in the change log partition that the current RocksDB state corresponds to.

The generation is the current processing node generation.

The version is the checkpoint version of this checkpoint.

Failure Scenario and Recovery Method

TODO...

Alternative Approaches

Use RocksDB snapshot instead of WriteBatch

One alternative approach to reverting RocksDB state is to use snapshots. RocksDB allows users to take a snapshot of the DB and iterate over it. The cost of taking a snapshot is small, as all the subsequent writes are copy-on-write. However, there are a few issues with this approach:

  1. A RocksDB snapshot is not persistent and will be lost on DB restart.
  2. Because this is a full snapshot, restoring the RocksDB state from it is expensive.
  3. A DB snapshot may not always be cheap, so this approach may not work for other databases.

 
