Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In a consumer group, each member consumes from the list of topic partitions assigned to it. Assignments are recalculated whenever a member's subscription changes, members leave or join the consumer group or the metadata of subscribed topics (racks and number of partitions) changes. Previously, assignments were calculated on the client and it was acceptable for assignors to take a few hundred milliseconds for large groups with thousands of members. As part of the new consumer group protocol introduced in KIP-848, assignors were moved from the client to the broker. Now, while calculating an assignment, one of the group.coordinator.threads is blocked and no progress can be made for other requests for the same __consumer_offsets partition. The same applies to assignments for the new share groups and streams groups since they share the same group coordinator.
The current slowest implemented assignor is the consumer protocol's non-rack aware uniform assignor for heterogeneous subscriptions. With 1,000 members, 1,000 topics and 50,000 partitions, this assignor takes 15 to 20 ms to run and the running time scales roughly linearly with the number of members and number of topics. The rack aware version of the assignor is expected to be even slower. Under normal operation, the occasional 20 ms assignor runs are not ideal but not fatal to the group coordinator - it means that once in a while we will take 20 ms longer than usual to respond to requests. However, when such a group is rebalancing frequently, either due to connectivity issues or a poorly written application, the group coordinator may not be able to keep up and effectively become unavailable for other groups. In the worst case, a poorly written application could trigger a new assignment run on every one of the 200 heartbeats per second (assuming the default heartbeat interval of 5 seconds) and exhaust group coordinator resources, since it's not possible to perform 200x20 ms assignor runs per second.
We would like a way to keep the group coordinator stable in the presence of large groups with expensive assignor calculations. Two mechanisms are proposed:
- A minimum delay between assignor runs. This makes it much harder for a badly behaved large group to exhaust group coordinator resources.
- The option to run assignors asynchronously on the group coordinator's executor threads. This would allow for more consistent response times even when slow assignors are being run.
Proposed Changes
Assignment Batching
When a consumer group is scaling up or undergoing a rolling restart, we currently recompute the assignment upon each heartbeat for every new member. This can be wasteful and expensive when the consumer group is large.
It is proposed that we only recompute the assignment at intervals. During a heartbeat, when the assignment is out of date (the assignment epoch is less than the group’s epoch due to subscription, membership or topic metadata changes), we will check when the assignment was last computed and only recompute the assignment when we finished computing the previous assignment at least N seconds ago. When there is no previous assignment or we do not know when the previous assignment was computed, we compute the assignment immediately.
If the last member of a group joins and we do not immediately recompute the assignment, we will do so on a future heartbeat from any member once the assignment interval has elapsed. Heartbeats occur every 5 seconds by default.
The timestamp of the last assignor run shall be written to __consumer_offsets as part of the target assignment metadata. Note that if we fail to write updated target assignment metadata to __consumer_offsets for whatever reason, the timestamp will not be updated and we may allow another assignor run sooner than intended.
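The batching check can be sketched as follows. This is an illustrative sketch, not the actual group coordinator code; the class and method names are hypothetical.

```java
import java.util.OptionalLong;

/**
 * Illustrative sketch of the assignment batching check. Returns true when a
 * new assignor run is allowed for a group whose assignment is out of date.
 */
public class AssignmentBatching {
    /**
     * @param lastAssignmentFinishedMs time the previous assignor run finished,
     *        empty when there is no previous assignment
     * @param assignmentIntervalMs the effective assignment interval
     * @param nowMs current time
     */
    public static boolean mayRecomputeAssignment(OptionalLong lastAssignmentFinishedMs,
                                                 long assignmentIntervalMs,
                                                 long nowMs) {
        // No previous assignment, or unknown timestamp (records written by an
        // older broker default the tagged Timestamp field to 0): compute now.
        if (lastAssignmentFinishedMs.isEmpty() || lastAssignmentFinishedMs.getAsLong() == 0L) {
            return true;
        }
        // Otherwise only recompute once the interval has elapsed since the
        // previous run finished.
        return nowMs - lastAssignmentFinishedMs.getAsLong() >= assignmentIntervalMs;
    }
}
```

Note that the interval is measured from when the previous run finished, so a slow assignor cannot be re-triggered back to back even if it runs longer than the interval itself.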
The assignment interval will be controlled by a new config option, group.consumer.assignment.interval.ms. The config option shall be dynamic and also overridable at the group level. A good default choice for the config is 1 second. This trades off an acceptable increase in rebalance times (see the Impact on Rebalance Times section below) in exchange for protecting group coordinator stability against a single group triggering repeated expensive assignor calculations, as long as the assignor runs take less than 1 second.
Static group.consumer.min.assignment.interval.ms and group.consumer.max.assignment.interval.ms config options will also be added to bound group.consumer.assignment.interval.ms, analogous to session.timeout.ms and heartbeat.interval.ms. group.consumer.min.assignment.interval.ms will default to 0 ms, to allow assignment batching to be disabled. group.consumer.max.assignment.interval.ms will default to 15,000 ms, the same as group.consumer.max.heartbeat.interval.ms. It is not recommended to configure the assignment interval to longer than one heartbeat interval as it slows down rebalancing for not much additional benefit.
The same applies for share and streams groups. For streams groups, the group.streams.assignment.interval.ms is independent of group.streams.initial.rebalance.delay.ms and starts after the initial rebalance delay. There is currently no consumer or share group initial.rebalance.delay.ms option.
Impact on Rebalance Times
Consider a consumer group scaling up where all members have the same subscription. We compare the process with assignment.interval.ms set to 0 ms and to 1,000 ms.
The members are numbered 1 to N and their heartbeats occur in the same order. We assume that clients revoke partitions and notify the broker almost instantly.
| Time | assignment.interval.ms = 0 ms | Time | assignment.interval.ms = 1,000 ms |
|---|---|---|---|
| | Members 1 to N-1 join the group. Target Assignment: Members 1 to N-1 have all partitions. | | Members 1 to N-1 join the group. Target Assignment: Members 1 to N-1 have all partitions. |
| +0 ms | Member N joins the group. Target Assignment: Members 1 to N have all partitions. | +0 ms | Member N joins the group. No new target assignment yet. |
| +0 to +5,000 ms | Members 1 to N-1 heartbeat and discover they must revoke partitions. | +0 to +800 ms | Members 1 to X-1 heartbeat. |
| | | +800 ms | New target assignment may be computed. Member X heartbeats and discovers it must revoke partitions. Target Assignment: Members 1 to N have all partitions. |
| | | +800 ms to +5,000 ms | Members X+1 to N-1 heartbeat and discover they must revoke partitions. |
| +5,000 ms | Member N heartbeats, gaining the revoked partitions. The rebalance has completed. | +5,000 ms | Member N heartbeats, gaining some but not all revoked partitions. |
| | | +5,000 ms to +5,800 ms | Members 1 to X-1 heartbeat and discover they must revoke partitions. Members 1 to X-1 heartbeat again immediately after revoking partitions. |
| | | +10,000 ms | Member N heartbeats, gaining the rest of the revoked partitions. The rebalance has completed. |
We see that for a large consumer or streams group, the rebalance is very likely to take 1 heartbeat interval longer to complete when assignment.interval.ms is set.
Rebalance times for small groups with 1 or 2 members are unaffected, since the rebalance would be complete by +5,000 ms.
Rebalance times for share groups are less affected, taking up to 1 heartbeat interval longer, since members do not have to wait for other members to revoke partitions.
Assignment Offload
We propose adding the option to offload assignor runs to the group coordinator executor, so that slow assignors do not hold up request processing.
A new config option, group.consumer.assignor.offload.enable, will be added. When the config is enabled, we will offload assignment calculation to the group coordinator executor during heartbeats and disallow new assignment calculations when one is already in-flight. The new assignment will usually be available on the next heartbeat, which is 5 seconds later by default. The config option shall be dynamic and overridable at the group level. It will be enabled by default. Similar to assignment batching, this trades off an acceptable increase in rebalance times (see the Impact on Rebalance Times section below) in exchange for protecting group coordinator stability against groups triggering expensive assignor calculations.
When the first member joins a new group, we will not have an assignment ready for them in the first heartbeat response. We have to respond with a valid member epoch greater than 0 and an assignment. Thus, we reserve epoch 1 of new groups to contain an empty assignment for all members. The first computed assignment will be at epoch 2. Existing groups will keep their current epoch numbering and no migration is proposed.
During an offloaded assignment calculation, the group may be deleted or downgraded, members may be removed from the group and static members may be replaced without necessarily bumping the group epoch and triggering a subsequent assignor run. Currently, we handle these by updating the target assignment directly. When assignment calculation is offloaded, we must apply the missed changes to the new target assignment once it is ready: deleted/downgraded groups will have their assignments discarded, removed members will have their assignments discarded and replaced static members will have their assignments moved to the latest static member instance.
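The fix-up of an offloaded result can be sketched as below. The types and names are illustrative assumptions, not the coordinator's actual data structures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative sketch of applying membership changes that happened during an
 * offloaded assignor run to the finished result.
 */
public class OffloadedAssignmentFixup {
    /**
     * @param computed       memberId -> assigned partitions, as produced by the assignor
     * @param currentMembers member ids still in the group when the result is installed
     * @param replacedStatic old static member instance -> replacement member id
     * @return the target assignment with the missed changes applied
     */
    public static Map<String, Set<Integer>> apply(Map<String, Set<Integer>> computed,
                                                  Set<String> currentMembers,
                                                  Map<String, String> replacedStatic) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : computed.entrySet()) {
            // Replaced static members have their assignment moved to the
            // latest member instance.
            String memberId = replacedStatic.getOrDefault(entry.getKey(), entry.getKey());
            // Members removed during the assignor run have their assignment discarded.
            if (currentMembers.contains(memberId)) {
                result.put(memberId, entry.getValue());
            }
        }
        return result;
    }
}
```

A deleted or downgraded group corresponds to discarding the whole result rather than fixing it up.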
Changes to topic metadata such as racks and partition counts are currently handled by bumping the group epoch, which triggers another assignor run.
The same applies for share and streams groups.
As we will be making heavier use of the group coordinator executor, we will add a new group.coordinator.background.threads config option to control the number of threads in the executor. Currently, the group coordinator executor only has a single thread. We will default the new config option to 2 threads, to reduce contention when multiple groups want to run assignors at the same time. We will also add thread idle ratio, queue and processing time metrics for the executor, analogous to the existing thread-idle-ratio-avg, event-queue-time-ms and event-processing-time-ms metrics.
Impact on Rebalance Times
Consider a consumer group scaling up where all members have the same subscription. We compare the process with assignor.offload.enable disabled and enabled.
We assume that clients revoke partitions and notify the broker almost instantly. We also assume that offloaded assignments are instantaneous and always available on the next request.
| Time | assignor.offload.enable = false | Time | assignor.offload.enable = true |
|---|---|---|---|
| | Members 1 to N-1 join the group. Target Assignment: Members 1 to N-1 have all partitions. | | Members 1 to N-1 join the group. Target Assignment: Members 1 to N-1 have all partitions. |
| +0 ms | Member N joins the group. Target Assignment: Members 1 to N have all partitions. | +0 ms | Member N joins the group. New target assignment available on next request. |
| +0 to +5,000 ms | Members 1 to N-1 heartbeat and discover they must revoke partitions. | +0 to +5,000 ms | Members 1 to N-1 heartbeat and discover they must revoke partitions. Members 1 to N-1 heartbeat again immediately after revoking partitions. |
| +5,000 ms | Member N heartbeats, gaining the revoked partitions. The rebalance has completed. | +5,000 ms | Member N heartbeats, gaining the revoked partitions. The rebalance has completed. |
We see that for a large consumer or streams group, the rebalance time is unchanged when assignor.offload.enable is set.
Rebalance times for groups with a single member will take 1 heartbeat interval longer, since assignments are only ready on the second heartbeat. Note that this will impact startup times for tooling such as kafka-console-consumer.
Rebalance times for share groups will take up to 1 heartbeat interval longer, since assignments are only ready on the second heartbeat whereas members previously did not have to wait for other members to revoke partitions.
Note that the change in rebalance times does not stack with the impact from assignment batching. When both features are combined, rebalance times increase by 1 heartbeat interval at most.
Public Interfaces
Broker Metrics
Three new sets of metrics will be added, now that the group coordinator executor may run assignors:
- The idle ratio of the executor thread pool, analogous to the event processor's thread-idle-ratio-avg metric.
  - kafka.server:type=group-coordinator-metrics,name=background-thread-idle-ratio-avg
- The queue time of tasks in the group coordinator executor.
  - kafka.server:type=group-coordinator-metrics,name=background-queue-time-ms-p50
  - kafka.server:type=group-coordinator-metrics,name=background-queue-time-ms-p95
  - kafka.server:type=group-coordinator-metrics,name=background-queue-time-ms-p99
  - kafka.server:type=group-coordinator-metrics,name=background-queue-time-ms-p999
  - kafka.server:type=group-coordinator-metrics,name=background-queue-time-ms-max
- The processing time of tasks in the group coordinator executor.
  - kafka.server:type=group-coordinator-metrics,name=background-processing-time-ms-p50
  - kafka.server:type=group-coordinator-metrics,name=background-processing-time-ms-p95
  - kafka.server:type=group-coordinator-metrics,name=background-processing-time-ms-p99
  - kafka.server:type=group-coordinator-metrics,name=background-processing-time-ms-p999
  - kafka.server:type=group-coordinator-metrics,name=background-processing-time-ms-max
Although the share coordinator shares the same coordinator runtime and metrics code as the group coordinator, we intentionally avoid adding the same executor metrics to the share coordinator as its executor is currently unused.
Broker Configurations
We will add config options to specify the minimum delay between two assignor runs for the same group and whether to offload assignor runs to the group coordinator executor. These config options shall be dynamic.
We also add a group.coordinator.background.threads config option to control the number of threads in the group coordinator executor, since it may be used for running assignors going forward. Currently, the group coordinator executor only has a single thread. We will default this config option to 2 threads, to reduce contention when multiple groups want to run assignors at the same time. This config option will not be dynamic.
The effective "previous" values of the new config options are also noted. The proposed defaults will increase rebalance times by up to 1 heartbeat interval in exchange for protecting the stability of the group coordinator.
| Name | Type | "Previous" Value | Default | Doc |
|---|---|---|---|---|
| group.coordinator.background.threads | int | 1 | 2 | The number of threads used by the group coordinator for processing background tasks (e.g. updating regular expression subscriptions and offloaded assignments). |
| group.consumer.assignment.interval.ms | int | 0 ms | 1,000 ms | The interval between assignment updates for a consumer group. |
| group.share.assignment.interval.ms | int | 0 ms | 1,000 ms | The interval between assignment updates for a share group. |
| group.streams.assignment.interval.ms | int | 0 ms | 1,000 ms | The interval between assignment updates for a streams group. |
| group.consumer.min.assignment.interval.ms | int | - | 0 ms | The minimum interval between assignment updates for a consumer group. |
| group.share.min.assignment.interval.ms | int | - | 0 ms | The minimum interval between assignment updates for a share group. |
| group.streams.min.assignment.interval.ms | int | - | 0 ms | The minimum interval between assignment updates for a streams group. |
| group.consumer.max.assignment.interval.ms | int | - | 15,000 ms | The maximum interval between assignment updates for a consumer group. |
| group.share.max.assignment.interval.ms | int | - | 15,000 ms | The maximum interval between assignment updates for a share group. |
| group.streams.max.assignment.interval.ms | int | - | 15,000 ms | The maximum interval between assignment updates for a streams group. |
| group.consumer.assignor.offload.enable | bool | false | true | Whether to offload consumer group assignment to a group coordinator background thread. |
| group.share.assignor.offload.enable | bool | false | true | Whether to offload share group assignment to a group coordinator background thread. |
| group.streams.assignor.offload.enable | bool | false | true | Whether to offload streams group assignment to a group coordinator background thread. |
Group Configurations
The assignment.interval.ms and assignor.offload.enable configs may be dynamically overridden on a per-group basis. An assignment.interval.ms value of -1 indicates that the broker-level configuration should be used.
| Name | Type | Default | Doc |
|---|---|---|---|
| consumer.assignment.interval.ms | int | -1 | The interval between assignment updates for a consumer group. |
| share.assignment.interval.ms | int | -1 | The interval between assignment updates for a share group. |
| streams.assignment.interval.ms | int | -1 | The interval between assignment updates for a streams group. |
| consumer.assignor.offload.enable | bool | not set | Whether to offload consumer group assignment to a group coordinator background thread. |
| share.assignor.offload.enable | bool | not set | Whether to offload share group assignment to a group coordinator background thread. |
| streams.assignor.offload.enable | bool | not set | Whether to offload streams group assignment to a group coordinator background thread. |
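The resolution of the effective assignment interval for a consumer group can be sketched as follows. This is an assumption-laden sketch: the parameter names are invented, and whether an out-of-range value is clamped or rejected at validation time is an implementation detail; this sketch clamps.

```java
/**
 * Illustrative sketch of resolving the effective assignment interval for a
 * consumer group from the group-level and broker-level configs.
 */
public class AssignmentIntervalResolver {
    public static int effectiveIntervalMs(int groupOverrideMs,  // consumer.assignment.interval.ms, -1 when not set
                                          int brokerDefaultMs,  // group.consumer.assignment.interval.ms
                                          int minMs,            // group.consumer.min.assignment.interval.ms
                                          int maxMs) {          // group.consumer.max.assignment.interval.ms
        // -1 is the sentinel for "use the broker-level configuration".
        int interval = groupOverrideMs != -1 ? groupOverrideMs : brokerDefaultMs;
        // Bound the result by the static min/max broker configs.
        return Math.max(minMs, Math.min(maxMs, interval));
    }
}
```

The share and streams group configs would resolve the same way with their respective option names.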
KRPC
ConsumerGroupHeartbeat API
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
New groups have their group epoch initialized to 2, to allow epoch 1 to be reserved for an empty target assignment. This was already implemented for streams groups as part of KAFKA-19829.
When the group's target assignment epoch is less than the group epoch and we have to compute a new target assignment, we now:
- Check whether there is an in-flight assignor run for the group on the executor.
  - When there is an in-flight assignor run for the group, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once there is no in-flight assignor run and the assignment interval has elapsed.
- Determine the assignment interval for the group.
  - If consumer.assignment.interval.ms is set for the group and not -1, the assignment interval is the group's consumer.assignment.interval.ms.
  - Otherwise, the assignment interval is the broker's group.consumer.assignment.interval.ms.
- Check whether there is a previous assignment timestamp and the assignment interval has not elapsed since we finished computing the previous target assignment.
  - If so, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once the assignment interval has elapsed.
- Start computing the new target assignment.
  - If group.consumer.assignor.offload.enable is set, we hand the assignor run to the executor and there is no new target assignment yet.
  - Otherwise, we run the assignor synchronously and the new target assignment is the assignor result.
When there is no previous target assignment for the group and no new target assignment after the above, we treat it as an empty target assignment at epoch 1 during reconciliation.
We modify reconciliation to revoke any partitions the member is no longer subscribed to, since the target assignment may lag behind member subscriptions. Regular expression subscriptions that have not been resolved yet are treated as an empty subscription. This was implemented as part of KAFKA-19431.
Response Schema
No changes.
Response Handling
No changes.
While there are no changes to the HeartbeatIntervalMs in the response proposed in this KIP, we would like to highlight that we do not guarantee that HeartbeatIntervalMs will remain constant during the lifetime of a group or that all members will receive the same HeartbeatIntervalMs values. Future broker implementations may vary HeartbeatIntervalMs.
JoinGroup API
This section only applies to mixed groups containing both classic and consumer protocol members.
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
This section only applies to mixed groups containing both classic and consumer protocol members.
When the group's target assignment epoch is less than the group epoch and we have to compute a new target assignment, we now:
- Check whether there is an in-flight assignor run for the group on the executor.
  - When there is an in-flight assignor run for the group, there is no new target assignment. We rely on a consumer protocol member's heartbeat to trigger the next assignor run.
- Determine the assignment interval for the group.
  - If consumer.assignment.interval.ms is set for the group and not -1, the assignment interval is the group's consumer.assignment.interval.ms.
  - Otherwise, the assignment interval is the broker's group.consumer.assignment.interval.ms.
- Check whether there is a previous assignment timestamp and the assignment interval has not elapsed since we finished computing the previous target assignment.
  - If so, there is no new target assignment. We rely on a consumer protocol member's heartbeat to trigger the next assignor run.
- Start computing the new target assignment.
  - If group.consumer.assignor.offload.enable is set, we hand the assignor run to the executor and there is no new target assignment yet.
  - Otherwise, we run the assignor synchronously and the new target assignment is the assignor result.
When there is no previous target assignment for the group and no new target assignment after the above, we treat it as an empty target assignment at epoch 1 during reconciliation.
We modify reconciliation to revoke any partitions the member is no longer subscribed to, since the target assignment may lag behind member subscriptions and the official classic consumer will reject the assignment and rejoin the group when it sees any partition it is not subscribed to. This was implemented as part of KAFKA-19431.
Response Schema
No changes.
Response Handling
No changes.
SyncGroup API
This section only applies to mixed groups containing both classic and consumer protocol members.
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
This section only applies to mixed groups containing both classic and consumer protocol members.
Currently we return a REBALANCE_IN_PROGRESS error when the member epoch is less than the group epoch and the member does not have unrevoked partitions, to get the member to rejoin the group.
Now that the target assignment can lag behind the group epoch, we adjust the condition to only return a REBALANCE_IN_PROGRESS when the member epoch is less than the target assignment epoch and the member does not have unrevoked partitions.
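The adjusted SyncGroup condition can be sketched as below, with illustrative names. Previously the member epoch was compared against the group epoch; it is now compared against the target assignment epoch, which may lag behind the group epoch.

```java
/**
 * Illustrative sketch of the adjusted SyncGroup check for classic protocol
 * members in a mixed group.
 */
public class SyncGroupCheck {
    public static boolean shouldReturnRebalanceInProgress(int memberEpoch,
                                                          int targetAssignmentEpoch,
                                                          boolean hasUnrevokedPartitions) {
        // Comparing against the target assignment epoch (rather than the group
        // epoch) avoids forcing a rejoin while an assignor run is still
        // batched or in-flight on the executor.
        return memberEpoch < targetAssignmentEpoch && !hasUnrevokedPartitions;
    }
}
```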
Response Schema
No changes.
Response Handling
No changes.
Heartbeat API
This section only applies to mixed groups containing both classic and consumer protocol members.
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
This section only applies to mixed groups containing both classic and consumer protocol members.
Currently we return a REBALANCE_IN_PROGRESS error when the member epoch is less than the group epoch, to get the member to rejoin the group and pick up its new assignment.
Now that the target assignment can lag behind the group epoch, we adjust the condition to only return a REBALANCE_IN_PROGRESS when the member epoch is less than the target assignment epoch.
This condition will be hit once an offloaded assignor run has completed, or a batched assignor run has been triggered by a consumer protocol member.
Response Schema
No changes.
Response Handling
No changes.
ShareGroupHeartbeat API
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
New groups have their group epoch initialized to 2, to allow epoch 1 to be reserved for an empty target assignment. This was already implemented for streams groups as part of KAFKA-19829.
When the group's target assignment epoch is less than the group epoch and we have to compute a new target assignment, we now:
- Check whether there is an in-flight assignor run for the group on the executor.
  - When there is an in-flight assignor run for the group, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once there is no in-flight assignor run and the assignment interval has elapsed.
- Determine the assignment interval for the group.
  - If share.assignment.interval.ms is set for the group and not -1, the assignment interval is the group's share.assignment.interval.ms.
  - Otherwise, the assignment interval is the broker's group.share.assignment.interval.ms.
- Check whether there is a previous assignment timestamp and the assignment interval has not elapsed since we finished computing the previous target assignment.
  - If so, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once the assignment interval has elapsed.
- Start computing the new target assignment.
  - If group.share.assignor.offload.enable is set, we hand the assignor run to the executor and there is no new target assignment yet.
  - Otherwise, we run the assignor synchronously and the new target assignment is the assignor result.
When there is no previous target assignment for the group and no new target assignment after the above, we treat it as an empty target assignment at epoch 1 during reconciliation.
We modify reconciliation to revoke any partitions the member is no longer subscribed to, since the target assignment may lag behind member subscriptions. This was implemented as part of KAFKA-19431.
Response Schema
No changes.
Response Handling
No changes.
While there are no changes to the HeartbeatIntervalMs in the response proposed in this KIP, we would like to highlight that we do not guarantee that HeartbeatIntervalMs will remain constant during the lifetime of a group or that all members will receive the same HeartbeatIntervalMs values. Future broker implementations may vary HeartbeatIntervalMs.
StreamsGroupHeartbeat API
Request Schema
No changes.
Required ACL
No changes.
Request Validation
No changes.
Request Handling
New groups have their group epoch initialized to 2, to allow epoch 1 to be reserved for an empty target assignment. This was already implemented for streams groups as part of KAFKA-19829.
When the group's target assignment epoch is less than the group epoch and we have to compute a new target assignment, we now:
- Check whether there is an in-flight assignor run for the group on the executor.
  - When there is an in-flight assignor run for the group, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once there is no in-flight assignor run and the assignment interval has elapsed.
- Determine the assignment interval for the group.
  - If streams.assignment.interval.ms is set for the group and not -1, the assignment interval is the group's streams.assignment.interval.ms.
  - Otherwise, the assignment interval is the broker's group.streams.assignment.interval.ms.
- Check whether there is a previous assignment timestamp and the assignment interval has not elapsed since we finished computing the previous target assignment.
  - If so, there is no new target assignment. We will trigger the next assignor run on a future heartbeat from any member once the assignment interval has elapsed.
- Start computing the new target assignment.
  - If group.streams.assignor.offload.enable is set, we hand the assignor run to the executor and there is no new target assignment yet.
  - Otherwise, we run the assignor synchronously and the new target assignment is the assignor result.
When there is no previous target assignment for the group and no new target assignment after the above, we treat it as an empty target assignment at epoch 1 during reconciliation.
Response Schema
No changes.
Response Handling
No changes.
While there are no changes to the HeartbeatIntervalMs in the response proposed in this KIP, we would like to highlight that we do not guarantee that HeartbeatIntervalMs will remain constant during the lifetime of a group or that all members will receive the same HeartbeatIntervalMs values. Future broker implementations may vary HeartbeatIntervalMs.
Records
Target Assignment
ConsumerGroupTargetAssignmentMetadataValue
We add a timestamp field to the target assignment metadata. The field will be tagged to allow broker downgrades and default to 0 when not present. The version number will not be bumped.
{
"apiKey": 6,
"type": "coordinator-value",
"name": "ConsumerGroupTargetAssignmentMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
"about": "The assignment epoch." },
// Add Timestamp field.
{ "name": "Timestamp", "versions": "0+", "taggedVersions": "0+", "tag": 0, "type": "int64", "default": 0, "ignorable": true,
"about": "The time at which the assignment calculation finished."}
]
}
ShareGroupTargetAssignmentMetadataValue
We add a timestamp field to the target assignment metadata. The field will be tagged to allow broker downgrades and default to 0 when not present. The version number will not be bumped.
{
"apiKey": 12,
"type": "coordinator-value",
"name": "ShareGroupTargetAssignmentMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
"about": "The assignment epoch." },
// Add Timestamp field.
{ "name": "Timestamp", "versions": "0+", "taggedVersions": "0+", "tag": 0, "type": "int64", "default": 0, "ignorable": true,
"about": "The time at which the assignment calculation finished."}
]
}
StreamsGroupTargetAssignmentMetadataValue
We add a timestamp field to the target assignment metadata. The field will be tagged to allow broker downgrades and default to 0 when not present. The version number will not be bumped.
{
"apiKey": 20,
"type": "coordinator-value",
"name": "StreamsGroupTargetAssignmentMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
"about": "The assignment epoch." },
// Add Timestamp field.
{ "name": "Timestamp", "versions": "0+", "taggedVersions": "0+", "tag": 0, "type": "int64", "default": 0, "ignorable": true,
"about": "The time at which the assignment calculation finished."}
]
}
Compatibility, Deprecation, and Migration Plan
No compatibility issues are expected.
On broker upgrade, the effective assignment.interval.ms will increase from 0 to 1,000 ms and assignor.offload.enable will be enabled by default when not configured.
Deprecation
It is believed that users generally will not need to disable assignor.offload.enable. We may consider deprecating and removing the assignor.offload.enable configs in the future.
Test Plan
Unit tests for consumer, share and streams group heartbeats will be added with the new config options set.
Existing consumer, share and streams group integration tests will be reused by adding variants with the new config options set.
Rejected Alternatives
Size-based Offloading
Instead of always offloading assignment calculation to the group coordinator executor when group.*.assignor.offload.enable is enabled, we could offload assignments only for groups above a certain size threshold.
This was not included in the initial proposal because it introduces extra complexity and the choice of the threshold is not clear.