
Status

Current state: [Under Discussion]

Discussion thread: TBD

JIRA:  



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Recently the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started as KIP-415.

This KIP adapts the incremental rebalancing approach for the Kafka consumer client, which will be beneficial for heavily stateful consumers such as Kafka Streams applications.

In short, the goals of this KIP are:

  • Reduce unnecessary downtime due to unnecessary partition migration: i.e. partitions being revoked and re-assigned.
  • Better rebalance behavior for members falling out of the group.


Background

Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics in Kafka Streams: when a rebalance starts, all stream threads would

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get the newly assigned tasks and resume working.

  3. Replay the state of the assigned tasks.

  4. Once all replay jobs finish, the stream thread transitions to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. In this way, no topic partition can be re-assigned before it is revoked.

Example: Streams Assignor Rebalance Metadata

Today Streams embeds a full-fledged consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor through the consumer's pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. Streams' partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte-array field to encode additional information for sticky assignment. More specifically, on subscription:

KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

KafkaConsumer:

Assignment => AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes

------------------

StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32


Example: Consumer Sticky Assignor 

We also provide a StickyAssignor out of the box that tries to mitigate the cost of unnecessary partition migrations. This assignor only relies on subscription metadata and does not modify assignment metadata, as follows:

KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StickyAssignor:

UserData (encoded in version 1) => AssignedPartitions
   AssignedPartitions      => List<String, List<Int32>>


The goal of this incremental protocol is to fully leverage sticky assignors, which try to reassign partitions to their previous owners on a best-effort basis, so that we revoke as few partitions as possible since the revocation process is costly.


Proposed Changes: Incremental Consumer Rebalance Protocol

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier to rebalance the group and hence enforcing everyone to give up all assigned partitions before joining the group as the new generation, we use consecutive rebalances where the end of the first rebalance is actually used as the synchronization barrier.

Consumer Protocol

More specifically, we would first inject more metadata at the consumer-layer, as:

KafkaConsumer:

Subscription => TopicList UserData AssignedPartitions
   TopicList               => List<String>
   UserData                => Bytes   
   AssignedPartitions      => List<String, List<Int32>>   // new field


Assignment => AssignedPartitions UserData RevokedPartitions ErrorCode
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes
   RevokedPartitions       => List<String, List<Int32>>   // new field
   ErrorCode               => Int16                       // new field


Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode the info bytes, and then re-allocate a larger buffer to append the consumer-specific bytes; with the new protocol, we just need to append some fields before, and some fields (i.e. the new fields) after the assignor-specific info bytes, and vice-versa on deserialization. So adding fields around the assignor bytes is still naturally compatible with the plug-in assignor. However, there are indeed some compatibility challenges for the consumer protocol upgrade itself, which we will tackle below.
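
To make the ordering concrete, here is a minimal sketch of the idea (this is NOT the actual Kafka wire format, the encodings and helper names are simplified assumptions for illustration only): the assignor's user-data bytes stay opaque, while the consumer-level fields, including the new AssignedPartitions, are written around them.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Simplified layout sketch: real serialization goes through Kafka's generated protocol schemas.
public class SubscriptionLayoutSketch {

    static ByteBuffer serializeSubscription(List<String> topics,
                                            ByteBuffer assignorUserData,       // opaque bytes encoded by the assignor
                                            List<String> assignedPartitions) { // new consumer-level field, e.g. "topic-3"
        ByteBuffer buf = ByteBuffer.allocate(
                sizeOf(topics) + 4 + assignorUserData.remaining() + sizeOf(assignedPartitions));
        writeStringList(buf, topics);                // existing consumer-level field before the assignor bytes
        buf.putInt(assignorUserData.remaining());    // length-prefixed opaque assignor bytes
        buf.put(assignorUserData.duplicate());
        writeStringList(buf, assignedPartitions);    // new field appended AFTER the assignor bytes:
                                                     // an old-versioned reader simply ignores this tail
        buf.flip();
        return buf;
    }

    private static void writeStringList(ByteBuffer buf, List<String> strings) {
        buf.putInt(strings.size());
        for (String s : strings) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
    }

    private static int sizeOf(List<String> strings) {
        int size = 4;
        for (String s : strings)
            size += 2 + s.getBytes(StandardCharsets.UTF_8).length;
        return size;
    }
}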


As for the newly added error code, here are its possible values (we will talk about their usage later in this document):

NONE:        normal
NEED_REJOIN: indicates that the consumer should complete the revocation (if any) and re-join the group immediately
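
For illustration only, these values could be represented as follows (the enum name and numeric codes are assumptions, not part of this KIP; only the two value names above are; recall the Assignment carries the code as an Int16):

// Illustrative sketch: numeric codes are hypothetical.
public enum RebalanceErrorCode {
    NONE((short) 0),         // normal completion
    NEED_REJOIN((short) 1);  // complete any revocation and re-join the group immediately

    public final short code;

    RebalanceErrorCode(short code) {
        this.code = code;
    }
}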


In addition, we want to resolve a long-standing issue: when a consumer is kicked out of the group, its `commit` call is doomed to fail since it no longer owns the partitions. To distinguish this case from the normal case where consumers are likely still within the group but just trying to re-join, we introduce a new API into the consumer rebalance listener:

public interface ConsumerRebalanceListener {

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    // new API: called when the consumer has fallen out of the group and its partitions
    // have already been re-assigned elsewhere, so committing their offsets is no longer
    // possible; defaults to the existing revocation callback for backward compatibility
    default void onPartitionsEmigrated(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}
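
For example, an application could override the new callback to skip committing offsets for emigrated partitions. The listener below is a hypothetical usage sketch assuming the proposed interface above; the class name and the exact state-handling are illustrative, not prescribed by this KIP.

import java.util.Collection;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical usage sketch: commit offsets on a normal revocation, but skip the
// commit when the partitions have emigrated (the consumer was kicked out of the
// group, so the commit would fail anyway).
public class StateAwareRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<byte[], byte[]> consumer;

    public StateAwareRebalanceListener(KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();   // still a group member, safe to commit
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // (re-)initialize any per-partition state here
    }

    @Override
    public void onPartitionsEmigrated(Collection<TopicPartition> partitions) {
        // no commit: the member has fallen out of the group and no longer owns
        // these partitions, so a commit would be rejected by the coordinator
    }
}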

Consumer Coordinator Algorithm

Rebalance behavior of the consumer (captured in the consumer coordinator layer) would be changed as follows.

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If subscription has changed, or topic metadata has changed: do NOT revoke any partitions or call rebalance-listener; instead just encode the current assigned partitions as part of the Subscription.
    2. If received REBALANCE_IN_PROGRESS from heartbeat response or commit response: same as a) above.
    3. If received MEMBER_ID_REQUIRED from join-group request: same as a) above.
    4. If received UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION from a join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, emigrate the partitions via the rebalance listener, and then re-join the group with empty assigned partitions.
    5. If the assignment received from the previous rebalance's sync-group response contains the error code NEED_REJOIN, revoke partitions as required before sending the join-group request with the newly formed assigned partitions.
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Take the cluster metadata and infer all the topic partitions that are targeted for assignment; let's call it total-known-partitions.
    2. Call the registered assignor of the selected protocol, passing in total-known-partitions as part of the metadata, and get the returned assignment; let's call it newly-assigned-partitions. Note that the set of partitions it covers should be exactly total-known-partitions.
    3. Compare the newly-assigned-partitions with the assigned-partitions claimed in the subscriptions and generate three exclusive sub-sets (below, total-partitions is shorthand for total-known-partitions, which newly-assigned-partitions covers exactly):
      1. Intersection(total-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may now be allocated to new owners. Let's call them prev-assigned-partitions.
      2. Minus(total-partitions, assigned-partitions). These are partitions that are not currently owned by anyone. This set is non-empty when, for example, a previous owner is on an older version and hence has already revoked them before joining, or a partition was revoked in the previous rebalance by a new-versioned member and hence is not in anyone's assigned partitions, or it is a newly created partition due to add-partitions. Let's call them not-assigned-partitions.
      3. Minus(assigned-partitions, total-partitions). These are partitions that are not known from the cluster metadata but are claimed to be owned by some members. This set is non-empty if some topics were deleted, if the leader's metadata is not refreshed, or if the previous leader has created some topics in its assignor (consider the Streams case). Let's call them unknown-partitions.

    4. For not-assigned-partitions, it is safe to move them to their new members immediately, since we know no one owned them before; hence we can encode their owners from the newly-assigned-partitions directly.
    5. For unknown-partitions, it is also safe to just give them back to whoever claimed to be their owners, by encoding them directly as well. If this is due to a topic metadata update, then a later rebalance will be triggered anyway.
    6. For prev-assigned-partitions, check if the owner has changed; if yes, encode the partition to the old owner in revoked-partitions but do NOT encode it in the assigned-partitions of the new owner, and set the error-code to NEED_REJOIN for all members (this is for a fast re-join and rebalance).
  3. For every consumer: after received the sync-group request, do the following:
    1. Check that the newly received assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalance, no partitions should be migrated before being revoked first.
      If this is not true, it means either there are bugs or incompatible members are forming the group (for incompatible members, see below), and hence the consumer should log a fatal error and shut down gracefully.
    2. Update the newly assigned-partitions, and for those newly added partitions, call the rebalance-listener; this is the same as the current logic.
    3. If the revoked-partitions set is not empty, remove those partitions by calling the rebalance-listener.
    4. Check the error code:
      1. If it is NONE, complete.
      2. If it is NEED_REJOIN, immediately send another join-group request with the updated assigned partitions following step 1.e) above.


Below is a more illustrative breakdown of the different sets of partitions and their assignment logic:
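
As a rough illustration of the leader-side set computation described above, the following is a hedged sketch only (class, method, and parameter names are made up for this example; it is not the actual ConsumerCoordinator code):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Sketch of the leader-side logic: classify partitions into the three sub-sets and
// decide which ones can be handed out immediately versus revoked first.
public class CooperativeAssignmentSketch {

    // totalKnownPartitions: all partitions targeted for assignment (from cluster metadata)
    // previouslyOwned:      partition -> current owner, as claimed in the members' subscriptions
    // newlyAssigned:        partition -> new owner, as computed by the (sticky) assignor
    // assignedOut:          output: partition -> member that may own it right away
    // revokedOut:           output: partition -> old owner that must revoke it first
    // returns true if any partition changed hands, i.e. every member should get NEED_REJOIN
    static boolean computeCooperativeAssignment(Set<TopicPartition> totalKnownPartitions,
                                                Map<TopicPartition, String> previouslyOwned,
                                                Map<TopicPartition, String> newlyAssigned,
                                                Map<TopicPartition, String> assignedOut,
                                                Map<TopicPartition, String> revokedOut) {
        boolean needRejoin = false;

        // unknown-partitions: claimed by members but not in the cluster metadata;
        // give them back to whoever claims to own them.
        for (Map.Entry<TopicPartition, String> e : previouslyOwned.entrySet()) {
            if (!totalKnownPartitions.contains(e.getKey()))
                assignedOut.put(e.getKey(), e.getValue());
        }

        for (TopicPartition tp : totalKnownPartitions) {
            String oldOwner = previouslyOwned.get(tp);
            String newOwner = newlyAssigned.get(tp);
            if (oldOwner == null) {
                // not-assigned-partitions: owned by no one, safe to assign immediately.
                assignedOut.put(tp, newOwner);
            } else if (oldOwner.equals(newOwner)) {
                // prev-assigned-partitions that stay put: the owner keeps them.
                assignedOut.put(tp, oldOwner);
            } else {
                // prev-assigned-partitions that move: revoke from the old owner now, and
                // only hand them to the new owner in the follow-up rebalance.
                revokedOut.put(tp, oldOwner);
                needRejoin = true;
            }
        }
        return needRejoin;
    }
}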


No changes are required on the broker side, since this logic change is completely wrapped inside the consumer protocol / coordinator implementation itself; to brokers it looks just the same as previous versions' rebalances.

Note that one minor difference compared with KIP-415 is that we do not introduce the scheduledDelay in the protocol; instead the consumer will trigger the rebalance immediately. This is because the consumer protocol applies to all consumers (including Streams) and hence should be kept simple, and also because KIP-345 is being developed in parallel, which already aims to tackle the scaling-out / rolling-bounce scenarios.

We omit the description of common scenarios here since it is already covered in KIP-415, which is very similar to this KIP apart from the scheduledDelay difference above.


NOTE that for this new algorithm to be effective in reducing rebalance costs, it really expects the plug-in assignor to be "sticky" in some way, such that the diff between the newly-assigned-partitions and the existing assigned-partitions is small, and hence only a small subset of the total number of partitions needs to be revoked / migrated at each rebalance in practice; otherwise, we are just paying more rebalances for little benefit. We will talk about how the sticky StreamsAssignor would be updated accordingly in Part II.


Consumer StickyAssignor

Since we've already encoded the assigned partitions at the consumer protocol layer, for the consumer's sticky assignor we are effectively duplicating this data in both the consumer protocol and the assignor's user data.

To fix this, we will just let the new-versioned byte-code always encode an empty array in the user data and leverage the Subscription's assigned partitions instead. The metadata bytes will be slightly changed, but they are not exposed to brokers semantically anyway. The only (slight) impact is on the describe-group protocol, where the modified metadata bytes format will be visible.
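
A hedged sketch of this change (the class, method names, and the Subscription stand-in below are illustrative assumptions; the real assignor interface differs):

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: the new-versioned sticky assignor no longer encodes its previously
// owned partitions into the user-data bytes, because the same information is now carried
// in the consumer protocol's AssignedPartitions field.
public class StickyAssignorSketch {

    // Old behavior (V0): serialize the owned partitions into the user data.
    // New behavior (V1): return an empty buffer and rely on the Subscription instead.
    public ByteBuffer subscriptionUserData(List<TopicPartition> ownedPartitions) {
        return ByteBuffer.allocate(0);
    }

    // During assignment, read each member's previously owned partitions from the
    // consumer-level Subscription field instead of decoding the user data.
    public List<TopicPartition> previouslyOwned(SubscriptionSketch subscription) {
        return subscription.assignedPartitions();
    }

    // Minimal stand-in for the consumer-level Subscription carrying the new field.
    public interface SubscriptionSketch {
        List<TopicPartition> assignedPartitions();
    }
}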


Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design the upgrade path to enable consumers to upgrade to the new rebalance protocol in an online manner.

Note that since we are injecting additional fields at the end of the consumer protocol, the new protocol would still be compatible with the old version. That means, an old-versioned consumer would still be able to deserialize a newer-versioned protocol data (as long as we only append new fields at the end, this would be the case).

However, when consumers on V1 are joining the group, there's a key behavioral difference: they would NOT revoke their partitions, and hence it is not safe to re-assign any of their partitions as we do in the current (V0) assignment logic. That means the leader can only proceed with the assignment when it knows which members are on V0 and which are on V1.

Another thing to keep in mind is that if the leader itself is still on the older version, it would still be able to deserialize the V1 subscription protocol as V0 by ignoring the additional fields, and hence it may "think" everyone is still on V0 while some of them may actually be on the newer version.


Therefore, when upgrading, we need the new consumer byte-code to first still follow the old-versioned protocol, both for metadata encoding and for the behavior (e.g. still revoking before sending the JoinGroup request). After everyone has upgraded to the new byte-code, we can allow them to start rebalancing with the new-versioned protocol. Note that during the later rebalance it is still possible that consumers send join-group requests with the old version (but the key here is that they are all new-version aware), in which case the consumer leader can freely adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to the consumer:


Protocol Type
"rebalance.protocol":


type: Enum 
values: {eager, cooperative}
default: eager


When the config value is "eager", the consumer still uses V0 of the consumer protocol as well as the old rebalance behavior; if the config value is "cooperative", the consumer uses the new V1 protocol as well as the new algorithm. Note the difference with KIP-415 here: since on the consumer we do not have the luxury of leveraging a list of assignors to register multiple protocols and let the leader auto-switch to new versions, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).
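
For example, the second rolling bounce could apply consumer properties along these lines (a sketch only; the rebalance.protocol key is the new config proposed here, while the other keys and values are standard consumer properties chosen for illustration):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class CooperativeUpgradeConfig {
    // Sketch: consumer properties for the second rolling bounce.
    static Properties secondBounceProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // The first rolling bounce keeps the default "eager"; the second bounce
        // switches every member to the cooperative protocol.
        props.put("rebalance.protocol", "cooperative");
        return props;
    }
}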


The key point behind these two rolling bounces and the additional check is that we want to avoid the situation where the leader is on old byte-code and only recognizes V0, but due to compatibility is still able to deserialize the V1 protocol data from newer-versioned members, and hence just goes ahead and does the assignment while the new-versioned members did not revoke their partitions before joining the group.

As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep the rebalance.protocol as "eager" (no need to manually change anything though since it is the default value).
  2. After the first rolling bounce is completely done, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic makes sure that eventually everyone is sending the join-group request with V1.

There are no changes needed in V1's new assignment algorithm, since if users follow the right protocol, then during the first rolling bounce everyone will still follow the rebalance protocol of V0. On the second rolling bounce, the leader should always be on the newer code, so it can recognize both V1 and V0 members (while some have not yet been bounced with the config change). It is safe to just follow the above algorithm: V0 members have revoked everything and hence no longer have any pre-assigned-partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.


Note that this proposal depends on users' correct behavior that everyone should be on the same "rebalance protocol" eventually, otherwise we would fall into the case 3.b) forever. In addition, this approach assumes that the leader is V1-aware whenever some V1 subscription is received: again, if users follow the upgrade path above this should be the case, but if users do not follow the guidance it may cause undefined behavior, since the old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.

Looking into the Future

The above upgrade path is admittedly not ideal, and to avoid such a situation happening again, I'd propose to modify the JoinGroup protocol in this KIP as well:

JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols ProtocolVersion
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>
   ProtocolVersion         => Int32                   // new field

Protocol (v0) => ProtocolName ProtocolMetadata
   ProtocolName            => String
   ProtocolMetadata        => Bytes (consumer protocol encoded bytes)


And then on the broker side, when choosing the leader the group coordinator will pick the member with the highest protocol version instead of picking on a "first come first serve" basis.

Again, this change will not benefit the upgrade path at this time, but in the future if we need to change the rebalance protocol again we can actually use one rolling bounce instead, since as long as there is one member on the newer version, that consumer will be picked as leader. This can also help save the "version probing" cost on Streams as well.
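
A rough sketch of this broker-side selection rule follows (illustrative only; the class, fields, and tie-breaking rule are assumptions for this example, not the actual GroupCoordinator code):

import java.util.Collection;
import java.util.Comparator;

// Prefer the member advertising the highest ProtocolVersion when electing the
// group leader, falling back to join order on ties (tie-break is an assumption).
public class LeaderSelectionSketch {

    public static class MemberMetadata {
        final String memberId;
        final int protocolVersion;   // new JoinGroup field proposed above
        final long joinTimeMs;

        MemberMetadata(String memberId, int protocolVersion, long joinTimeMs) {
            this.memberId = memberId;
            this.protocolVersion = protocolVersion;
            this.joinTimeMs = joinTimeMs;
        }
    }

    static MemberMetadata selectLeader(Collection<MemberMetadata> members) {
        return members.stream()
                .max(Comparator.comparingInt((MemberMetadata m) -> m.protocolVersion)
                        .thenComparingLong(m -> -m.joinTimeMs))   // earliest joiner wins a tie
                .orElseThrow(() -> new IllegalStateException("empty group"));
    }
}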


Edge Cases Discussion

There's a few edge cases worth mentioning here:

Downgrading and Old-Versioned New Member

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining the group as a new member on the old V0 protocol. This situation can also arise when a new member on the old version V0 joins a group (probably mistakenly) that has been completely upgraded to V1. At this moment everyone else will still get their existing assigned-partitions and the newcomer would not get anything. However, if another member leaves the group as well, then its partitions would not be assigned to anyone due to the logic 3) above. We will rely on the above consumer-side metric so that users will be notified in time.

Old-Versioned Member Become Leader

Since the group coordinator selects new leaders from among the existing members, even if the current leader fails after the group has successfully upgraded, the newly selected leader should still be V1-aware, and new members on the old V0 version joining within the same generation should not be selected. And with the protocol version in the JoinGroup request, we are guaranteed to always prefer a higher-versioned member as the leader.


Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer rejoins with changed encoded metadata. Client applications need to update to the earliest version which includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As we have mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure the existing jobs won't fail. The procedure is as follows:

  • Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with the protocol type "consumer".
  • Rolling-restart the stream application; the change is automatically applied. This is safe because we are not changing the protocol type.

In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, users can choose this much easier workaround.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-off, please review the doc in implementation plan.
