Current state: [ ACCEPTED ]

Discussion thread: <link to mailing list DISCUSS thread>




Samza users running long running pipelines need a way to gracefully drain them in preparation for an upgrade. The upgrade could be due to changes to transforms or backward incompatible changes in data schema. The only option currently available to the user is to abruptly stop the pipeline and reset intermediate data which could lead to loss of buffered data. User logic is expected to constantly evolve and schema changes are often backward incompatible. It is therefore imperative to provide the ability to seamlessly stop and upgrade their pipelines without losing any buffered data. 


Samza currently offers various job management commands like submit, kill, status, through the  ApplicationRunner and StreamJob API. Kill is currently used to cancel a running pipeline. It immediately triggers shutdown in SamzaContainer which in turn halts processing of the pipeline in the RunLoop. Such abrupt cancellation can lead to unprocessed in-flight data. In-flight data refers to data that is pending processing in the intermediate topics (shuffled data) or outstanding state. Samza provides fault-tolerant processing of streams through its at-least once guarantee. It ensures that the job reprocesses messages from previous checkpoints when it resumes. This guarantees that any inflight-data that was left unprocessed upon termination is re-processed. 

Abruptly stopping a pipeline can, however, be a problem if there are backward incompatible changes in the intermediate data schema after restart. Any inflight data in intermediate streams will cause failure upon reprocessing as it cannot be deserialized due to the incompatible schema. The usual fix to get past the serde issue for shuffled data is to switch to a fresh intermediate stream. This is achieved by changing the opId for the PartitionBy operator in samza high-level API and the intermediate transform names in Beam API as the intermediate topic names are derived from them. Samza persists window and timer data in local Rocksdb state which could run into the serde issues as well. If the Samza pipeline is using timers and windows, any outstanding samza managed state would need to be reset along with its backing changelog stream.

These fixes only get the pipeline running again but the inflight intermediate stream data and any samza managed state that was reset is consequently lost. Aside from the data loss, this process of a pipeline upgrade is evidently cumbersome. Therefore, it is important to support the ability to drain and stop a pipeline smoothly prior to making an upgrade which involves an incompatible intermediate schema change. 

The work here is scoped to drain only intermediate stream (shuffle) data and system-managed state. The onus of clearing user state like LocalTable, if required, is on the user. System managed state comprises of window aggregation state and timers. The proposed approach will prematurely emit window panes and fire all timers , thereby, draining all state. Continuation of state post drain is beyond the scope of this work.

Proposed Changes

We propose a drain operation for Samza pipelines which halts the pipeline from consuming new data, completes processing of buffered data and then stops the pipeline.

Below is the high level approach with finer details covered in the sections below: 

  • Notifying Samza pipeline about Drain

We introduce DrainNotification which should be written to the coordinator stream store to notify the pipeline to trigger drain. This work will also provide utilities classes to aid with writing DrainNotification to the metadata store.

  • Handling Drain during message processing

Samza container will periodically check for drain notifications. Once it does encounter one, we first advance the watermark to infinity for all source streams which is effectively equivalent to converting an unbounded source to a bounded source. This guarantees that any buffered window data is processed and timers are fired. Watermark will be propagated to the downstream stages of the pipeline thereby ensuring that all local state is processed

Next, a custom drain control message is inserted as an in-memory marker in a source SSP pending queues to indicate that the pipeline is draining. All data from source SSPs is processed till the drain event is encountered. Samza container shuts down once all tasks have received a drain control message for all the SSPs they are consuming, including any intermediate stream SSPs. Tasks will perform a commit before shut down.

Notifying Samza Pipeline about Drain 

This section delves into the details of notifying samza pipelines to drain. 

We introduce classes- DrainNotifcation and DrainUtils. DrainNotifcation is used to signal to the samza engine to start drain. DrainUtils provides the ability to signal drain for a Samza job. It uses a namespace-aware coordinator-stream backed metadata store to read and write a drain notifcation on the coordinator stream. Kafka coordinator stream backed metadata-store acts as the control channel in Samza, which makes the drain notification available to all containers for the job. DeploymentId will be different for every deployment of the job and will be used by the containers to validate if the drain action was intended for the current deployment. The config,, is used to uniquely identify the current deployment in a container. DrainNotification will be cleaned up from the coordinator stream by the job coordinator on successful shut down.

 * DrainNotification is a custom message used by an external controller to trigger Drain.
 * */
public class DrainNotification {
   * Unique identifier of a drain notification.
  private final UUID uuid;
   * Unique identifier for a deployment so drain notifications messages can be invalidated across a job restarts.
  private final String runId;

   * Drain Mode.
   * */
  private final DrainMode drainMode;

  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
    this.uuid = uuid;
    this.runId = runId;
    this.drainMode = drainMode;

   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
   * */
  public static DrainNotification create(UUID uuid, String runId) {
    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);

