You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 78 Next »

ca

Status

Current state[Under Discussion]

Discussion thread: TBD

JIRA:  TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Recently Kafka community is promoting cooperative rebalancing to mitigate the pain points in the stop-the-world rebalancing protocol and an initiation for Kafka Connect already started as KIP-415. There are already exciting discussions around it, but for Kafka Streams, the delayed rebalance is not the complete solution. This KIP is trying to customize the cooperative rebalancing approach specifically for KStream application context,  based on the great design for KConnect.

Currently Kafka Streams uses consumer membership protocol to coordinate the stream task assignment. When we scale up the stream application, KStream group will attempt to revoke active tasks and let the newly spinned hosts take over them. New hosts need to restore assigned tasks' state before transiting to "running". For state heavy application, it is not ideal to give up the tasks immediately once the new player joins the party, instead we should buffer some time to let the new player accept a fair amount of restoring tasks, and finish state reconstruction first before officially taking over the active tasks. Ideally, we could realize no downtime transition during cluster scaling.

In short, the primary goals of this KIP are:

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.

Proposed Changes

Terminology

we shall define several new terms for easy walkthrough of the algorithm.

  • Worker (A.K.A stream worker):  thread level streaming processor, who actually takes the stream task.
  • Instance (A.K.A stream instance): the KStream instance serving as container of stream workers set. This could suggest a physical host or a k8s pod. We will interleave the definition of worker and instance for the most part of discussion concerning "working member" because the capacity is essentially controlled by the instance relative size, not the worker.
  • Learner task: a special standby task that gets assigned to one stream instance to restore a current active task state from another instance.

Learner Task Essential

Learner task shares the same semantics as standby task, which is utilized by the restore consumer to replicate active task state. When the restoration of learner task is complete, the stream instance will initiate a new JoinGroupRequest to call out another rebalance to do task transfer. The goal of learner task is to delay the task migration when the destination host has not finished or even started replaying the active task.

Stop-The-World Effect

As mentioned in motivation section, we also want to mitigate the stop-the-world effect of current global rebalance protocol. A quick recap of current rebalance semantics on KStream: when rebalance starts, all workers would

  1. Join group with all current assigned tasks revoked.

  2. Wait until group assignment finish to get assigned tasks.

  3. Replay the assigned tasks state

  4. Once all replay jobs finish, worker transits to running mode.

The reason for revoking all ongoing tasks is because we need to guarantee each topic partition is assigned with exactly one consumer at any time. In this way, any topic partition could not be re-assigned before it is revoked.

For Kafka Connect, we choose to keep all current assigned tasks running and trade off with one more rebalance. The behavior becomes:

  1. Join group with all current active tasks running.

  2. After first rebalance, sync the revoked partitions and stop them.

  3. Rejoin group immediately with only active tasks to trigger a second rebalance.

Feel free to take a look at KIP-415 example to get a sense of how the algorithm works.

For KStream, we are going to take a trade-off between “revoking all” and “revoking none” solution: we shall only revoke tasks that are being learned since last round. So when we assign learner tasks to new member, we shall also mark active tasks as "being learned" on current owners. Every time when a rebalance begins, the task owners will revoke the being learned tasks and join group without affecting other ongoing tasks. Learned tasks could then immediately transfer ownership without attempting for a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, but increasing the metadata size and sacrificing partial availability of the learner tasks. 

Next we are going to look at several typical scaling scenarios and edge scenarios to better understand the design of this algorithm.

Normal Scenarios

Scale Up Running Application

The newly joined workers will be assigned with learner tasks by the group leader and they will replay the corresponding changelogs on local first. By the end of first round of rebalance, there is no “real ownership transfer”. When new member finally finishes the replay task, it will re-attempt to join the group to indicate that it is “ready” to take on real active tasks. During second rebalance, the leader will eventually transfer the task ownership.

Scale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready, re-attempt to join the group. 
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members each owning one task.


Scale Up from Empty Group

Scaling up from scratch means all workers are new members. There is no need to implement a learner stage because there is nothing to learn: we don’t even have a changelog topic to start with. We should be able to handle this case by identifying whether the given task is in the active task bucket for other members, if not we just transfer the ownership immediately.

