Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Operational Requirement of the Input System

1)

...

The partitioning algorithm should be implemented in the form of hash(key) % partitionNum. User is free to choose hash function but the hashed result must be mod by the partitionNum.

New partition to old partition mapping algorithm

The expansion expansion can be supported after this proposal if and only if the partition expansion algorithm in the input system meets the following requirement:

- Let's say the partition of a given stream in the input system has been expanded from previousPartitionCount to currentPartitionCount. Then the messages that are currently mapped to the partition with index X must have been mapped to partition with the same index X if X < previousPartitionCount (suppose partition index starts from 0).
- There exists a function such that, given previouPartitionCount, currentPartitionCount and a partitionIndex, the function can deterministically determine the index of the partition X where all messages currently mapped to the partition partitionIndex after partition expansion would have been mapped to the partition X before the partition expansion(s).

For example, this requirement is satisfied by Kafka when the following two requirements are met:

- User uses default partitioning algorithm of Kafka. This means the partitioning algorithm is implemented in the form of hash(key) % partitionCount.
- Partition number of any Kafka topic is always multiplied by a power of the same factor (e.g. 2) when we expand the partitionNote that this requirement is satisfied by the default partitioning algorithms in Kafka. Samza needs to repartition the user’s stream if user uses a customized partitioning algorithm in their producer implementation that doesn’t meet this requirement.

2) Stream management 

The partition number of any stream that is used as input stream of stateful job should always be multiplied by power of two when we increase it.

...

Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similar to how ChangelogPartitionManager and SetChangelogMapping are used to read and write Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.

2) Add interface InputStreamPartitionExpansionAlgorithm and class DefaultPartitionExpansionAlgorithm

This interface includes the method int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount). This method should meet two requirements:

- This method returns the index of the partition that would contain all messages in the partition currentPartitionIdx if the input stream has never been expanded. 
- If currentPartitionIdx < initialPartitionCount (suppose partition index starts from 0), then the corresponding partition index before partition should be currentPartitionIdx.

Users can provide custom implementation of this interface and specify it to be used by JobCoordinator to calculate JobModel. It allows Samza to support task expansion for any input system whose partition expansion algorithm can be expressed by this method.

We will also create class DefaultPartitionExpansionAlgorithm that implements this interface. Given current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.

3) Add interface SystemStreamPartitionGrouperWithFixedTaskNum

The new interface should extend the existing interface SystemStreamPartitionGrouper. It should include a new method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.

...

4) Create class GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

We should create two new classes GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum which implements the interface SystemStreamPartitionGrouperWithFixedTaskNumGroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if partition of those streams has not changed since the job is created. Otherwise, GroupByPartitionWithFixedTaskNum should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.

...

- If previousSystemStreamPartitionMapping is empty, return GroupByPartition.group(ssps).get(ssp), where ssps represents to entire set of SystemStreamPartition to be grouped.
- Calculate from previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using this:
previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition(), currentPartitionCountForStream, taskNumForStream)
- return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), ssp.getPartition() % taskNumForStream) previousPartitionIdx))

Similarly, GroupBySystemStreamPartitionWithFixedTaskNum will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:

- If previousSystemStreamPartitionMapping is empty, return GroupBySystemStreamPartition.group(ssps).get(ssp), where ssps represents to entire set of SystemStreamPartition to be grouped.
- Calculate from previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using this:
previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition(), currentPartitionCountForStream, taskNumForStream)
- return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), ssp.getPartition() % taskNumForStream previousPartitionIdx))

Stateful Samza job which is using GroupByPartition (or GroupBySystemStreamPartition) as grouper class should be configured to use GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum ) in order to allow partition expansion. Note that GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum ) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition) because they return the same partition-to-task assignment if partition doesn't expand. Thus user's job should not need to bootstrap key/value store from the changelog topic.

User should also provide custom implementation of InputStreamPartitionExpansionAlgorithm and specify it in the config if the DefaultPartitionExpansionAlgorithm does not match with the partition expansion algorithm used in the job's input system.

To help understand this algorithm, the idea is to split partitions into disjoint buckets (or groups) of partitions where the union of those buckets equals the original set of partitions. The partition-to-bucket assignment ensure that messages with the same key will be produced to the partitions of the same bucket. Then partitions in the same bucket will be assigned to the same task to ensure that messages with the same key will go to the same task. 

...

View file
namesamza.pdf
height250

...



5Handle partition expansion while tasks are running

JobCoordinator is already monitoring partition expansion of input streams as of current Samza implementation. When JobCoordinator detects partition expansion of any input stream, it should shutdown re-calculate JobModel, shutdown all containers using the off-the-shelf Yarn API, re-calculate JobModel and , wait for callback to confirm that these containers have been shutdown, and restart container using the new JobModel.

...

4) Add class GroupBySystemStreamPartitionWithFixedTaskNum which implements the interface SystemStreamPartitionGrouperWithFixedTaskNum

5) Add interface InputStreamPartitionExpansionAlgorithm with the following definition:

Code Block
languagejava
public interface InputStreamPartitionExpansionAlgorithm {
  int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount);
}


This method returns the index of the partition that should contain all messages in the partition currentPartitionIdx if the input stream has NEVER been expanded.

6) Add class DefaultPartitionExpansionAlgorithm which implements this InputStreamPartitionExpansionAlgorithm. Given current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.

7) Add config job.systemstreampartition.expansion.algorithm. This config specifies the canonical name of a class that implements the interface InputStreamPartitionExpansionAlgorithm. The default value of this config will be the canonical name of the class DefaultPartitionExpansionAlgorithm.


Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

...

Allow task number to increase is useful since it increases the performance of a given Samza job. However, this feature alone does not solve the problem of allowing partition expansion. For example, say we have a job that joins two streams both of which have 3 partitions. If partition number of one stream increases from 3 to 6, we would still want the task number to remain 3 to make sure that messages with the same key from both streams will be handled by the same task. This needs to be done with the new grouper classes proposed in this doc.

2. Add new config and class for user to specify the new-partition to old-partition mapping strategy based on the input system.

 

 The current proposal relies on the input system to meet the specified operational requirements. While these requirements can be satisfied by Kafka, they may or may not be satisfied by other systems such as Kinesis. We can support partition expansion for more input systems than Kafka if user is able to express the new-partition to old-partition mapping strategy. However, since we currently don't know how user is going to use such config/class to do it, we choose to keep the current SEP simple and only add new config/class when we have specific use-case for them.