  public UUID getUuid() {
    return this.uuid;

  public String getRunId() {
    return runId;

  public DrainMode getDrainMode() {
    return drainMode;

Handling Drain during event processing

Once the user triggers a drain by writing a DrainNotification, it is available on the coordinator stream for all containers to process.

Polling for Drain Notifications

We will add a class DrainMonitor in SamzaContainer that will periodically check the coordinator-stream backed metadata store for DrainNotification. SystemConsumers periodically polls the consumers for new messages, updates the chooser with the new messages, polls the chooser to pick the next message to process in the RunLoop. The following changes will be made:

  • If a DrainNotification message is encountered by Drain, it sets the SamzaContainer in drain mode
  • SystemConsumers will stop polling for new messages from source systems on every refresh. The refresh will still pick messages from intermediate stream systems. This marks the end of consumption of any new messages from sources. We still want to consume messages that are propagated downstream from source to intermediate steams.
  • SystemConsumers maintains an in-memory queue of unprocessed messages per SSP.
    • Upon drain, no new messages will be inserted in the source SSP queues as refresh of source consumers has stopped. There will be pending messages in the queues from previous refreshes.
    • It will write Watermark control messages to all active source SSP queues (registered SSPs - intermediate SSPs - end-of-stream SSPs) to advance the watermark to infinity (Long.MaxValue).
    • Next, it will also append a drain control message to all active source SSP queues (registered SSPs - intermediate SSPs - end-of-stream SSPs) to denote that the SSPs are draining

Drain control messages (DrainMesssage) are special markers appended to the per-SSP in-memory queue of unprocessed messages in SystemConsumers. Its purpose is to indicate that the SSP is draining and the chooser will not return any more messages for that SSP. It is akin to other control messages, namely WatermarkMessage and  EndOfStreamMessage. The current deployment id is also added to the Drain control message to differentiate the drain markers between re-deploys. Drain message will only be processed by the run loop if its deployment id matches the current deployment id to prevent accidental prop.

public class DrainMessage extends ControlMessage {
   * Id used to invalidate DrainMessages between runs. Ties to from config.
  private final String runId;

  public DrainMessage(String runId) {
    this(null, runId);

  public DrainMessage(@JsonProperty("task-name") String taskName, @JsonProperty("run-id") String runId) {
    this.runId = runId;

  public String getRunId() {
    return runId;

Processing Drain control messages

The run loop orchestrates reading and processing messages, checkpointing and windowing among tasks. RunLoop will perform drain in the following steps:

  • RunLoop picks a message from the chooser one at a time for processing
  • For every message returned from the MessageChooser, if it's a drain control message, the TaskInstance will track how many of its SSPs are drained
  • When all SSPs have drained,  TaskInstance performs a final commit before it invokes shutdown()
  • Once all task instances are shut down, the SamzaContainer will automatically shut down the runLoop and hence, the entire container.
  • When all containers shut down, the job coordinator will clean up the drain control message matching the current deployment from the coordinator stream.

Once drain is completed and the samza job is re-deployed with intermediate schema changes, it is imperative that samza doesn’t replay user messages from before the drain control message from the intermediate stream. TaskInstance will guarantee it by performing a final commit before task shutdown in a blocking way. TaskInstance’s commit method currently performs checkpoint writes on a commit executor which makes it async. This operation should be blocked when in drain mode.  

A task is ready to commit, when task.commit is requested either by user or commit thread and when process, window, commit and scheduler are not in progress. When task.async.commit is true and task.max.concurrency > 1, the task can commit when message processing is in progress. When in drain mode, commits shouldn’t happen asynchronously with message processing.

Drain in Samza High Level API

Samza high-level API provides a rich set of operators which can be chained to form a DAG of operations on the MessageStream. StreamOperatorTask is a StreamTask implementation which takes the operator graph and processes input messages by traversing the DAG. 

The task starts processing at the input operators of the operator DAG. We introduce a class DrainStates which handles all the book-keeping for drain messages. When the operator encounters a drain message, it updates the internal bookkeeping tracking all the SSPs for a system stream which received a drain message. If all input streams for the operator receive a drain control message, it propagates the message to the downstream operators in the operator DAG. 

In case of a shuffle stage, DrainMessage is always sent to a single partition of the downstream stage (intermediate stream) by all source partitions. This is done for aggregation and DrainStates keep a track of which source partitions have sent a DrainMessage to the aggregate partition. Once the downstream aggregate partition has received drain messages from all the parent partitions, drain message is broadcasted to all the peer partitions. This process is repeated for all downstream stages.

handleDrain will be added to the base OperatorImpl implementation which executes when all input streams to the operator have been received. In the case of a broadcast or partition operator, handleDrain will write Drain control messages to the intermediate stream for propagation.

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.

package org.apache.samza.operators.impl;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;

 * This class tracks the drain state for streams in a task. Internally it keeps track of Drain messages received
 * from upstream tasks for each system stream partition (ssp). If messages have been received from all tasks,
 * it will mark the ssp as drained. For a stream to be drained, all its partitions assigned to
 * the task need to be drained.
 * This class is thread-safe.
public class DrainStates {
  private static final class DrainState {
    // set of upstream tasks
    private final Set<String> tasks = new HashSet<>();
    private final int expectedTotal;
    private volatile boolean drained = false;

    DrainState(int expectedTotal) {
      this.expectedTotal = expectedTotal;

    synchronized void update(String taskName) {
      if (taskName != null) {
        // aggregate the drain messages
        drained = tasks.size() == expectedTotal;
      } else {
        // drain is coming from either source or aggregator task
        drained = true;

    boolean isDrained() {
      return drained;

    public String toString() {
      return "DrainState: [Tasks : "
          + tasks
          + ", isDrained : "
          + drained
          + "]";

  private final Map<SystemStreamPartition, DrainState> drainStates;

   * Constructing the drain states for a task.
   * @param ssps all the ssps assigned to this task
   * @param producerTaskCounts mapping from a stream to the number of upstream tasks that produce to it
  DrainStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
    this.drainStates =
          ssp -> ssp,
          ssp -> new DrainState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0))));

   * Update the state upon receiving a drain message.
   * @param drain message of {@link DrainMessage}
   * @param ssp system stream partition
  void update(DrainMessage drain, SystemStreamPartition ssp) {
    DrainState state = drainStates.get(ssp);

   * Checks if the system-stream is drained.
   * */
  boolean isDrained(SystemStream systemStream) {
    return drainStates.entrySet().stream()
        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
        .allMatch(entry -> entry.getValue().isDrained());

   * Checks if all streams (input SSPs) for the task has drained.
   * */
  boolean areAllStreamsDrained() {
    return drainStates.values().stream().allMatch(DrainState::isDrained);

  public String toString() {
    return drainStates.toString();

* Abstract base class for all stream operator implementations.
* @param <M> type of the input to this operator
* @param <RM> type of the results of applying this operator
public abstract class OperatorImpl<M, RM> {
   * This method is implemented when all input streams to this operation have encountered drain control message.
   * Inherited class should handle drain by overriding this function.
   * By default noop implementation is for in-memory operator to handle the drain. Output operator need to
   * override this to actually propagate drain over the wire.
   * @param collector message collector
   * @param coordinator task coordinator
   * @return results to be emitted when this operator reaches drain
   protected  Collection<RM> handleDrain(MessageCollector collector, TaskCoordinator coordinator) {
     return Collections.emptyList();

Watermark & State Handling with Drain

Samza supports two notions of time. By default, all built-in Samza operators use processing time. Samza supports event-time operations through the Beam API. Whenever the user signals a drain, the watermark is advanced to infinity through watermark control messages. Beam’s samza runner keeps a local rocksdb state for timers and window state. Timers will be fired and window data is processed as a consequence of the watermark being advanced to infinity.

Advancing the watermark doesn't have any effect in Samza since windowing is processing-time based. Samza high-level API supports window operations on MessageStream. It keeps a track of window data in local rocksdb state and tracks the triggers in-memory. When the window operator receives drain, all the triggers will fire and results will be emitted from the window operation. This is implemented by overriding the handleDrain in WindowOperatorImpl.

Implementation and Test Plan

  1. Implement DrainNotification, DrainMode, DrainUtils
  2. Implement DrainMonitor and RunLoop processing changes
  3. High-Level API processing changes

Tests would include

  1. Testing DrainNotification, DrainUtils, serde of DrainNotification
  2. Testing DrainMonitor
  3. Testing RunLoop change high-level API changes
    1. Drain for task with single SSP
    2. Drain for task with multiple SSPs
    3. Drain waits for inflight messages
    4. Drain honors drain specific async task timeout
    5. Drain commit behavior
  4. Integration test for High-Level API
    1. Drain with shuffle stages
    2. Drain without shuffle stages
    3. Drain with shuffle stages and with drain notification published before container start
    4. Drain without shuffle stages and with drain notification published before container start
  5. Integration test for Low Level Level API
    1. Test drain
    2. Test drain with drain notification published before container start

Rejected Alternatives

Approach 1: Drain on by default every stop

One approach is to perform drain by default on every pipeline stop which would obviate the need to write DrainNotification using the DrainManager. Rest of the operations would be the same. When samza engine drains inflight data, it will also have to deal with all the buffered window state and timer data. Given the nature of streaming pipelines, there will always be incomplete windows at any point in time. This means that always draining the pipelines will lead to emitting incorrect windows and firing all timers on every stop. Hence, draining should not be performed on every stop but instead be reserved for exceptional cases where the user has to make a backward incompatible change to intermediate schemas. 


The plan is to release this feature with Samza 1.8  release

  • No labels