After deprecating group.initial.rebalance.delay, we still expect the algorithm to work because every task assignment during rebalance will adhere to the rule "if given task is currently active, reassignment must happen only to workers who are declared ready to serve this task."

Scale-up from ground
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned

#Second Rebalance 
New member S2, S3 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) 
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])

#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])

Scale Down Running Application

As we have already discussed around the “learner” logic, when performing the scale down of stream group, it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, it requires user to pre-set num.standby.tasks which may not be available when administrator performs scaling down. The plan is to use command line tool to tell certain stream members that a shutdown is on the way to be executed. These informed members will send join group request to indicate that they are “leaving soon”. During rebalance assignment, leader will perform the learner assignment among members without intention of leaving. And the leaving member will shut down itself once received the instruction to revoke all its active tasks.

For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances, and ideally could do two types of scaling down:

  1. Percentage scaling. Compute targeting scaled down members while end user just needs to provide a %. For example, if the current cluster size is 40 and we choose to scale down to 80%, then the script will attempt to inform 8 of 40 hosts to “prepare leaving” the group.
  2. Direct scaling. Name the stream instances that we want to shut down soon. Built for online hot swapping and host replacement.

Leader crash during scaling
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claim that it is leaving
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replay first and trigger another rebalance
Member S1~S3 join with following status:(S1 is not ready yet)
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replay and trigger rebalance.
Member S1~S3 join with following status: 
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])
S2 will shutdown itself upon new assignment since there is no assigned task left.

Online Host Swapping

This is a typical use case where user wants to replace entire application's host type. Normally administrator will choose to do host swap one by one, which could cause endless KStream resource shuffling. The recommended approach under cooperative rebalancing is like:

  • Increase the capacity of the current stream job to 2 and boost up new type instances
  • Mark existing stream instances as leaving
  • Learner tasks finished on new hosts, shutting down old ones.
Online Swapping
Group stable state: S1[T1, T2], S2[T3, T4]
Swapping application instances, adding S3, S4 with new instance type.

#First Rebalance 
Member S3, S4 join the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])

Use scaling tool to indicate S1 & S2 are leaving 
#Second Rebalance 
Member S1, S2 initiate rebalance to indicate state change (leaving)
Member S1~S4 join with following status: 
	S1(assigned: [T1], revoked: [T2], learning: [])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])

#Third Rebalance 
S3 and S4 finishes replay T1 ~ T4 trigger rebalance.
Member S1~S4 join with following status: 
	S1(assigned: [], revoked: [T1, T2], learning: [])
	S2(assigned: [], revoked: [T3, T4], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])
S1 performs task assignments:
	S1(assigned: [], revoked: [], learning: [])
	S2(assigned: [], revoked: [], learning: [])
	S3(assigned: [T1, T2], revoked: [], learning: [])
	S4(assigned: [T3, T4], revoked: [], learning: [])
S1~S2 will shutdown themselves upon new assignment since there is no assigned task left.

Edge Scenarios

Leader Transfer During Scaling 

Leader crash could cause a missing of historical assignment information. For the learners already assigned, however, each worker maintains its own assignment status, so when the learner task's id has no corresponding active task running, the transfer will happen immediately. Leader switch in this case is not a big concern. 

Scale-down stream applications
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Member S2~S4 join with following status: 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no current active task. We 
could rebalance T1, T2 immediately.	
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.

Leader Transfer Before Scaling 

However, if the leader dies before new instances join, the potential risk is that leader could not differentiate which stream instance is "new", because it relies on the historical information. For version 1.0, final assignment is probably not ideal in this case if we only attempt to assign learner task to new comers. This also motivates us to figure out a better task coordination strategy for load balance in long term.

Leader crash before
Cluster has 3 stream workers S1(leader), S2 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group, at the same time S1 crash.
S2 takes over the leader
S2 ~ S4 join with following status
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches balance, although the eventual load is skewed.

Optimizations

Stateful vs Stateless Tasks

