...

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

Scope

...

Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed Implementation

Different systems in Samza have different formats for checkpoint offsets, and there is no contract that describes the offset format. To maintain backwards compatibility and to improve the operability of setting starting offsets, this solution defines the concept of Startpoints and stores them in a storage layer separate from the checkpoint stream, instead of manipulating checkpoint offsets directly in the checkpoint stream.

...

A requested Startpoint will be stored in a metadata store. This will be decoupled from the actual checkpoint offsets in the checkpoint stream.

Public Interfaces

...

Startpoint Model


Code Block (Java): Startpoint
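/**
 * StartpointEntry pairs a {@link Startpoint} with an optional task name. The task name is
 * primarily used for broadcast inputs, where the same SSP is consumed by multiple tasks.
 */
public class StartpointEntry {
  private final String taskName; // optional
  private final Startpoint startpoint;

  // Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream, interpreted according to its {@link PositionType}.
 */
public class Startpoint {
  private final PositionType positionType;
  private final String position;

  // Full implementation not shown for brevity
}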

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}
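For illustration, here is a minimal, self-contained sketch of how the Startpoint model might validate its position string per PositionType. The static factory methods (fromOffset, fromTimestamp, earliest, latest, bootstrap) and the encoding of a TIMESTAMP position as epoch milliseconds are assumptions made for this example; this document does not define them.

```java
import java.util.Objects;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

// Hypothetical sketch: an immutable Startpoint with one validating factory per PositionType.
final class Startpoint {
  private final PositionType positionType;
  private final String position; // null for EARLIEST, LATEST and BOOTSTRAP

  private Startpoint(PositionType positionType, String position) {
    this.positionType = Objects.requireNonNull(positionType);
    this.position = position;
  }

  // SPECIFIC_OFFSET: the position is the system-specific offset string.
  static Startpoint fromOffset(String offset) {
    if (offset == null || offset.isEmpty()) {
      throw new IllegalArgumentException("offset must be non-empty");
    }
    return new Startpoint(PositionType.SPECIFIC_OFFSET, offset);
  }

  // TIMESTAMP: assumption for this sketch, the position holds epoch milliseconds.
  static Startpoint fromTimestamp(long epochMillis) {
    if (epochMillis < 0) {
      throw new IllegalArgumentException("timestamp must be non-negative");
    }
    return new Startpoint(PositionType.TIMESTAMP, Long.toString(epochMillis));
  }

  static Startpoint earliest() { return new Startpoint(PositionType.EARLIEST, null); }
  static Startpoint latest() { return new Startpoint(PositionType.LATEST, null); }
  static Startpoint bootstrap() { return new Startpoint(PositionType.BOOTSTRAP, null); }

  PositionType getPositionType() { return positionType; }
  String getPosition() { return position; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof Startpoint)) return false;
    Startpoint other = (Startpoint) o;
    return positionType == other.positionType && Objects.equals(position, other.position);
  }

  @Override
  public int hashCode() { return Objects.hash(positionType, position); }

  @Override
  public String toString() { return "Startpoint(" + positionType + ", " + position + ")"; }
}
```

Keeping the constructor private means a Startpoint cannot be built with a position its type cannot interpret, for example a TIMESTAMP without a time.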

SystemConsumer Registering Startpoint

The SystemConsumer implementations provided by the Samza framework will implement the new register method below. It is analogous to the existing overloaded #register(SystemStreamPartition, String) method, which is used to recover from checkpoint offsets. The plan is to eventually deprecate #register(SystemStreamPartition, String) and unify the values in the Startpoint metadata store and the checkpoint stream to use the same Startpoint format.


Code Block (Java): SystemConsumer
public interface SystemConsumer {
  /**
   * Registers a SystemStreamPartition with this SystemConsumer using an externally defined
   * {@link Startpoint}. The framework calls this method only if a Startpoint exists for the
   * SystemStreamPartition; otherwise, it calls {@link #register(SystemStreamPartition, String)}.
   * If a SystemConsumer does not support a particular {@link PositionType}, it should throw an
   * exception. Otherwise, it is expected to seek to the position described by the Startpoint.
   *
   * @param systemStreamPartition
   *          The SystemStreamPartition object representing the Samza
   *          SystemStreamPartition to receive messages from.
   * @param startpoint
   *          {@link Startpoint} representing the position in the stream to start
   *          reading events from.
   * @throws UnsupportedOperationException if the {@link PositionType} is not supported.
   */
  default void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    // By default only SPECIFIC_OFFSET is supported, by delegating to the offset-based method.
    if (startpoint.getPositionType() != PositionType.SPECIFIC_OFFSET) {
      throw new UnsupportedOperationException(
          "PositionType " + startpoint.getPositionType() + " is not supported by " + getClass().getName());
    }
    register(systemStreamPartition, startpoint.getPosition());
  }

  @Deprecated // Unify to eventually use the register method above only.
  void register(SystemStreamPartition systemStreamPartition, String offset);

  // Excluding existing interface methods for brevity
}
 
// Below is pseudocode for system-specific implementations showing how to handle the TIMESTAMP position type.

public class KafkaSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // Call the underlying Kafka Consumer#offsetsForTimes() to find the earliest offset
      // whose timestamp is at or after the requested timestamp.
      // Call the underlying Kafka Consumer#seek() with the offset returned by offsetsForTimes().
    } else {
      // Handle the remaining PositionTypes ...
    }
  }
}

public class EventHubSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.getPosition()))
      // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStreamPartition.getPartition().getPartitionId(), eventPosition)
    } else {
      // Handle the remaining PositionTypes ...
    }
  }
}
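The per-system dispatch above can be illustrated without Kafka or Event Hubs. The sketch below uses a hypothetical in-memory partition, modeled as a list of message timestamps indexed by offset, and resolves each PositionType to the offset a consumer would seek to. TIMESTAMP resolves to the first offset whose timestamp is at or after the requested time, similar to the documented semantics of Kafka's Consumer#offsetsForTimes(). The names InMemoryPartition and resolveOffset, starting at the end when no message is late enough, and treating BOOTSTRAP as unsupported are all assumptions for this example.

```java
import java.util.List;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class Startpoint {
  final PositionType positionType;
  final String position;

  Startpoint(PositionType positionType, String position) {
    this.positionType = positionType;
    this.position = position;
  }
}

// Hypothetical in-memory partition: message timestamps (epoch millis), indexed by offset.
final class InMemoryPartition {
  private final List<Long> timestamps;

  InMemoryPartition(List<Long> timestamps) { this.timestamps = timestamps; }

  /** Returns the offset a consumer would seek to for the given Startpoint. */
  long resolveOffset(Startpoint startpoint) {
    switch (startpoint.positionType) {
      case SPECIFIC_OFFSET:
        return Long.parseLong(startpoint.position);
      case EARLIEST:
        return 0L;
      case LATEST:
        // The next offset to be written, so only new messages are consumed.
        return timestamps.size();
      case TIMESTAMP: {
        long target = Long.parseLong(startpoint.position);
        for (int offset = 0; offset < timestamps.size(); offset++) {
          if (timestamps.get(offset) >= target) {
            return offset;
          }
        }
        // Assumption: no message at or after the target time, so start at the end.
        return timestamps.size();
      }
      default:
        // Contract from the SystemConsumer Javadoc: unsupported types throw.
        throw new UnsupportedOperationException(
            "PositionType " + startpoint.positionType + " is not supported");
    }
  }
}
```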

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store is the one described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored in their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→StartpointEntry(Startpoint, TaskName)

TaskName is optional and is primarily used for broadcast inputs, where the same SSP is consumed by multiple tasks.
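The abstract SSP→StartpointEntry(Startpoint, TaskName) layout can be sketched as a key/value encoding. Everything below is hypothetical: this document does not define a serialization format, and Base64-encoding each field with a `~` null marker is just one simple choice that lets system and stream names such as `page.views` contain any character without escaping. The class name StartpointSerde is illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical serde for the SSP -> StartpointEntry(Startpoint, TaskName) layout.
final class StartpointSerde {
  private static final String NULL_MARKER = "~"; // '~' is not in the Base64 alphabet
  private static final String SEPARATOR = ":";   // ':' is not in the Base64 alphabet either

  /** Key for an SSP; each component is Base64-encoded so names may contain any character. */
  static String encodeKey(String system, String stream, int partition) {
    return join(system, stream, Integer.toString(partition));
  }

  /** Value for StartpointEntry(Startpoint, TaskName); position and taskName may be null. */
  static String encodeValue(String positionType, String position, String taskName) {
    return join(positionType, position, taskName);
  }

  /** Inverse of the encoders: returns the original fields, with nulls restored. */
  static String[] decode(String encoded) {
    String[] parts = encoded.split(SEPARATOR, -1);
    String[] fields = new String[parts.length];
    for (int i = 0; i < parts.length; i++) {
      fields[i] = NULL_MARKER.equals(parts[i])
          ? null
          : new String(Base64.getDecoder().decode(parts[i]), StandardCharsets.UTF_8);
    }
    return fields;
  }

  private static String join(String... fields) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < fields.length; i++) {
      if (i > 0) {
        sb.append(SEPARATOR);
      }
      sb.append(fields[i] == null
          ? NULL_MARKER
          : Base64.getEncoder().encodeToString(fields[i].getBytes(StandardCharsets.UTF_8)));
    }
    return sb.toString();
  }
}
```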

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

Code Block (Java): StartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Constructs the StartpointManager with the {@link MetadataStore} to use.
   */
  public StartpointManager(MetadataStore metadataStore) { ... }

  /**
   * Writes a {@link StartpointEntry} that defines the start position for a stream partition.
   * @param ssp The {@link SystemStreamPartition} to map the {@link StartpointEntry} against.
   * @param startpointEntry The {@link StartpointEntry} to store.
   */
  public void writeStartpoint(SystemStreamPartition ssp, StartpointEntry startpointEntry) { ... }

  /**
   * Returns the latest {@link StartpointEntry} written for a specified {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link StartpointEntry} for.
   * @param removeUponRead If true, removes the {@link StartpointEntry} once read.
   * @return {@link StartpointEntry} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public StartpointEntry readStartpoint(SystemStreamPartition ssp, boolean removeUponRead) { ... }

  /**
   * Returns all {@link StartpointEntry} objects in the {@link MetadataStore}.
   * @return map of each existing {@link SystemStreamPartition} to its {@link StartpointEntry}.
   */
  public Map<SystemStreamPartition, StartpointEntry> readAllStartpoints() { ... }

// Full implementation not shown for brevity
}
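A minimal in-memory sketch of the StartpointManager contract follows. SimpleMetadataStore is a hypothetical stand-in, not Samza's MetadataStore interface, and SSPs and entries are modeled as type parameters so the example stays self-contained. It reads "the latest StartpointEntry" as the most recently written one, since a later write for the same SSP overwrites the earlier entry; that interpretation is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the metadata store; NOT Samza's MetadataStore interface.
interface SimpleMetadataStore<K, V> {
  V get(K key);
  void put(K key, V value);
  void delete(K key);
  Map<K, V> all();
}

final class InMemoryStore<K, V> implements SimpleMetadataStore<K, V> {
  private final Map<K, V> map = new HashMap<>();

  public V get(K key) { return map.get(key); }
  public void put(K key, V value) { map.put(key, value); }
  public void delete(K key) { map.remove(key); }
  public Map<K, V> all() { return Collections.unmodifiableMap(new HashMap<>(map)); }
}

// Sketch of the StartpointManager semantics; K stands for the SSP, V for the StartpointEntry.
final class SketchStartpointManager<K, V> {
  private final SimpleMetadataStore<K, V> store;

  SketchStartpointManager(SimpleMetadataStore<K, V> store) {
    this.store = Objects.requireNonNull(store);
  }

  /** A later write for the same SSP replaces the earlier entry. */
  void writeStartpoint(K ssp, V entry) {
    store.put(Objects.requireNonNull(ssp), Objects.requireNonNull(entry));
  }

  /** Returns the entry for ssp, or null; deletes it afterwards if removeUponRead is true. */
  V readStartpoint(K ssp, boolean removeUponRead) {
    V entry = store.get(ssp);
    if (entry != null && removeUponRead) {
      store.delete(ssp);
    }
    return entry;
  }

  Map<K, V> readAllStartpoints() { return store.all(); }
}
```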

Changes to OffsetManager

Code Block (Java): OffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Javadoc will mirror StartpointManager#readStartpoint.
  public Startpoint getStartpoint(StartpointKey startpointKey) {
    return startpointManager.readStartpoint(startpointKey);
  }

  // Javadoc will mirror StartpointManager#writeStartpoint.
  public void setStartpoint(StartpointKey startpointKey, Startpoint startpoint) {
    startpointManager.writeStartpoint(startpointKey, startpoint);
  }

  // Renamed from getStartingOffset so that the name documents where the offset comes from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    ...
  }

  // Alias the old name for backwards compatibility.
  @Deprecated // Mark as legacy and remove in the next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

// Excluding members and methods not pertinent to this design document for brevity.
}

General Workflow

Entities

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • SystemConsumer
  • StartpointManager
  • Storage layer
  • ExternalSource - e.g. Checkpoint tool, REST backend, etc.

Objects

  • Startpoint
  • Checkpoint
  • SystemStreamPartition(SSP)
  • StartpointKey

Step 1: StartpointManager#writeStartpoint(StartpointKey, Startpoint) stores the Startpoint in the storage layer.

Step 2: ExternalSource triggers a restart of SamzaProcessor.

Step 3: SamzaProcessor starts up, and each task instance reads its Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(StartpointKey, removeUponRead).
Startpoints are removed from the storage layer at the first checkpoint offset commit. If SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), Startpoints are removed immediately after being read.
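The removal rules in Step 3 can be sketched as follows, with a plain Map standing in for the storage layer and strings standing in for SSPs and Startpoints (both assumptions). Deferring removal until the first commit presumably ensures that a container which fails before committing re-applies the Startpoint on restart. Removing with Map#remove(key, value) is an extra precaution not stated in this document: it avoids deleting a newer Startpoint written after startup. StartpointLifecycle is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the Step 3 removal rules.
final class StartpointLifecycle {
  private final Map<String, String> store; // stand-in for the storage layer: SSP -> Startpoint
  private final boolean checkpointsEnabled;
  // Startpoints that were applied but are not yet covered by a committed checkpoint.
  private final Map<String, String> pendingDeletion = new HashMap<>();

  StartpointLifecycle(Map<String, String> store, boolean checkpointsEnabled) {
    this.store = store;
    this.checkpointsEnabled = checkpointsEnabled;
  }

  /** Reads the Startpoint for an SSP at startup, or returns null if there is none. */
  String readAtStartup(String ssp) {
    String startpoint = store.get(ssp);
    if (startpoint == null) {
      return null;
    }
    if (checkpointsEnabled) {
      // Keep it until the first commit, so a restart before that commit re-applies it.
      pendingDeletion.put(ssp, startpoint);
    } else {
      // No commit to wait for; removing now stops later restarts from re-applying it.
      store.remove(ssp);
    }
    return startpoint;
  }

  /** Invoked after the first successful checkpoint offset commit. */
  void onCheckpointCommitted() {
    for (Map.Entry<String, String> e : pendingDeletion.entrySet()) {
      // Remove only if unchanged, so a Startpoint written after startup is kept.
      store.remove(e.getKey(), e.getValue());
    }
    pendingDeletion.clear();
  }
}
```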

Step 4: SamzaProcessor task instances iterate through their SSPs and, for each SSP:
   i) if StartpointManager#readStartpoint(StartpointKey) returns a Startpoint, call SystemConsumer#register(SystemStreamPartition, Startpoint). A Startpoint keyed by TaskName+SSP takes precedence over one keyed by SSP only.
  ii) otherwise, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String).
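The precedence rule in Step 4 can be sketched like this. StartpointKey here is a hypothetical minimal version, StartingPosition and StartingPositionResolver are illustrative names, and SSPs, task names and Startpoints are modeled as strings for brevity. A key with a null task name stands for an SSP-only Startpoint.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical StartpointKey: taskName is null for an SSP-only Startpoint.
final class StartpointKey {
  final String ssp;
  final String taskName;

  StartpointKey(String ssp, String taskName) {
    this.ssp = Objects.requireNonNull(ssp);
    this.taskName = taskName;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof StartpointKey)) return false;
    StartpointKey k = (StartpointKey) o;
    return ssp.equals(k.ssp) && Objects.equals(taskName, k.taskName);
  }

  @Override
  public int hashCode() { return Objects.hash(ssp, taskName); }
}

// Result of Step 4 for one SSP: which register overload to call, and with what.
final class StartingPosition {
  final boolean fromStartpoint; // true: register(ssp, Startpoint); false: register(ssp, String)
  final String value;           // the Startpoint, or the checkpointed offset

  StartingPosition(boolean fromStartpoint, String value) {
    this.fromStartpoint = fromStartpoint;
    this.value = value;
  }
}

final class StartingPositionResolver {
  private final Map<StartpointKey, String> startpoints;
  private final Map<String, String> checkpointOffsets; // SSP -> checkpointed offset

  StartingPositionResolver(Map<StartpointKey, String> startpoints, Map<String, String> checkpointOffsets) {
    this.startpoints = startpoints;
    this.checkpointOffsets = checkpointOffsets;
  }

  StartingPosition resolve(String taskName, String ssp) {
    // i) A TaskName+SSP key takes precedence over an SSP-only key.
    String startpoint = startpoints.get(new StartpointKey(ssp, taskName));
    if (startpoint == null) {
      startpoint = startpoints.get(new StartpointKey(ssp, null));
    }
    if (startpoint != null) {
      return new StartingPosition(true, startpoint);
    }
    // ii) Fall back to the checkpointed offset (null if none was ever committed).
    return new StartingPosition(false, checkpointOffsets.get(ssp));
  }
}
```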

Compatibility, Deprecation, and Migration Plan

All changes are backwards compatible. This is an add-on feature, and no changes to the existing checkpoint operations are required.

The future plan is to use the newer SystemConsumer#register(SystemStreamPartition, Startpoint) interface and deprecate the existing SystemConsumer#register(SystemStreamPartition, String). This will allow a single point of entry for setting the starting offset on the SystemConsumer, whether the starting offset comes from a checkpoint or from a Startpoint.
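The migration target can be sketched by inverting the current default method: the Startpoint overload becomes the only abstract entry point, and the legacy offset overload wraps a checkpointed offset in a SPECIFIC_OFFSET Startpoint. UnifiedSystemConsumer and the string-typed SSP are assumptions for this example, not part of the proposal.

```java
import java.util.Objects;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

final class Startpoint {
  final PositionType positionType;
  final String position;

  Startpoint(PositionType positionType, String position) {
    this.positionType = Objects.requireNonNull(positionType);
    this.position = position;
  }
}

// Hypothetical post-migration consumer: the Startpoint overload is the only abstract method.
interface UnifiedSystemConsumer {
  void register(String ssp, Startpoint startpoint);

  // Legacy overload kept for existing callers: a checkpointed offset is a SPECIFIC_OFFSET Startpoint.
  @Deprecated
  default void register(String ssp, String checkpointOffset) {
    register(ssp, new Startpoint(PositionType.SPECIFIC_OFFSET, checkpointOffset));
  }
}
```

With this shape, a system implementation handles every starting position, checkpointed or not, in one method.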

Rejected Alternatives

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed solution is safer: checkpoints are used for fault tolerance, so they should not be open to user intervention, which could introduce human error. Setting starting offsets out-of-band provides that safety layer.