
SEP-5: Enable partition expansion of input streams


We will also create class DefaultPartitionExpansionAlgorithm that implements this interface. Given current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.
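The modulo rule described above can be sketched as follows. This is an illustration only: the interface shape here is simplified to plain ints, and only the formula currentPartitionIdx % initialPartitionCount comes from the proposal text.

```java
// Simplified sketch of the default expansion algorithm described above.
// Samza's real classes wrap partitions in a Partition type; ints are used
// here for brevity.
interface PartitionExpansionAlgorithm {
  int getPartitionBeforeExpansion(int currentPartitionIdx, int initialPartitionCount);
}

class DefaultPartitionExpansionAlgorithm implements PartitionExpansionAlgorithm {
  @Override
  public int getPartitionBeforeExpansion(int currentPartitionIdx, int initialPartitionCount) {
    // Partition i after expansion maps back to partition i % initialPartitionCount.
    return currentPartitionIdx % initialPartitionCount;
  }
}
```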

3) Add interface SystemStreamPartitionGrouperFixedTasks

The new interface should extend the existing interface SystemStreamPartitionGrouper. It should include a new method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.

4) Create classes GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks

We should create two new classes, GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks, which implement the interface SystemStreamPartitionGrouperFixedTasks. GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition count of those streams has not changed since the job was created. Otherwise, GroupByPartitionFixedTasks should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.

More specifically, GroupByPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName determined by the following algorithm:

- If previousSystemStreamPartitionMapping is empty, fall back to the GroupByPartition assignment for ssp, where ssps represents the entire set of SystemStreamPartitions to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that were consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using this:
previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition(), currentPartitionCountForStream, taskNumForStream)
- return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), previousPartitionIdx))
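The steps above can be sketched in code. The Ssp class and taskFor helper below are simplified stand-ins for Samza's real SystemStreamPartition and grouper classes, not the actual implementation; the modulo step assumes the default expansion algorithm described earlier.

```java
import java.util.Map;
import java.util.Objects;

// Minimal stand-in for Samza's SystemStreamPartition, for illustration only.
class Ssp {
  final String system;
  final String stream;
  final int partition;
  Ssp(String system, String stream, int partition) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
  }
  @Override public boolean equals(Object o) {
    if (!(o instanceof Ssp)) return false;
    Ssp s = (Ssp) o;
    return system.equals(s.system) && stream.equals(s.stream) && partition == s.partition;
  }
  @Override public int hashCode() {
    return Objects.hash(system, stream, partition);
  }
}

class GroupByPartitionFixedTasksSketch {
  // previousMapping: SystemStreamPartition -> taskName from the previous
  // job model, read from the coordinator stream.
  static String taskFor(Ssp ssp, Map<Ssp, String> previousMapping) {
    // Count the distinct tasks that previously consumed from ssp's stream
    // (taskNumForStream in the algorithm above).
    long taskNumForStream = previousMapping.entrySet().stream()
        .filter(e -> e.getKey().stream.equals(ssp.stream))
        .map(Map.Entry::getValue)
        .distinct()
        .count();
    // Map the current partition back to its pre-expansion partition
    // (default modulo expansion algorithm).
    int previousPartitionIdx = ssp.partition % (int) taskNumForStream;
    // Reuse whichever task owned the pre-expansion partition.
    return previousMapping.get(new Ssp(ssp.system, ssp.stream, previousPartitionIdx));
  }
}
```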

Similarly, GroupBySystemStreamPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName determined by the following algorithm:


A stateful Samza job that uses GroupByPartition (or GroupBySystemStreamPartition) as its grouper class should be configured to use GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) in order to allow partition expansion. Note that GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition) because they return the same partition-to-task assignment if the partition count doesn't change. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.


For example, suppose the partition count is increased from 2 to 4 and we use GroupByPartitionFixedTasks as the grouper; partitions 0 and 2 should be mapped to the same task, and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.
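The 2-to-4 example can be checked numerically. This snippet is illustrative only; the names are not from Samza's API.

```java
class ExpansionExample {
  // With 2 initial partitions (and hence 2 tasks under GroupByPartition),
  // post-expansion partition p maps back to bucket p % 2, so partitions
  // {0, 2} share one task and {1, 3} share another.
  static int taskIndexFor(int partition, int initialPartitionCount) {
    return partition % initialPartitionCount;
  }
}
```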


Public Interfaces

1) Add interface SystemStreamPartitionGrouperFixedTasks with the following definition:

Code Block
public interface SystemStreamPartitionGrouperFixedTasks extends SystemStreamPartitionGrouper {
  Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps);
}

3) Add class GroupByPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks

4) Add class GroupBySystemStreamPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks

5) Add interface InputStreamPartitionExpansionAlgorithm with the following definition:
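The definition block did not survive in this version of the page. Based on the usage shown earlier in this document (getPartitionBeforeExpansion(ssp.getPartition(), currentPartitionCountForStream, taskNumForStream)), a plausible shape is the following; the actual SEP definition may differ, and int is used here instead of Samza's Partition wrapper for brevity.

```java
// Reconstructed sketch; not the original definition from the SEP.
interface InputStreamPartitionExpansionAlgorithm {
  // Returns the partition index that held this partition's keys before any
  // partition expansion.
  int getPartitionBeforeExpansion(int currentPartitionIdx,
                                  int currentPartitionCountForStream,
                                  int taskNumForStream);
}
```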


- The change made in this proposal is both source backward-compatible and binary backward-compatible. Existing users' code can compile and run correctly without change.
- For users who want to enable partition expansion for their input streams, they can do the following:
  - Set the grouper class to GroupByPartitionFixedTasks if the job is using GroupByPartition as its grouper class
  - Set the grouper class to GroupBySystemStreamPartitionFixedTasks if the job is using GroupBySystemStreamPartition as its grouper class
  - Change their custom grouper class implementation to extend the new interface if the job is using a custom grouper class implementation.
  - Set job.coordinator.monitor-partition-change to true in the job configuration
  - Run ConfigManager
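A configuration sketch of the steps above, assuming the grouper is selected via Samza's job.systemstreampartition.grouper.factory property; the factory class name below is hypothetical, since this document names the grouper classes but not their factories.

```properties
# Hypothetical factory for the new grouper; the actual class/package may differ.
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFixedTasksFactory
# From the steps above: enable partition-count monitoring in the job coordinator.
job.coordinator.monitor-partition-change=true
```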