For stateless tasks the ownership transfer should happen immediately without the need of a learning stage, because there is nothing to restore. We should fallback the algorithm towards KIP-415 where the stateless tasks will only be revoked during second rebalance. This feature requires us to add a new tag towards a stream task, so that when we eventually consider the load balance of the stream applications, this could help us separate out tasks into two buckets and rebalance independently.

Eager Rebalance 

Sometimes the restoration time of learner tasks are not equivalent. When assigned with 1+ tasks to replay, the stream worker could require immediate rebalance as a subset of learning tasks are finished in order to speed up the load balance and resource waste of double task processing, with the sacrifice of global efficiency by introducing many more rebalances. We could supply user with a config to decide whether they want to take eager approach or stable approach eventually, with some follow-up benchmark tools of the rebalance efficiency. Example:

A stream worker S1 takes two learner tasks T1, T2, where restoring time time(T1) < time(T2). Under eager rebalance approach, the worker will call out rebalance immediately when T1 finishes replaying. While under conservative approach, worker will rejoin the group until it finishes replaying of both T1 and T2.

Standby Task Utilization

Don’t forget the original purpose of standby task is to mitigate the issue during scaling down. When performing learner assignment, we shall prioritize workers which currently have standby tasks that match learner assignment. Therefore the group should rebalance pretty soon and let the leaving member shutdown themselves fairly quickly. 

Scale Down Timeout

Sometimes end user wants to reach a sweet spot between ongoing task transfer and streaming resource free-up. So we want to take a similar approach as KIP-415, where we shall introduce a client config to make sure the scale down is time-bounded. If the time takes to migrate tasks outperforms this config, the leaving member will shut down itself immediately instead of waiting for the final confirmation. And we could simply transfer learner tasks to active because they are now the best shot to own new tasks.

Task Tagging

Note that to make sure the above resource shuffling could happen as expected, we need to have the following task status indicators to be provided:

Tag NameTask TypeExplanation
isStatefulbothIndicate whether given task has a state to restore.
isLearnerstandbyIndicate whether standby task is a learner task.
beingLearnedactiveIndicate whether active task is being learned by some other stream worker.
isReadystandbyIndicate whether standby task is ready to serve as active task.
isLeavingactiveIndicate whether active task will be leaving the group soon.

Assignment Algorithm

The above examples are focusing more on demonstrating expected behaviors with KStream incremental rebalancing "end picture". However, we also want to have a holistic view of the new learner assignment algorithm during each actual rebalance.

We shall assign tasks in the order of: active, learner and standby. The assignment will be broken down into following steps:

Algorithm incremental-rebalancing

Input Set of Tasks,
	  Set of Instances,
      Set of Workers,

      Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To resource available instances

	Keep existing learner tasks' assignment unchanged

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment

Backing up Information on Leader 

Since the incremental rebalancing requires certain historical information of last round assignment, the leader worker will need to maintain the knowledge of:

  1. Who participated in the last round of rebalance. This is required information to track new comers.
  2. Who will be leaving the consumer group. This is for scaling down support as the replay could take longer time than the scaling down timeout. Under static membership, since we don't send leave group information, we could leverage leader to explicitly trigger rebalance when the scale-down timeout reaches. Maintaining set of leaving members are critical in making the right task shuffle judgement.


These are essential group state knowledges leader wants to memorize. To avoid the severity of leader crash during scaling, we are avoiding backing up too much information on leader for now. 


For the smooth delivery of all the features we have discussed so far, an iteration plan of reaching final algorithm is defined as below:

Version 1.0

Delivery goal: Scale up support, conservative rebalance

The goal of first version is to realize the foundation of learner algorithm for scaling up scenario. The leader worker will use previous round assignment to figure out which instances are new ones, and the learner tasks shall only be assigned to them onceThe reason we are hoping to only implement new instances is because there is a potential edge case that could break the existing naive learner assignment: when the number of tasks are much smaller than total cluster capacity, we could fall in endless resource shuffling. Also we care more about smooth transition over resource balance for stage one. We do have some historical discussion on marking weight for different types of tasks. If we go ahead to aim for task balance too early, we are potentially in the position of over-optimization. In conclusion, we want to delay the finalized design for eventual balance until last version.

