
Status

Current state: Under Discussion

Discussion thread: TBD

JIRA: KAFKA-8019

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Recently the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started as KIP-415.

This KIP customizes the incremental rebalancing approach for the Kafka consumer client, which will be beneficial for heavily stateful consumers such as Kafka Streams applications.

In short, the goals of this KIP are:

  • Reduce downtime caused by unnecessary partition migration, i.e. partitions being revoked and re-assigned.
  • Provide better rebalance behavior for members that fall out of the group.


Background

Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all stream threads would

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get the assigned tasks and resume working.

  3. Replay the state of the assigned tasks.

  4. Once all replay jobs finish, the stream thread transitions to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. This way, a topic partition can never be re-assigned before it is revoked.
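To make the stop-the-world effect concrete, below is a minimal plain-consumer sketch (the broker address, topic and group name are illustrative): under today's eager protocol, onPartitionsRevoked() is invoked with all currently owned partitions on every rebalance, even if most of them are handed right back afterwards.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class EagerRebalanceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative broker address
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Eager protocol: invoked with ALL owned partitions before every rebalance,
                    // so all ongoing work must be suspended here.
                    System.out.println("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Streams would restore task state here before resuming processing.
                    System.out.println("Assigned: " + partitions);
                }
            });
            consumer.poll(Duration.ofMillis(100));
        }
    }
}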

Example: Streams Assignor Rebalance Metadata

Today Streams embeds a full-fledged consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte array field (UserData) to encode additional information for sticky partition assignment. More specifically, on subscription:

KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

KafkaConsumer:

Assignment => AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes

------------------

StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32
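As a rough illustration of how such metadata ends up in the opaque UserData bytes, here is a hypothetical ByteBuffer sketch of the subscription-side fields; this is not the actual SubscriptionInfo serialization code in Streams, and the set-typed fields are omitted for brevity.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class SubscriptionInfoSketch {
    // Hypothetical encoding of VersionId, LatestSupportedVersionId, ClientUUID and EndPoint;
    // PrevTasks / StandbyTasks would be length-prefixed TaskId collections (omitted here).
    static ByteBuffer encode(int version, int latestSupportedVersion, UUID processId, String endpoint) {
        byte[] endpointBytes = endpoint.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 16 + 4 + endpointBytes.length);
        buf.putInt(version);                              // VersionId
        buf.putInt(latestSupportedVersion);               // LatestSupportedVersionId
        buf.putLong(processId.getMostSignificantBits());  // ClientUUID (128 bit)
        buf.putLong(processId.getLeastSignificantBits());
        buf.putInt(endpointBytes.length);                 // EndPoint, e.g. "host:port"
        buf.put(endpointBytes);
        buf.flip();
        return buf;
    }
}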


Example: Consumer Sticky Assignor 

We also have a StickyAssignor provided out of the box that tries to mitigate the cost of unnecessary partition migrations. This assignor only relies on subscription metadata and does not modify the assignment metadata, as follows:

KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StickyAssignor:

UserData (encoded in version 1) => AssignedPartitions
   AssignedPartitions      => List<String, List<Int32>>
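Enabling the built-in StickyAssignor only requires the standard partition.assignment.strategy config; a small sketch (broker address and group name are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.StickyAssignor;

public class StickyAssignorConfigExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sticky-demo");
        // Opt into the sticky assignor instead of the default range assignor.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}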


The goal of this incremental protocol is to fully leverage sticky assignors, which try their best to reassign partitions to their previous owners, so that we revoke as few partitions as possible, since the revocation process is costly.


Proposed Changes

We want to separate the protocol improvement into the consumer and Streams layers, since Streams today embeds a full-fledged consumer instance that hard-codes the ConsumerCoordinator.

Part I: Incremental Consumer Rebalance Protocol

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier to rebalance the group and hence forcing everyone to give up all assigned partitions before joining the group as the new generation, we use consecutive rebalances, where the end of the first rebalance actually serves as the synchronization barrier.

Consumer Protocol

More specifically, we would first inject more metadata at the consumer-layer, as:

KafkaConsumer:

Subscription => TopicList AssignedPartitions SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes   
   AssignedPartitions      => List<String, List<Int32>>   // new field


Assignment => AssignedPartitions RevokedPartitions AssignmentInfo ErrorCode
   AssignedPartitions      => List<String, List<Int32>>
   AssignmentInfo          => Bytes
   RevokedPartitions       => List<String, List<Int32>>   // new field
   ErrorCode               => Int16                       // new field


Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode its info bytes, and then allocate a larger buffer to append the consumer-specific fields; with the new protocol we simply append some fields before, and some fields (i.e. the new ones) after, the assignor-specific info bytes, and do the reverse on deserialization. So adding fields around the assignor bytes is still naturally compatible with pluggable assignors. However, there are some compatibility challenges for the consumer protocol upgrade itself, which we tackle below.
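To illustrate why this stays compatible, here is a hedged sketch of the subscription serialization order (class and field handling are illustrative, not the actual ConsumerProtocol code): the assignor's opaque bytes are length-prefixed, so a V0 reader that stops after them simply never sees the appended V1 field.

import java.nio.ByteBuffer;

public class SubscriptionSerializationSketch {
    // Hypothetical V1 layout: TopicList, assignor-specific bytes, then the new AssignedPartitions field.
    static ByteBuffer serializeV1(byte[] topicListBytes, byte[] assignorUserData, byte[] ownedPartitionsBytes) {
        ByteBuffer buf = ByteBuffer.allocate(
                4 + topicListBytes.length + 4 + assignorUserData.length + 4 + ownedPartitionsBytes.length);
        buf.putInt(topicListBytes.length);
        buf.put(topicListBytes);            // TopicList (existing field)
        buf.putInt(assignorUserData.length);
        buf.put(assignorUserData);          // assignor-specific SubscriptionInfo bytes (existing field)
        buf.putInt(ownedPartitionsBytes.length);
        buf.put(ownedPartitionsBytes);      // AssignedPartitions (new V1 field; a V0 reader never reaches it)
        buf.flip();
        return buf;
    }
}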


Consumer Coordinator Algorithm

Rebalance behavior of the consumer (captured in the consumer coordinator layer) would be changed as follows; a leader-side sketch in code is given after the list.

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the event that triggered the join-group:
    1. If REBALANCE_IN_PROGRESS was received from a heartbeat response: do NOT revoke any partitions; instead just encode the currently assigned partitions as part of the Subscription.
  2. For the leader: after receiving the subscribed topics, as well as the assigned-partitions from all members, do the following:
    1. Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
    2. Segment the total set of partitions (total-partitions) inferred from the newly-assigned-partitions into two exclusive sub-sets: Intersection(total-partitions, assigned-partitions) and Minus(total-partitions, assigned-partitions).
      Note that the latter is possibly non-empty because a partition may have been revoked in a previous rebalance and hence is not in any member's assigned partitions, or it is a newly created partition due to add-partitions. Let's call the former prev-assigned-partitions and the latter not-assigned-partitions.
    3. For not-assigned-partitions, encode the owner from the newly-assigned-partitions directly, since we know no one owned them before, either due to revocation or because they are newly created partitions.
    4. For prev-assigned-partitions, check if the owner has changed; if yes, encode the partition to the old owner in revoked-partitions but do NOT encode it in the assigned-partitions of the new owner.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Check that the newly assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalancing, no partition should be migrated directly before being revoked first.
    2. Check the error code as well, and depending on it either move forward to sub-step 3) below, trigger a rebalance immediately (for incompatible members, see below), or fail immediately if it is fatal.
    3. Update the newly assigned-partitions, and for the newly added partitions, call the rebalance listener; this is the same as the current logic.
    4. If revoked-partitions is not empty, remove those partitions by calling the rebalance listener, and then immediately send another join-group request with the updated assigned partitions.
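Below is a hedged, leader-side sketch of sub-steps 2) to 4) of the leader logic above (names and data structures are illustrative, not the actual ConsumerCoordinator code): partitions whose owner did not change are assigned immediately, while partitions whose owner changed are only revoked from the old owner and deferred to the follow-up rebalance.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class CooperativeLeaderSketch {
    static void reconcile(Map<String, List<TopicPartition>> newlyAssigned,       // consumer -> new assignment
                          Map<TopicPartition, String> previousOwner,             // from Subscription.AssignedPartitions
                          Map<String, List<TopicPartition>> assignmentToSend,    // output: partitions handed out now
                          Map<String, List<TopicPartition>> revocationsToSend) { // output: partitions to revoke first
        for (Map.Entry<String, List<TopicPartition>> entry : newlyAssigned.entrySet()) {
            String consumer = entry.getKey();
            for (TopicPartition tp : entry.getValue()) {
                String oldOwner = previousOwner.get(tp);
                if (oldOwner == null || oldOwner.equals(consumer)) {
                    // not-assigned-partitions, or prev-assigned partitions whose owner did not change:
                    // safe to hand out right away.
                    assignmentToSend.computeIfAbsent(consumer, c -> new ArrayList<>()).add(tp);
                } else {
                    // Owner changed: ask the old owner to revoke it now, but do NOT give it to the
                    // new owner yet; it will be re-assigned in the follow-up rebalance.
                    revocationsToSend.computeIfAbsent(oldOwner, c -> new ArrayList<>()).add(tp);
                }
            }
        }
    }
}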


No changes are required on the broker side, since this logic change is completely wrapped inside the consumer protocol / coordinator implementation itself; to brokers it looks just like the rebalances of previous versions.

Note that one minor difference compared with KIP-415 is that we do not introduce a scheduledDelay in the protocol; instead the consumer will trigger a rebalance immediately. This is because the consumer protocol applies to all consumers (including Streams) and hence should be kept simple, and also because KIP-345 is being developed in parallel and is already aimed at tackling the scaling-out / rolling-bounce scenarios.

We omit the description of the common scenarios here since they are already covered in KIP-415, which is very similar to this KIP except for the scheduledDelay difference noted above.


NOTE that for this new algorithm to be effective in reducing rebalance costs, it really expects the plugged-in assignor to be "sticky" in some way, so that the diff between the newly-assigned-partitions and the existing assigned-partitions is small, and hence only a small subset of the total partitions needs to be revoked / migrated at each rebalance in practice; otherwise we are just paying for more rebalances with little benefit. We will talk about how the sticky StreamsAssignor would be updated accordingly in Part II.


Consumer StickyPartitioner

Since we've already encoded the assigned partitions 


Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design an upgrade path that enables consumers to upgrade to the new rebalance protocol in an online manner.

Note that since we are injecting additional fields at the end of the consumer protocol, the new protocol is still compatible with the old version. That means an old-versioned consumer can still deserialize newer-versioned protocol data (as long as we only append new fields at the end, this will be the case).

However, when consumers on V1 join the group, there's a key behavioral difference: they will NOT revoke their partitions, and hence it is not safe to re-assign any of their partitions as we do in the current (V0) assignment logic. That means the leader can only proceed with the assignment when it knows that all members are on the same version, either all V0 or all V1.

Another thing to keep in mind is that if the leader itself is still on the older version, it will still be able to deserialize the V1 subscription protocol as V0 by ignoring the additional fields, and hence it may "think" everyone is still on V0 while some of them may actually be on the newer version.


Therefore, when upgrading, we need the new consumer byte-code to first keep following the old versioned protocol, both for the metadata encoding and for the behavior (e.g. still revoking before sending JoinGroup). Only after everyone has upgraded to the new byte-code can we allow them to start rebalancing with the new versioned protocol. Note that during the later rebalance it is still possible that consumers send join-group requests with the old version (but the key here is that they are all new-version aware), in which case the consumer leader can freely adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to the consumer:


"rebalance.protocol":

type: Enum
values: {eager, cooperative}
default: eager


When the config value is "eager", the consumer will still use V0 of the consumer protocol as well as the old rebalance behavior; if the config value is "cooperative", the consumer will use the new V1 protocol as well as the new algorithm. Note the difference with KIP-415 here: since on the consumer we do not have the luxury of leveraging a list of assignors to register multiple protocols and let the leader auto-switch to new versions, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).
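For illustration, setting the proposed config in the second rolling bounce would look roughly like this (the config name and values are as proposed in this KIP and could still change before release; broker address and group name are illustrative):

import java.util.Properties;

public class RebalanceProtocolConfigExample {
    public static Properties secondBounceProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("group.id", "upgrade-demo");
        // First rolling bounce: leave the default ("eager").
        // Second rolling bounce: switch to the new cooperative protocol.
        props.put("rebalance.protocol", "cooperative");
        return props;
    }
}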


We'd update the above algorithm on the leader (i.e. bullet point 2) such that we first check the versions of the subscriptions from all members (a sketch follows the list):

  1. If all members are on V1, then follow the new algorithm.
  2. If all members are on V0, then follow the old algorithm.
  3. If there's no consensus, it means we are in a second rebalance; then do the following:
    1. For those members on V1, send the assignment in V1 as well, by just giving back their existing assigned-partitions from the subscription metadata and leaving the revoked-partitions empty.
    2. For those members on V0, which means they have revoked their partitions but we do not know which partitions those were, we can only give them a V0 assignment back with an empty assigned-partitions.
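A hedged sketch of this version check on the leader (the enum and method names are illustrative, not the actual coordinator code):

import java.util.Collection;

public class VersionConsensusSketch {
    enum ProtocolVersion { V0, V1 }

    static boolean allOn(ProtocolVersion expected, Collection<ProtocolVersion> memberVersions) {
        for (ProtocolVersion v : memberVersions) {
            if (v != expected) return false;
        }
        return true;
    }

    static String chooseAssignmentMode(Collection<ProtocolVersion> memberVersions) {
        if (allOn(ProtocolVersion.V1, memberVersions))
            return "cooperative";  // all on V1: follow the new algorithm
        if (allOn(ProtocolVersion.V0, memberVersions))
            return "eager";        // all on V0: follow the old algorithm
        // Mixed group: echo back owned partitions to V1 members, and send V0 members an empty
        // assignment plus the error code so they back off and rejoin.
        return "mixed";
    }
}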


The key point behind these two rolling bounces and the additional check is that we want to avoid the situation where the leader is on old byte-code and only recognizes V0, but due to compatibility is still able to deserialize V1 protocol data from newer-versioned members, and hence just goes ahead and does the assignment while the new-versioned members did not revoke their partitions before joining the group.


So under case 3), those members on V0 will have a small window where no partitions are assigned to them, and some partitions are not assigned to anyone either. Hence, we will add a new ConsumerCoordinatorMetrics entry, which the client records upon receiving an assignment with the error code:

"num.incompatible.rebalance": "total number of rebalances that have failed due to incompatible members joining the group at the same time."

group: "consumer-coordinator-metrics"
tags: client-id=([-.\w]+)
type: Count

The client, upon receiving an assignment associated with this error code, records the above metric, backs off, and re-sends the join-group request.
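A hedged sketch of how such a counter could be registered with the client metrics library (the metric name, group and tag follow the definition above; the wiring into the coordinator is illustrative):

import java.util.Collections;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;

public class IncompatibleRebalanceMetricSketch {
    private final Sensor incompatibleRebalanceSensor;

    public IncompatibleRebalanceMetricSketch(Metrics metrics, String clientId) {
        incompatibleRebalanceSensor = metrics.sensor("incompatible-rebalance");
        incompatibleRebalanceSensor.add(
                metrics.metricName("num.incompatible.rebalance",
                        "consumer-coordinator-metrics",
                        "Total number of rebalances that have failed due to incompatible members joining the group at the same time",
                        Collections.singletonMap("client-id", clientId)),
                new CumulativeCount());  // a plain count stat, matching "type: Count" above
    }

    // Called when an assignment carrying the incompatibility error code is received.
    public void recordIncompatibleRebalance() {
        incompatibleRebalanceSensor.record();
    }
}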


As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep rebalance.protocol as "eager" (no need to manually change anything, since it is the default value).
  2. After the first rolling bounce is completely done, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic will make sure that eventually everyone is sending the join-group request with V1.


Note that this proposal depends on users behaving correctly, in that everyone should be on the same "rebalance protocol" eventually; otherwise we would fall into case 3.b) forever, where some partitions would not be assigned to anyone. In addition, this approach assumes that the leader is V1-aware whenever some V1 subscription is received: again, if users follow the upgrade path above, this will be the case, but if users do not follow the guidance it may cause undefined behavior, since the old-versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.


There are a few edge cases worth mentioning here:

Downgrading and Old-Versioned New Member

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining the group as a new member on the old V0. The same situation arises when a new member on the old V0 joins a group (probably mistakenly) that has been completely upgraded to the newer version (V1). At that moment everyone else will still get their existing assigned-partitions and the newcomer will not get anything. However, if another member leaves the group as well, then its partitions will not be assigned to anyone due to logic 3) above. We rely on the consumer-side metric above so that users are notified in time.

Old-Versioned Member Become Leader

Since the group coordinator selects the new leader from among the existing members, even if the current leader fails after the group has successfully upgraded, the newly selected leader should still be V1-aware; new members on V0 joining within the same generation should not be selected as leader.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker will trigger a rebalance when a normal consumer rejoins with the new encoded metadata. Client applications need to update to the earliest version which includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As we have mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure existing jobs won't fail. The procedure is as follows (a config sketch is given after the list):

  • Set `stream.rebalancing.mode` to `upgrading`, which will force the Streams application to stay with the protocol type "consumer".
  • Rolling-restart the Streams application; the change is automatically applied. This is safe because we are not changing the protocol type.
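A hedged sketch of the first step, assuming the proposed `stream.rebalancing.mode` config lands with the name and value described here (the application id and broker address are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsUpgradeModeExample {
    public static Properties upgradeModeProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // Proposed config from this KIP: keep the protocol type "consumer" while upgrading.
        props.put("stream.rebalancing.mode", "upgrading");
        return props;
    }
}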

In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, users can choose to take this much easier workaround.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-offs, please review the doc in the implementation plan.
