
Status

Current state: UNDER DISCUSSION

Discussion thread

JIRA: SAMZA-1983

Released: TBD


Purpose

The purpose of this feature is to provide a common interface for external tools and services to rewind or fast-forward starting offsets on any input stream. Beyond the common interface, this feature will also make it possible to manually set starting offsets by various position types, not only by specific offsets. Many of the underlying system consumers already support seeking by other position types, such as by timestamp, but the current framework does not expose these capabilities generically.

Motivation

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a system-specific checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. The current tooling is tedious and error-prone, since it requires proper security access and potentially editing many offsets in a configuration file. In some cases, it can cause a Samza processor to lose its checkpoints. Beyond the danger of modifying checkpoints directly, checkpoint offsets are arbitrary strings with system-specific formats, so each system type (e.g. Kafka, Eventhub, Kinesis) requires its own checkpoint tool.

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow manipulating starting offsets on an input stream not only by specific offsets, but with different position types, such as by timestamp.

Maintain backwards compatibility.

Provide safety by setting starting offsets out-of-band and not directly in the checkpoints.

Simplicity. Easy for developers and users to use.

Non-goals

Full snapshot of a Samza job's state at a particular point in time.

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed Implementation

Different systems in Samza have different formats for checkpoint offsets and lack any contract that describes the offset format. To maintain backwards compatibility and to make setting starting offsets safer to operate, this solution defines the concept of Startpoints and stores them in a storage layer separate from the checkpoint stream, rather than manipulating the checkpoint offsets directly.

A Startpoint indicates the position in a SystemStreamPartition from which consumption should start. Startpoints take precedence over Checkpoints and define both a position type and a position value. For example, if the position type is TIMESTAMP, the position value is an epoch timestamp. Startpoints thus enforce a stricter contract for external tools and services to follow than the free-form string offset stored in a Checkpoint.

Each SystemConsumer will also implement the new StartpointVisitor interface, whose visit methods register a Startpoint.

A requested Startpoint will be stored in a metadata store, decoupled from the actual checkpoint offsets in the checkpoint stream.

General Workflow

Entities

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • JobCoordinator
  • SystemConsumer (implements interface StartpointVisitor)
  • StartpointManager
  • StorageLayer - See Storing Requested Startpoint
  • ExternalSource - e.g. a checkpoint tool, a REST backend, etc.

Objects

  • Startpoint
  • Checkpoint
  • SystemStreamPartition(SSP)

Step 1: StartpointManager#writeStartpoint(SSP, Startpoint) stores the Startpoint in the storage layer. StartpointManager#writeStartpoint(SSP, TaskName, Startpoint) can also be used in special cases to target only a specific task.

Step 2: ExternalSource triggers SamzaProcessor to restart

Step 3: Upon startup, the JobCoordinator calls StartpointManager#fanOutStartpointsToTasks(SystemStreamPartitionGrouper) to remap Startpoints keyed only by SSP to be re-keyed by SSP+TaskName. Startpoints already keyed by SSP+TaskName are not overwritten. This remapping is required because once a Startpoint is consumed, it will need to be deleted at the next Checkpoint commit (See Note below). Checkpoint commits happen at a per task level.

Step 4: SamzaProcessor starts up, and each task instance reads its Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, TaskName).
As each of the SamzaProcessor task instances iterates through its SSPs:
    a) if there is a Startpoint for the requested SSP, call Startpoint#apply(SystemStreamPartition, StartpointVisitor), which dispatches to the matching StartpointVisitor#visit overload implemented by the SystemConsumer;
    b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both interface methods will be unified, since the implementations should be agnostic of where the starting offset comes from and should only be concerned with setting it, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.

Note: Startpoints are removed from the StorageLayer at the first checkpoint offset commit. This ensures that the Startpoint keeps precedence until the next Checkpoint is committed, in particular when the SamzaProcessor restarts before that commit.
If the SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), Startpoints are removed immediately after being read.
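The per-task part of this workflow can be sketched as a small, self-contained Java model. This is a hypothetical illustration, not the proposed implementation: SSPs and task names are plain strings, a Startpoint is a string description, the metadata store is a HashMap keyed by "ssp|task", and the class name TaskLifecycleSketch is made up. It shows the Startpoint winning over the checkpoint on every restart until the first commit, and the immediate deletion when checkpoints are disabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one task instance; not the proposed implementation.
final class TaskLifecycleSketch {
  private final String taskName;
  private final boolean resetOffset;                           // stands in for *.samza.reset.offset=true
  final Map<String, String> startpointStore = new HashMap<>(); // "ssp|task" -> Startpoint description
  final List<String> registrations = new ArrayList<>();        // what the consumer was asked to do, in order

  TaskLifecycleSketch(String taskName, boolean resetOffset) {
    this.taskName = taskName;
    this.resetOffset = resetOffset;
  }

  private String key(String ssp) {
    return ssp + "|" + taskName;
  }

  // Startup: a Startpoint takes precedence; otherwise register with the checkpointed offset.
  void start(List<String> ssps, Map<String, String> checkpointOffsets) {
    for (String ssp : ssps) {
      String startpoint = startpointStore.get(key(ssp));
      if (startpoint != null) {
        registrations.add(ssp + " startpoint " + startpoint); // stands in for Startpoint#apply(ssp, consumer)
        if (resetOffset) {
          startpointStore.remove(key(ssp));                  // checkpoints disabled: delete right after reading
        }
      } else {
        registrations.add(ssp + " checkpoint " + checkpointOffsets.get(ssp)); // stands in for SystemConsumer#register
      }
    }
  }

  // Commit: once the checkpoint for these SSPs is written (omitted here), their Startpoints are deleted.
  void commit(Map<String, String> committedOffsets) {
    for (String ssp : committedOffsets.keySet()) {
      startpointStore.remove(key(ssp));
    }
  }
}
```

Deleting only after the commit is what lets a restart that happens before the first commit still see the Startpoint.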

Startpoint Models

Startpoint
/**
 * Startpoint represents a position in a stream partition
 */
public abstract class Startpoint {
  private final long creationTimestamp;

  /**
   * Apply the visitor {@link StartpointVisitor}'s register methods to the instance of this {@link Startpoint}
   * class.
   * @param systemStreamPartition The {@link SystemStreamPartition} needed to register with the {@link StartpointVisitor}
   * @param startpointVisitor The visitor to register with.
   */
  public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);
  
// Full implementation not shown for brevity
}


/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
public final class StartpointTimestamp extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
 * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
 * and it is recommended to maintain the subclass of this {@link StartpointCustom} as a simple POJO.
 */
public abstract class StartpointCustom extends Startpoint { ... }
 ...
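To show how apply() selects the right visitor method, here is a minimal double-dispatch sketch under simplifying assumptions: Ssp is a hypothetical record standing in for SystemStreamPartition, only three Startpoint types are modeled, and RecordingConsumer is a toy visitor that records which overload ran. Because each subclass calls visitor.visit(ssp, this), the compiler picks the overload for the concrete subclass, so a consumer never needs an instanceof or position-type check.

```java
// Hypothetical stand-in for SystemStreamPartition.
record Ssp(String system, String stream, int partition) { }

abstract class Startpoint {
  abstract void apply(Ssp ssp, StartpointVisitor visitor);
}

final class StartpointSpecific extends Startpoint {
  final String offset;
  StartpointSpecific(String offset) { this.offset = offset; }
  // 'this' has static type StartpointSpecific, so the matching overload is chosen at compile time.
  @Override void apply(Ssp ssp, StartpointVisitor visitor) { visitor.visit(ssp, this); }
}

final class StartpointTimestamp extends Startpoint {
  final long timestampMs;
  StartpointTimestamp(long timestampMs) { this.timestampMs = timestampMs; }
  @Override void apply(Ssp ssp, StartpointVisitor visitor) { visitor.visit(ssp, this); }
}

final class StartpointOldest extends Startpoint {
  @Override void apply(Ssp ssp, StartpointVisitor visitor) { visitor.visit(ssp, this); }
}

interface StartpointVisitor {
  void visit(Ssp ssp, StartpointSpecific startpoint);

  // Optional position types default to "unsupported", as in the proposed interface.
  default void visit(Ssp ssp, StartpointTimestamp startpoint) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  default void visit(Ssp ssp, StartpointOldest startpoint) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }
}

// Toy consumer that supports specific offsets and timestamps, but not StartpointOldest.
final class RecordingConsumer implements StartpointVisitor {
  String last;
  @Override public void visit(Ssp ssp, StartpointSpecific sp) { last = "specific:" + sp.offset; }
  @Override public void visit(Ssp ssp, StartpointTimestamp sp) { last = "timestamp:" + sp.timestampMs; }
}
```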

StartpointVisitor


SystemConsumer
/**
 * Visitor interface for system consumers to implement to support {@link Startpoint}s.
 */
public interface StartpointVisitor {

  /**
   * Seek to specific offset represented by {@link StartpointSpecific}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
   */
  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);

  /**
   * Seek to timestamp offset represented by {@link StartpointTimestamp}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  /**
   * Seek to earliest offset represented by {@link StartpointOldest}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }

  /**
   * Seek to latest offset represented by {@link StartpointUpcoming}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
  }

  /**
   * Seek to a custom position represented by {@link StartpointCustom}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointCustom The {@link Startpoint} that represents the custom position.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
    throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
  }
}
 
// Below is example pseudocode showing how system-specific consumers can handle the timestamp position type. Other position types
// (StartpointSpecific, StartpointOldest, StartpointUpcoming) are left out for brevity.
// Startpoint#apply already dispatches to the visit overload matching the Startpoint subclass, so no type check is needed here.

public class KafkaSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    // topicPartition = new TopicPartition(systemStreamPartition.getStream(), systemStreamPartition.getPartition().getPartitionId())
    // Call the underlying KafkaConsumer#offsetsForTimes() with the epoch timestamp held by startpointTimestamp.
    // Call the underlying KafkaConsumer#seek(topicPartition, offset) with the offset returned by offsetsForTimes().
    // offsetsForTimes() returns null for a partition with no message at or after the timestamp, so a fallback is needed.
  }
}

public class EventHubSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    // eventPosition = EventPosition.fromEnqueuedTime(Instant.ofEpochMilli(<epoch timestamp held by startpointTimestamp>))
    // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, String.valueOf(systemStreamPartition.getPartition().getPartitionId()), eventPosition)
  }
}
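The Kafka example relies on the documented semantics of Consumer#offsetsForTimes(): for each partition it returns the earliest offset whose timestamp is greater than or equal to the requested timestamp, or null if no such message exists. The sketch below reproduces that lookup over an in-memory partition. LogRecord and TimestampSeek are hypothetical names, and the linear scan is for clarity, not efficiency.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical in-memory partition record: an offset and its timestamp, stored in offset order.
record LogRecord(long offset, long timestampMs) { }

final class TimestampSeek {
  private TimestampSeek() { }

  // Returns the earliest offset whose timestamp is >= timestampMs; empty if no such record exists yet.
  static OptionalLong offsetForTimestamp(List<LogRecord> partition, long timestampMs) {
    for (LogRecord record : partition) {
      // Records are scanned in offset order, so the first match is the earliest qualifying offset,
      // even if timestamps are not monotonic.
      if (record.timestampMs() >= timestampMs) {
        return OptionalLong.of(record.offset());
      }
    }
    return OptionalLong.empty();
  }
}
```

An empty result corresponds to the null case above; a consumer would then need a fallback, for example treating it like an upcoming position.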

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store is the one described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored within their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→Startpoint or SSP+TaskName→Startpoint

TaskName is optional and is primarily for broadcast inputs, where the same SSP is consumed by multiple tasks. However, when a job starts, all Startpoints keyed only by SSP are remapped to SSP+TaskName. See General Workflow above for details.
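A minimal sketch of this key layout, assuming a hypothetical StoreKey record (an SSP string plus an optional TaskName) and an in-memory map standing in for the Startpoint namespace of the metadata store. It shows that an SSP-only entry and an SSP+TaskName entry for the same SSP are distinct keys that can coexist and be deleted independently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical key: an SSP (as a string), optionally qualified by a TaskName.
record StoreKey(String ssp, Optional<String> taskName) {
  static StoreKey of(String ssp) { return new StoreKey(ssp, Optional.empty()); }
  static StoreKey of(String ssp, String taskName) { return new StoreKey(ssp, Optional.of(taskName)); }
}

// In-memory stand-in for the Startpoint namespace of the metadata store.
// Values are serialized Startpoints (JSON strings in this sketch).
final class InMemoryStartpointStore {
  private final Map<StoreKey, String> kv = new HashMap<>();

  void write(StoreKey key, String serializedStartpoint) { kv.put(key, serializedStartpoint); }

  // Returns null when no Startpoint exists, mirroring StartpointManager#readStartpoint.
  String read(StoreKey key) { return kv.get(key); }

  void delete(StoreKey key) { kv.remove(key); }
}
```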

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

StartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}


  /**
   * Writes a {@link Startpoint} that defines the start position for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the last {@link Startpoint} for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition}
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from
   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel}.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param jobModel The {@link JobModel} used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The set of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName.
   */
  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {...}

// Full implementation not shown for brevity
}
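The fan-out step can be sketched as follows, assuming plain strings for SSPs, task names and Startpoints, and a task-to-SSPs map standing in for the task assignment in the JobModel. StartpointFanOut and SspTaskKey are hypothetical names. Using putIfAbsent keeps any Startpoint already written for a specific task, as the General Workflow requires; removing the SSP-only entries afterwards is an assumption of this sketch. A broadcast SSP assigned to several tasks is copied to each of them.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical composite key for SSP+TaskName.
record SspTaskKey(String ssp, String taskName) { }

final class StartpointFanOut {
  private StartpointFanOut() { }

  // Re-keys SSP-only Startpoints to SSP+TaskName for every task consuming that SSP.
  // Returns the SSPs that were fanned out.
  static Set<String> fanOut(Map<String, String> sspOnly,
                            Map<SspTaskKey, String> taskScoped,
                            Map<String, Set<String>> taskToSsps) {
    Set<String> fannedOut = new HashSet<>();
    for (Map.Entry<String, Set<String>> task : taskToSsps.entrySet()) {
      for (String ssp : task.getValue()) {
        String startpoint = sspOnly.get(ssp);
        if (startpoint != null) {
          // putIfAbsent keeps a Startpoint that was explicitly written for this task.
          taskScoped.putIfAbsent(new SspTaskKey(ssp, task.getKey()), startpoint);
          fannedOut.add(ssp);
        }
      }
    }
    // Assumption of this sketch: SSP-only entries are removed once re-keyed.
    sspOnly.keySet().removeAll(fannedOut);
    return fannedOut;
  }
}
```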

Changes to OffsetManager

OffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Will add similar doc as StartpointManager#readStartpoint
  public Startpoint getStartpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return startpointManager.readStartpoint(systemStreamPartition, taskName);
  }

  // Will add similar doc as StartpointManager#writeStartpoint
  public void setStartpoint(TaskName taskName, SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    startpointManager.writeStartpoint(systemStreamPartition, taskName, startpoint);
  }
  
  // Renamed from getStartingOffset to self document this method where the offset is coming from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) { 
    ... 
  }

  // Alias old name for backwards-compatibility.
  @Deprecated // set as legacy and remove in next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
    ...
    checkpointManager.writeCheckpoint(taskName, checkpoint);
    ...
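    // The checkpoint is committed above, so the Startpoints for this task's SSPs can now be deleted.
    for (SystemStreamPartition ssp : checkpoint.getOffsets().keySet()) {
      startpointManager.deleteStartpoint(ssp, taskName);
    }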
    ...
  }
// Excluding members and methods not pertinent to this design document for brevity.
}


Compatibility, Deprecation, and Migration Plan

All changes are backwards compatible. This is an add-on feature, and no changes to the existing checkpoint operations are required.

Rejected Alternatives

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed solution is safer: checkpoints are used for fault tolerance and should be shielded from manual user intervention to prevent human error. Setting starting offsets out-of-band provides this safety layer.