We also don't want to take the eager rebalance optimization in version 1.0 due to the explained concerns.

Version 2.0

Delivery goal: Scale down support

We will focus on the delivery of scaling down support upon the success of version 1.0. We need to extend on the v1 protocol since we need existing instances to take the extra learning load. We shall break the statement in v1 which claims that "only new instances could take learner tasks". To make this happen, we need to deliver in following steps:

  1. Create new tooling for marking instances as ready to scale down
  2. Tag the leaving information for targeted members
  3. Scale down timeout implementation

Version 3.0

Delivery goal: Eager rebalance analysis

A detailed analysis and benchmark support need to be built before fully devoting effort to this feature. Intuitively most applications should be able to tolerate minor discrepancy of task replaying time, while the cost of extra rebalances and increased debugging complexity are definitely things we are not in favor of. 

The version 3.0 is upon version 1.0 success, and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.

Version 4.0 (Stretch)

Delivery goal: Task labeling, eventual workload balance

The 4.0 and the final version will take application eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within the range of +-x% of the expected number of tasks, which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage will be provided for the user to configure. The smaller this number sets to, the more strict the assignment protocol will behave.

We also want to balance the load separately for stateful tasks and stateless tasks as discussed above. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. A separate KIP could be initiated.

Trade-offs

More Rebalances

The new algorithm will invoke many more rebalances than the current protocol as one could perceive. As we have discussed in the overall incremental rebalancing design, it is not always bad to have multiple rebalances when we do it wisely, and after KIP-345 we have a future proposal to avoid scale up rebalances for static members. The goal is to pre-register the members that are planning to be added. The broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default stream application’s rebalance timeout is infinity. The conclusion is that: it is server’s responsibility to avoid excessive rebalance, and client’s responsibility to make each rebalance more efficient.

Metadata Space vs Rebalance Efficiency

Since we are carrying over more information during rebalance, we should be alerted on the metadata size increase. So far the hard limit is 1MB per metadata response, which means if we add on too much information, the new protocol could hit hard failure. This is a common pain point for finding better encoding scheme for metadata if we are promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have started in this JIRA and we will be planning to have a separate KIP discussing different encoding technologies and see which one could work.

Public Interfaces

We are going to add a new type of protocol called "stream" for the protocol type. 

Protocol Type
ProtocolTypes : {"consumer", "connect", "stream"}


Also a bunch of new configs for user to better apply this change.

stream.rebalancing.mode

Default: incremental

Version 1.0

The setting to help ensure no downtime upgrade of online application.

Options : upgrading, incremental

scale.down.timeout.ms

Default: infinity

Version 2.0

Time in milliseconds to force terminate the stream worker when informed to be scaled down.

learner.partial.rebalance

Default : true

Version 3.0

If this config is set to true, new member will proactively trigger rebalance when it finishes restoring one learner task state each time, until it eventually finishes all the replaying. Otherwise, new worker will batch the ready call to ask for single round of rebalance.

stream.imbalance.percentage

Default: 0.2

Version 4.0

The tolerance of task imbalance factor between hosts to trigger rebalance.

Implementation Plan

Call out this portion because the algorithm we are gonna design is fairly complicated so far. To make sure the delivery is smooth with fundamental changes of KStream internals, I build a separate Google Doc here that could be sharable to outline the step of changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes. Without these details, the algorithm is not making sense.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where broker will react with a rebalance when a normal consumer changes the encoded metadata. Client application needs to update to the earliest version which includes KIP-429 version 1.0 change.

Switching Protocol Type

As we have mentioned above, a new protocol type shall be created. To ensure smooth transition, we need to make sure the existing job doesn't fail. The workflow for upgrading is like below:

  • Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
  • Rolling restart the stream application and the change is automatically applied. This is safe because we are not changing protocol type.

In long term we are proposing a more smooth and elegant upgrade approach than the current one. However it requires broker upgrade which may not be trivial effort for the end user. So far this workaround could make the effort smaller.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-off, please review the google doc.

  • No labels