Status

Current state: WIP - UNDER DISCUSSION

Discussion thread

JIRA: SAMZA-1983

Released: 

Purpose

The purpose of this feature is to provide a common interface for external tools and services to rewind or fast-forward starting offsets on any input stream. Beyond the common interface, this feature provides the ability to manually set starting offsets by various position types, not only by specific offsets. Many of the underlying system consumers already support seeking on an input stream by other position types, such as by timestamp, but the current framework does not expose these generically.

Motivation

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. The current tooling is tedious and error-prone, as it requires proper security access and potentially editing many offsets in a config file. In some cases, it can cause a Samza processor to lose its checkpoints. Beyond the danger of modifying checkpoints directly, checkpoint offsets are arbitrary strings with system-specific formats, so each system type (e.g. Kafka, Event Hubs, Kinesis) requires a different checkpoint tool.

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow manipulating starting offsets on an input stream not only by specific offsets, but with different position types, such as by timestamp.

Maintain backwards compatibility.

Provide safety by setting starting offsets out-of-band and not directly in the checkpoints.

Simplicity. Easy for developers and users to use.

Non-goals

Full snapshot of a Samza job's state at a particular point in time.

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed Implementation

Different systems in Samza have different formats for checkpoint offsets, and there is no contract that describes the offset format. To maintain backwards compatibility and to improve operability when setting starting offsets, this solution defines the concept of Startpoints and stores them in a separate storage layer instead of manipulating the checkpoint offsets directly in the checkpoint stream.

The Startpoint indicates the position from which a particular SystemStreamPartition should start consuming. A Startpoint takes precedence over Checkpoints and consists of a position type and a position value that is interpreted according to that type. For example, if the Startpoint position type is TIMESTAMP, then the position value is an epoch value. The Startpoint enforces a stricter contract for external tools and services to follow than the string offset value in the Checkpoint.

Each SystemConsumer will also implement a new register method to handle a Startpoint.

A requested Startpoint will be stored in a metadata store. This will be decoupled from the actual checkpoint offsets in the checkpoint stream.

Public Interfaces

Startpoint Model

Startpoint
/**
 * StartpointEntry pairs a {@link Startpoint} with an optional task name.
 */
public class StartpointEntry {
  private final String taskName;
  private final Startpoint startpoint;

// Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream by {@link PositionType}
 */
public class Startpoint {
  private final PositionType positionType;
  private final String position;

// Full implementation not shown for brevity
}

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}
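
To make the position type and position value contract concrete, here is a minimal sketch of how a Startpoint value could be built and validated. The class name `StartpointSketch` and the factory methods (`withTimestamp`, `withSpecificOffset`, `earliest`, `latest`) are hypothetical and not part of this proposal. The sketch also assumes the epoch value is in milliseconds and that types without a value carry a null position.

```java
import java.util.Objects;

// Simplified stand-ins for the model classes above; the factory methods are hypothetical.
enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class StartpointSketch {
  private final PositionType positionType;
  private final String position; // null when the type needs no value (assumption)

  private StartpointSketch(PositionType positionType, String position) {
    this.positionType = Objects.requireNonNull(positionType, "positionType");
    this.position = position;
  }

  // Assumes the epoch value is expressed in milliseconds.
  static StartpointSketch withTimestamp(long epochMillis) {
    if (epochMillis < 0) {
      throw new IllegalArgumentException("Timestamp must be non-negative: " + epochMillis);
    }
    return new StartpointSketch(PositionType.TIMESTAMP, Long.toString(epochMillis));
  }

  // The offset stays an opaque, system-specific string, as in a checkpoint.
  static StartpointSketch withSpecificOffset(String offset) {
    return new StartpointSketch(PositionType.SPECIFIC_OFFSET, Objects.requireNonNull(offset, "offset"));
  }

  static StartpointSketch earliest() {
    return new StartpointSketch(PositionType.EARLIEST, null);
  }

  static StartpointSketch latest() {
    return new StartpointSketch(PositionType.LATEST, null);
  }

  PositionType getPositionType() {
    return positionType;
  }

  String getPosition() {
    return position;
  }
}
```

An external tool would then call, for example, `StartpointSketch.withTimestamp(1546300800000L)` rather than hand-crafting a system-specific offset string.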

SystemConsumer Registering Startpoint

The SystemConsumer implementations provided by the Samza framework will implement the new register method below. This is analogous to the existing overloaded #register(SystemStreamPartition, String) method used to recover from checkpoint offsets. The plan is to eventually deprecate #register(SystemStreamPartition, String) and unify the values in the Startpoint metadata store and Checkpoint stream to use the same Startpoint format.

SystemConsumer
public interface SystemConsumer {
  /**
   * Register a SystemStreamPartition to this SystemConsumer by an externally defined
   * {@link Startpoint}, only if one exists for this SystemStreamPartition. If none exists,
   * {@link #register(SystemStreamPartition, String)} will be called instead.
   * If a SystemConsumer does not support a particular {@link PositionType}, an exception
   * should be thrown. Otherwise, the SystemConsumer is expected to seek to the offset position.
   *
   * @param systemStreamPartition
   *          The SystemStreamPartition object representing the Samza
   *          SystemStreamPartition to receive messages from.
   * @param startpoint
   *          {@link Startpoint} representing the position in the stream to start
   *          reading events from.
   */
  default void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() != PositionType.SPECIFIC_OFFSET) {
      throw new UnsupportedOperationException("Not implemented");
    }
    register(systemStreamPartition, startpoint.getPosition());
  }
 
  @Deprecated // Unify to eventually use the register method above only.
  void register(SystemStreamPartition systemStreamPartition, String offset);
 
// Excluding existing interface methods for brevity
}
 
// Below is pseudocode showing how system-specific implementations could handle the TIMESTAMP position type.
 
public class KafkaSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // Call underlying Kafka Consumer#offsetsForTimes().
      // Call underlying Kafka Consumer#seek() with offsets returned from offsetsForTimes()
    } else if...
  }
}
 
public class EventHubSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.getPosition()))
      // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStreamPartition.getPartitionId(), eventPosition)
    } else if...
  }
}
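
The dispatch on position type can be sketched without a real broker. The example below is only an illustration and is not the Kafka or EventHubs implementation. `InMemoryPartitionConsumer` is a hypothetical in-memory partition, and the sketch assumes message timestamps are non-decreasing epoch milliseconds. Its TIMESTAMP branch picks the earliest offset whose timestamp is at or after the requested value, which is similar in spirit to a timestamp lookup followed by a seek. Unsupported position types raise an exception, as the interface contract above requires.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class Startpoint {
  private final PositionType positionType;
  private final String position;

  Startpoint(PositionType positionType, String position) {
    this.positionType = positionType;
    this.position = position;
  }

  PositionType getPositionType() { return positionType; }
  String getPosition() { return position; }
}

// Hypothetical in-memory partition; offsets are assigned sequentially from 0.
final class InMemoryPartitionConsumer {
  // Message timestamp (epoch ms) -> earliest offset carrying that timestamp.
  private final NavigableMap<Long, Long> offsetsByTimestamp = new TreeMap<>();
  private long nextOffset = 0L;
  private long startingOffset = -1L;

  void append(long timestampMillis) {
    offsetsByTimestamp.putIfAbsent(timestampMillis, nextOffset);
    nextOffset++;
  }

  void register(Startpoint startpoint) {
    switch (startpoint.getPositionType()) {
      case SPECIFIC_OFFSET:
        startingOffset = Long.parseLong(startpoint.getPosition());
        break;
      case TIMESTAMP: {
        long target = Long.parseLong(startpoint.getPosition());
        // Earliest offset with timestamp >= target, or the log end if there is none.
        Map.Entry<Long, Long> entry = offsetsByTimestamp.ceilingEntry(target);
        startingOffset = (entry != null) ? entry.getValue() : nextOffset;
        break;
      }
      case EARLIEST:
        startingOffset = 0L;
        break;
      case LATEST:
        startingOffset = nextOffset;
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported position type: " + startpoint.getPositionType());
    }
  }

  long getStartingOffset() { return startingOffset; }
}
```

Falling back to the log end when no message is at or after the timestamp is a choice made for this sketch; a real consumer may prefer to fail or to apply its configured default.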

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. The Startpoints are stored within their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→StartpointEntry(Startpoint, TaskName)

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks.
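
The mapping above can be pictured with a small in-memory stand-in for the namespaced KV store. Everything in this sketch is hypothetical: the `NamespacedStartpointStore` and `StoredStartpoint` names, the key layout (`namespace:system.stream.partition`), and the rule that an entry without a task name applies to every task consuming the SSP. It only illustrates how an optional TaskName could narrow an entry to one task of a broadcast input.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical value type; position type and value are kept as strings for brevity.
final class StoredStartpoint {
  final String positionType;
  final String position;
  final String taskName; // null means "applies to any task" in this sketch

  StoredStartpoint(String positionType, String position, String taskName) {
    this.positionType = positionType;
    this.position = position;
    this.taskName = taskName;
  }

  boolean appliesTo(String task) {
    return taskName == null || taskName.equals(task);
  }
}

// Hypothetical in-memory stand-in for a namespaced metadata store.
final class NamespacedStartpointStore {
  private final Map<String, StoredStartpoint> kv = new HashMap<>();
  private final String namespace;

  NamespacedStartpointStore(String namespace) {
    this.namespace = namespace;
  }

  // Assumed key layout: "<namespace>:<system>.<stream>.<partition>".
  private String key(String system, String stream, int partition) {
    return namespace + ":" + system + "." + stream + "." + partition;
  }

  void put(String system, String stream, int partition, StoredStartpoint entry) {
    kv.put(key(system, stream, partition), entry);
  }

  // Returns the entry for the SSP only if it applies to the requesting task.
  Optional<StoredStartpoint> get(String system, String stream, int partition, String task) {
    return Optional.ofNullable(kv.get(key(system, stream, partition)))
        .filter(entry -> entry.appliesTo(task));
  }
}
```

Because the key is the SSP alone, this layout holds at most one entry per SSP; a later write for the same SSP replaces the earlier one.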

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

StartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Construct the StartpointManager with the {@link MetadataStore} to use. 
   */
  public StartpointManager(MetadataStore metadataStore) { ... }
  
  /**
   * Writes a {@link Startpoint} that defines the start position for stream partitions.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpointEntry Reference to a StartpointEntry object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, StartpointEntry startpointEntry) { ... }

  /**
   * Returns the last {@link StartpointEntry} written for a specified {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link StartpointEntry} for.
   * @param removeUponRead If true, removes the Startpoint once read.
   * @return {@link StartpointEntry} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public StartpointEntry readStartpoint(SystemStreamPartition ssp, boolean removeUponRead) { ... }

  /**
   * Returns all {@link StartpointEntry} objects in the {@link MetadataStore}.
   * @return map of {@link SystemStreamPartition} to its existing {@link StartpointEntry}.
   */
  public Map<SystemStreamPartition, StartpointEntry> readAllStartpoints() { ... }

// Full implementation not shown for brevity
}
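
As a rough illustration of the read, write, and remove-upon-read semantics, the sketch below backs a manager with a plain map of byte arrays. It is not the proposed implementation: `InMemoryStartpointManager`, the string SSP key, and the `TYPE:position` UTF-8 encoding are all assumptions made for the example, and the real serialization format is not specified in this document.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class Startpoint {
  private final PositionType positionType;
  private final String position;

  Startpoint(PositionType positionType, String position) {
    this.positionType = positionType;
    this.position = position;
  }

  PositionType getPositionType() { return positionType; }
  String getPosition() { return position; }
}

// Hypothetical manager backed by a map instead of a real MetadataStore.
final class InMemoryStartpointManager {
  private final Map<String, byte[]> store = new HashMap<>();

  // Serializes as "<TYPE>:<position>"; this encoding is an assumption of the sketch.
  void writeStartpoint(String sspKey, Startpoint startpoint) {
    String position = startpoint.getPosition() == null ? "" : startpoint.getPosition();
    String encoded = startpoint.getPositionType().name() + ":" + position;
    store.put(sspKey, encoded.getBytes(StandardCharsets.UTF_8));
  }

  // Returns null when no Startpoint exists for the key.
  Startpoint readStartpoint(String sspKey, boolean removeUponRead) {
    byte[] raw = removeUponRead ? store.remove(sspKey) : store.get(sspKey);
    if (raw == null) {
      return null;
    }
    // Split on the first ':' only, so positions may themselves contain ':'.
    String[] parts = new String(raw, StandardCharsets.UTF_8).split(":", 2);
    return new Startpoint(PositionType.valueOf(parts[0]), parts[1]);
  }

  int size() {
    return store.size();
  }
}
```

Reading with `removeUponRead` set to true consumes the Startpoint, so a second read for the same key returns null.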

Changes to OffsetManager

OffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Will add similar doc as StartpointManager#readStartpoint
  public Startpoint getStartpoint(StartpointKey startpointKey) {
    return startpointManager.readStartpoint(startpointKey);
  }

  // Will add similar doc as StartpointManager#writeStartpoint
  public void setStartpoint(StartpointKey startpointKey, Startpoint startpoint) {
    startpointManager.writeStartpoint(startpointKey, startpoint);
  }
  
  // Renamed from getStartingOffset so that the name documents where the offset comes from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) { 
    ... 
  }

  // Alias old name for backwards-compatibility.
  @Deprecated // set as legacy and remove in next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

// Excluding members and methods not pertinent to this design document for brevity.
}

General Workflow

Entities

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • SystemConsumer
  • StartpointManager
  • storage layer
  • ExternalSource - e.g. checkpoint tool, REST backend, etc.

Objects

  • Startpoint
  • Checkpoint
  • SystemStreamPartition(SSP)
  • StartpointKey

Step 1: StartpointManager#writeStartpoint(StartpointKey, Startpoint) stores Startpoint in the storage layer. 

Step 2: ExternalSource triggers SamzaProcessor to restart.

Step 3: SamzaProcessor starts up and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(StartpointKey, removeUponRead).
Startpoints are removed from the storage layer at the first checkpoint offset commit. If SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), then Startpoints are removed immediately after being read.

Step 4: SamzaProcessor task instances iterate through their SSPs and
   i) if a Startpoint is returned by StartpointManager#readStartpoint(StartpointKey), call SystemConsumer#register(SystemStreamPartition, Startpoint). A Startpoint stored under a key with TaskName+SSP takes precedence over one stored under a key with only the SSP.
  ii) otherwise, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String).
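
The precedence rules in Steps 3 and 4 can be summarized in a short sketch. The names here (`StartupResolverSketch`, `key`, `resolve`, `onFirstCheckpointCommit`) and the string encodings are hypothetical; the sketch only shows the lookup order of TaskName+SSP key, then SSP-only key, then checkpoint, and the removal of Startpoints at the first checkpoint commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the startup lookup order; values are plain strings for brevity.
final class StartupResolverSketch {
  final Map<String, String> startpoints = new HashMap<>(); // key -> "TYPE:position"
  final Map<String, String> checkpoints = new HashMap<>(); // SSP -> checkpoint offset

  // Assumed key layout: SSP alone, or SSP plus task name for task-specific entries.
  static String key(String ssp, String taskName) {
    return taskName == null ? ssp : ssp + "|" + taskName;
  }

  // Returns which source wins for this task and SSP, prefixed with its origin.
  String resolve(String taskName, String ssp) {
    String startpoint = startpoints.get(key(ssp, taskName));
    if (startpoint == null) {
      startpoint = startpoints.get(key(ssp, null));
    }
    if (startpoint != null) {
      return "startpoint:" + startpoint; // would call register(ssp, Startpoint)
    }
    String checkpoint = checkpoints.get(ssp);
    // Otherwise fall back to the existing register(ssp, checkpointOffset) path.
    return checkpoint != null ? "checkpoint:" + checkpoint : "none";
  }

  // Simplification: the first commit removes both the task-specific and SSP-only entries.
  void onFirstCheckpointCommit(String taskName, String ssp) {
    startpoints.remove(key(ssp, taskName));
    startpoints.remove(key(ssp, null));
  }
}
```

After the first commit removes the Startpoint, a later restart resolves to the checkpoint again, which is what keeps a Startpoint from being re-applied on every restart.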

Compatibility, Deprecation, and Migration Plan

All changes are backwards compatible. This is an add-on feature and no changes to the existing checkpoint operations are required.

The future plan is to use the newer SystemConsumer#register(SystemStreamPartition, Startpoint) interface and deprecate the existing SystemConsumer#register(SystemStreamPartition, String). This will allow a single point of entry for setting the starting offset on the SystemConsumer, whether the starting offset comes from a checkpoint or from a Startpoint.
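
One way to picture the single point of entry is a legacy overload that wraps the checkpoint offset in a SPECIFIC_OFFSET Startpoint and delegates to the new method. This is a sketch of the idea only, not the planned migration. `UnifiedConsumerSketch` and `RecordingConsumer` are hypothetical names, and the SSP is reduced to a string.

```java
import java.util.ArrayList;
import java.util.List;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class Startpoint {
  private final PositionType positionType;
  private final String position;

  Startpoint(PositionType positionType, String position) {
    this.positionType = positionType;
    this.position = position;
  }

  PositionType getPositionType() { return positionType; }
  String getPosition() { return position; }
}

// Hypothetical consumer base class with one real entry point.
abstract class UnifiedConsumerSketch {
  // Single entry point for every starting position.
  abstract void register(String ssp, Startpoint startpoint);

  // Legacy overload: a checkpoint offset becomes a SPECIFIC_OFFSET Startpoint.
  final void register(String ssp, String checkpointOffset) {
    register(ssp, new Startpoint(PositionType.SPECIFIC_OFFSET, checkpointOffset));
  }
}

// Records registrations so the delegation can be observed.
final class RecordingConsumer extends UnifiedConsumerSketch {
  final List<String> calls = new ArrayList<>();

  @Override
  void register(String ssp, Startpoint startpoint) {
    calls.add(ssp + "@" + startpoint.getPositionType() + ":" + startpoint.getPosition());
  }
}
```

With this shape, system-specific seek logic lives in one method, and the checkpoint path becomes a special case of it.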

Rejected Alternatives

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed solution is safer: checkpoints are used for fault tolerance, so preventing direct user intervention on them avoids human error. Setting starting offsets out-of-band provides this safety layer.

