...
- A standby task shouldn’t be assigned to a client which already has the active task
- Ideally, standby tasks should be in a different rack from the corresponding active tasks
- Ideally, standby tasks themselves should be in different racks
- Ideally, cross rack traffic cost should be minimized
- Assignment should be deterministic
There are several options to balance reliability and efficiency in standby task assignment. While it's possible to come up with very complex min-cost flow algorithms for standby assignment, I think it's OK to use a simple greedy algorithm for standby assignment initially:
I. Balance reliability over cost
...
Code Block |
---|
for (ClientState client : clientStates) {
    for (Task standbyTask : client.standbyTasks) {
        for (ClientState otherClient : clientStates) {
            // skip the client itself and clients which already host this task
            if (client.equals(otherClient) || otherClient.contains(standbyTask)) {
                continue;
            }
            for (Task otherStandbyTask : otherClient.standbyTasks) {
                if (swapping standbyTask and otherStandbyTask is feasible and has smaller cost) {
                    swap(standbyTask, otherStandbyTask);
                }
            }
        }
    }
} |
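The greedy swap above could be sketched in runnable form as follows. This is only a minimal illustration, not the proposed implementation: the `Client` class, the `optimize` method, and the pluggable `cost` function are hypothetical names introduced here, and feasibility is assumed to mean only that neither client ends up hosting a task it already has (active or standby).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.ToIntBiFunction;

final class GreedyStandbySwap {

    /** Hypothetical, simplified client model: a rack id plus active and standby task ids. */
    static final class Client {
        final String rack;
        final TreeSet<String> active = new TreeSet<>();   // sorted sets keep iteration deterministic
        final TreeSet<String> standby = new TreeSet<>();

        Client(String rack) {
            this.rack = rack;
        }

        boolean contains(String task) {
            return active.contains(task) || standby.contains(task);
        }
    }

    /**
     * Repeats full passes over all clients until no improving swap is found.
     * Every swap strictly lowers the summed cost, so the loop terminates.
     * Returns the number of swaps performed.
     */
    static int optimize(List<Client> clients, ToIntBiFunction<String, Client> cost) {
        int swaps = 0;
        boolean improved = true;
        while (improved) {
            improved = false;
            for (Client client : clients) {
                // iterate over a snapshot because a swap mutates client.standby
                for (String task : new ArrayList<>(client.standby)) {
                    if (swapAway(client, task, clients, cost)) {
                        swaps++;
                        improved = true;
                    }
                }
            }
        }
        return swaps;
    }

    /** Tries to exchange one standby task of client with a standby task of another client. */
    private static boolean swapAway(Client client, String task, List<Client> clients,
                                    ToIntBiFunction<String, Client> cost) {
        for (Client other : clients) {
            if (client == other || other.contains(task)) {
                continue; // the other client cannot take this task
            }
            for (String otherTask : other.standby) {
                if (client.contains(otherTask)) {
                    continue; // infeasible: client already hosts otherTask
                }
                int before = cost.applyAsInt(task, client) + cost.applyAsInt(otherTask, other);
                int after = cost.applyAsInt(task, other) + cost.applyAsInt(otherTask, client);
                if (after < before) {
                    client.standby.remove(task);
                    other.standby.remove(otherTask);
                    client.standby.add(otherTask);
                    other.standby.add(task);
                    return true; // return immediately, the iterated set was just modified
                }
            }
        }
        return false;
    }
}
```

Because clients are visited in list order and tasks in sorted order, the same input always yields the same assignment, which matches the determinism requirement listed earlier.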
II. Prefer reliability and then find optimal cost
We could also first find a more reliable standby assignment and then, once we have a certain number of reliable standby assignments, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby we can give a lower cost to clients which are in a different rack from the active client. We can adjust the cost function as below:
Code Block |
---|
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT = 10;
    int cost = 0;
    // add weights for cross-AZ traffic
    for (TopicPartition tp : t.topicPartitions()) {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }
    // add weights for every client which already has t and is in the same rack as c
    for (Client c1 which has t assigned) {
        if (c is in same rack as c1) {
            cost += SAME_RACK_WEIGHT;
        }
    }
    return cost;
} |
After we assign the first standby to different racks as much as possible, we adjust the cost function to give more weight to TRAFFIC_WEIGHT so that we prefer low traffic assignments.
Option II is more configurable and may result in lower CRT cost, but it’s more complicated.
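To make the two-phase idea concrete, here is a hedged sketch of the cost function with the weights passed in as parameters, so the same code can serve both phases. The class name, the parameter names, and the representation of a task as a list of replica rack sets (one set per topic partition) are assumptions made for this illustration only.

```java
import java.util.List;
import java.util.Set;

final class StandbyCost {

    /**
     * replicaRacksPerPartition: for each topic partition of the task, the racks holding a replica.
     * candidateRack: rack of the client being considered for the standby.
     * racksHostingTask: racks of the clients which already have the task (active or standby).
     */
    static int getCost(List<Set<String>> replicaRacksPerPartition, String candidateRack,
                       List<String> racksHostingTask, int trafficWeight, int sameRackWeight) {
        int cost = 0;
        // cross rack traffic: one unit per partition without a replica in the candidate's rack
        for (Set<String> replicaRacks : replicaRacksPerPartition) {
            if (!replicaRacks.contains(candidateRack)) {
                cost += trafficWeight;
            }
        }
        // reliability: one unit per existing copy of the task in the candidate's rack
        for (String rack : racksHostingTask) {
            if (rack.equals(candidateRack)) {
                cost += sameRackWeight;
            }
        }
        return cost;
    }
}
```

Calling it with weights (1, 10) favors rack diversity for the first standby; a later pass could call it with a larger traffic weight so that low traffic assignments win.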
III. Summary
In summary, we can also add a configuration such as internal.rack.aware.assignment.standby.strategy with values greedy and balance to choose between algorithm I and algorithm II. We can make algorithm II the default.
D. What if both rack information and rack aware tags are present
KIP-708: Rack aware StandbyTask assignment for Kafka Streams added rack awareness configurations for Kafka Streams with rack.aware.assignment.tags and client.tag. The goal is to use the tags to identify the locations of clients and try to put tasks on clients in different locations as much as possible. This configuration could possibly be a superset of the clients’ rack configuration in the consumer, which is client.rack. There are several options on how to assign standby tasks when both configurations are present.
I. Choose a default one
If both rack aware tags and client rack information in the subscription are present, we could default to using one of them for standby assignment. Both can be used to compute whether two clients are in the same rack. Rack aware tags may carry more information since they can support more keys, so we default to rack aware tags.
II. Enforce client.rack information must be present in rack.aware.assignment.tags
For example, if a client configures consumer rack information as client.rack: az1, then the rack aware tags must contain rack.aware.assignment.tags: rack and client.tag.rack: az1. We can fail the assignment if the above is not satisfied.
III. Check client.rack information must be present in rack.aware.assignment.tags but don’t enforce it
Option II is too strict. We could just check and warn the users in logs if both client.rack and rack.aware.assignment.tags are configured but rack doesn’t exist in rack.aware.assignment.tags or client.tag.rack doesn’t have the same value as client.rack. However, we continue processing the assignment using option I by defaulting to one of the configurations.
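The check described in option III might look roughly like the sketch below. It is an assumption-laden illustration: the class and method names are hypothetical, the configuration values are passed in as plain Java collections rather than read from a real Kafka Streams config object, and warnings are returned as strings instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RackTagConsistencyCheck {

    /**
     * clientRack: value of client.rack, or null when it is not configured.
     * assignmentTags: keys listed in rack.aware.assignment.tags.
     * clientTags: tag key to value, i.e. "rack" holds the value of client.tag.rack.
     * Returns warnings to log; an empty list means there is nothing to report.
     */
    static List<String> check(String clientRack, Set<String> assignmentTags,
                              Map<String, String> clientTags) {
        List<String> warnings = new ArrayList<>();
        if (clientRack == null || assignmentTags.isEmpty()) {
            return warnings; // at most one of the two mechanisms is configured
        }
        if (!assignmentTags.contains("rack")) {
            warnings.add("client.rack is configured but rack.aware.assignment.tags does not contain rack");
        } else if (!clientRack.equals(clientTags.get("rack"))) {
            warnings.add("client.tag.rack=" + clientTags.get("rack")
                + " does not match client.rack=" + clientRack);
        }
        return warnings;
    }
}
```

The caller would log each returned warning and then carry on with the assignment using the default configuration, rather than failing as in option II.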
IV. Summary
I think option III is good enough for us and we can default to the rack.aware.assignment.tags configuration since it can contain more information than client.rack.
E. Assignment for stateless tasks
We can use a min-cost flow algorithm to assign stateless tasks. First, we can find a feasible solution based on task load. Then we can use the cycle canceling algorithm to compute the minimum cost assignment as we do for active tasks. We can reuse the configuration for active tasks to control whether we merely compute the min-cost assignment or also balance tasks for the same sub-topology.
4. Public Interfaces
There will be a few new public configs added:
- rack.aware.assignment.strategy: this config controls if we should use 4.B.I or 4.B.II to compute min-cost with or without balanced sub-topology
- rack.aware.assignment.enabled: this config controls if we should enable rack aware assignment
- rack.aware.assignment.traffic_cost: this config controls the cost we add to cross rack traffic
- rack.aware.assignment.non_overlap_cost: this config controls the cost we add to non overlap assignment with targeted assignment computed by HAAssignor
Note that the client.rack consumer config needs to be configured by the customer to enable rack aware assignment.
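As a rough illustration of how an application might set these properties, the snippet below fills a plain `java.util.Properties` object. The config names are the ones proposed above; every value is a placeholder chosen for this example, and rack.aware.assignment.strategy is left out because its accepted values are not listed here.

```java
import java.util.Properties;

final class RackAwareConfigExample {

    static Properties build() {
        Properties props = new Properties();
        // consumer config which must be set for rack aware assignment to take effect
        props.setProperty("client.rack", "az1");
        // proposed configs; the values below are placeholders, not recommended defaults
        props.setProperty("rack.aware.assignment.enabled", "true");
        props.setProperty("rack.aware.assignment.traffic_cost", "10");
        props.setProperty("rack.aware.assignment.non_overlap_cost", "1");
        return props;
    }
}
```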
5. Compatibility, Deprecation, and Migration Plan
We will implement the new rack aware assignment logic in HighAvailabilityTaskAssignor and use the new assignor only when client rack information appears in Subscriptions and at least one of the TopicPartitions in some tasks has a different cost compared to other TopicPartitions. This is because if all of them have the same cost, there's no point in using rack aware assignment logic to compute the min cost, since assigning tasks to any client doesn't make a difference. If users want to use rack aware assignment, they need to upgrade Kafka Streams to at least the earliest supported version. It's better to stop all the instances and start them all with the latest version with the client.rack config, but it's also OK to do a rolling bounce with the new version. A rolling bounce with the new version may cause the assignment to change between rack aware assignment and non rack aware assignment, but eventually the assignment will be computed using rack awareness after every instance uses the new version.
6. Test Plan
- Unit test will be added for min-cost flow graph algorithm and rack aware assignment
- Existing integration test will be updated to ensure it works with or without rack configuration as well as with both client.rack and rack.aware.assignment.tags configurations
- New integration test will be added to verify rack aware assignment
- Existing system test will be used to verify compatibility with old version
- TaskAssignorConvergenceTest will be updated to verify the new assignor will converge and it produces no worse assignment in terms of cross AZ cost
- Performance test will be done manually to verify the efficiency of min-cost flow computation with large number of tasks and clients as well as different number of subscribed topic partitions
7. Rejected Alternatives
A. The following algorithm for standby assignment is rejected
Prefer reliability and then find optimal cost
We could also first find a more reliable standby assignment and then, once we have a certain number of reliable standby assignments, switch to minimizing cost. For example, if we would like at least 1 standby to be in a different rack from the active, then when we assign the first standby we can give a lower cost to clients which are in a different rack from the active client. We can adjust the cost function as below:
Code Block |
---|
int getCost(Task t, Client c) {
    final int TRAFFIC_WEIGHT = 1;
    final int SAME_RACK_WEIGHT = 10;
    int cost = 0;
    // add weights for cross-AZ traffic
    for (TopicPartition tp : t.topicPartitions()) {
        if (tp has no replica in same rack as c) {
            cost += TRAFFIC_WEIGHT;
        }
    }
    // add weights for every client which already has t and is in the same rack as c
    for (Client c1 which has t assigned) {
        if (c is in same rack as c1) {
            cost += SAME_RACK_WEIGHT;
        }
    }
    return cost;
} |
After we assign the first standby to different racks as much as possible, we adjust the cost function to give more weight to TRAFFIC_WEIGHT so that we prefer low traffic assignments. This option is rejected because it's too complex. We need to run min-cost flow several times and it's not clear we could get a valid assignment solution each time.