Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-20098
Motivation
This KIP is intended to be a complement to the unclean recovery design proposal of KIP-966.
A partition can become offline if all of its ISR / ELR replicas are offline. If this occurs, automatic leadership failover will not take place. Unclean recovery is defined as picking a leader for an offline partition from replicas which were never in the ISR / ELR. Unclean recovery has the potential to lead to data loss: data loss takes place when the new leader replica chosen during the recovery election has an offset lower than the high water mark (HWM) of the ISR / ELR replicas, since the new leader will always truncate the logs of the other replicas.
Currently, the KRaft quorum controller has the ability to automatically perform unclean recovery elections. This is configured by unclean.leader.election.enable. Users favouring availability over durability have the option to set unclean.leader.election.enable=true, which allows the controller to randomly pick a new leader from live replicas every 5 minutes. Users favouring durability over availability will typically configure unclean.leader.election.enable=false. In this case the controller will do nothing and the partition will remain offline until an ISR / ELR replica recovers. Unclean recovery elections can be manually triggered using kafka-leader-election.sh with the --election-type=unclean flag.
In some cases, randomly picking a new leader may not be an ideal outcome. There are no guarantees about the log contents of non-ISR / ELR replicas. A replica more recently added to a replica set may not be as "caught up" as replicas which have been in the replica set for longer. A pathological example is a new replica added to the replica set after the partition became offline: this replica would have no data at all, and if unclean recovery elects it as leader the log will be completely truncated.
Implementation of this KIP will provide a way for operators to "intelligently" recover offline topic partitions during emergencies. It is aimed at users who run without automated unclean elections (unclean.leader.election.enable=false) but may desire an improved "break glass if needed" option during emergencies. We propose a new command line tool, kafka-unclean-recovery.sh, which will assist in recovering offline partitions based on log length.
Public Interfaces
Implementation of kafka-unclean-recovery.sh requires implementing a subset of the RPC changes within KIP-966: the GetReplicaLogInfo request, and a modification to ElectLeadersRequest to "designate" a specific replica as the new partition leader. Neither of these RPC changes is yet implemented by Kafka.
kafka-unclean-recovery.sh
A new CLI tool which would be run by operators using:
kafka-unclean-recovery.sh <options>
// Optional values
--recovery-duration-ms <integer> A broker is required to respond with log information within this duration for it to be considered as a candidate for election.
Defaults to 30000 (30 seconds) and represents time in milliseconds.
--recovery-election-attempts <integer> Number of retries of transient failures allowed for leader election requests.
Defaults to 3.
// At least 1 of these is required.
--show-replica-info Print a table showing replica info of targeted partitions. Can be used with any other arguments.
--manual-recovery-output-file <string> File path of a new file which will be created. The generated file will be capable of being an input to kafka-elect-leaders.sh.
It will contain topic-partitions and their designated leaders.
{
"partitions": [
{"topic": "foo", "partition": 1, "designatedLeader": 0},
{"topic": "foobar", "partition": 2, "designatedLeader": 1}
]
}
Mutually exclusive with the --automated-recovery flag.
--automated-recovery Automatically elect the leaders with the longest apparent logs by attempting designated leader elections.
// One of these two is required.
--path-to-json-file <String> Path to a JSON file containing a list of topic-partitions to attempt unclean recovery elections on.
Mutually exclusive with --all-offline-partitions.
Example:
{
  "partitions": [
    {
      "topic": "foo",
      "partitions": [0, 3, 5]
    },
    {
      "topic": "bar",
      "partitions": [0, 1, 4]
    }
  ]
}
--all-offline-partitions Perform unclean recovery on all detected offline partitions.
kafka-elect-leaders.sh
Derived from KIP-966.
...
// Updated field starts.
--election-type <[PREFERRED, UNCLEAN,   Type of election to attempt. Possible
  DESIGNATED]: election type>           values are "preferred" for preferred
                                        leader election, "unclean" for a
                                        random unclean leader election, or
                                        "designated" for electing the given
                                        replica ("designatedLeader") as the
                                        leader. If preferred election is
                                        selected, the election is only
                                        performed if the current leader is not
                                        the preferred leader for the topic
                                        partition. If unclean or designated
                                        election is selected, the election is
                                        only performed if there is no leader
                                        for the topic partition. REQUIRED.
--path-to-json-file <String: Path to    The JSON file with the list of
  JSON file>                            partitions for which leader elections
                                        should be performed. Example format
                                        below. The designatedLeader field is
                                        only required for DESIGNATED
                                        elections.
                                        {"partitions":
                                         [{"topic": "foo", "partition": 1, "designatedLeader": 0},
                                          {"topic": "foobar", "partition": 2, "designatedLeader": 1}]
                                        }
                                        Not allowed if --all-topic-partitions
                                        or --topic flags are specified.
// Updated field ends.
Designated Leader Elections
Derived from KIP-966.
ACL: CLUSTER_ACTION
Limit: 1000 partitions per request. If more than 1000 partitions are included, only the first 1000 will be served.
{
"apiKey": XX,
"type": "request",
"listeners": ["broker", "controller"],
"name": "ElectLeadersRequest",
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
...
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to elect leaders.",
"fields": [
...
// New fields begin. At the same level as Partitions.
{ "name": "DesignatedLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
"about": "The designated leaders. Each entry matches the entry in Partitions at the same index." }
// New fields end.
] },
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the election to complete." }
]
}
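Because DesignatedLeaders entries must line up by index with the partitions they target, a client building a version 3 ElectLeadersRequest has to keep the two arrays parallel. A minimal illustrative sketch, assuming a plain dict representation of the request body (this is not a real Kafka client API; field names mirror the schema above):

```python
# Illustrative sketch: build one TopicPartitions entry for an
# ElectLeadersRequest (v3) body, keeping DesignatedLeaders[i] aligned
# with the partition at index i. The dict layout mirrors the JSON
# schema above; it is not a real Kafka client API.

def build_topic_partitions_entry(topic, partition_to_leader):
    """partition_to_leader: dict mapping partition id -> designated leader id."""
    partitions = sorted(partition_to_leader)
    return {
        "Topic": topic,
        "Partitions": partitions,
        # Must stay index-aligned with Partitions.
        "DesignatedLeaders": [partition_to_leader[p] for p in partitions],
    }

entry = build_topic_partitions_entry("foo", {5: 2, 1: 0})
```

Keeping the mapping in a single dict and deriving both arrays from it avoids the index-mismatch bugs that maintaining two separate lists invites.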
GetReplicaLogInfo RPC
Originally defined in KIP-966.
Request
ACL: CLUSTER_ACTION, DESCRIBE (on a topic)
Limit: 1000 partitions per request. If more than 1000 partitions are included, only the first 1000 will be served.
{
"apiKey":XX,
"type": "request",
"listeners": ["broker"],
"name": "GetReplicaLogInfoRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
"about": "The topic partitions to query the log info for.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic to query the log info for." }
]}
]
}
Response
Differences from KIP-966:
- Addition of the HasMoreData field, which is true if more than 1000 partitions were requested. In that case only the first 1000 are returned and HasMoreData is set to true.
- Removal of LastWrittenLeaderEpoch, which refers to the previous Raft leader epoch. This safety property is not useful for a command line tool.
{
"apiKey":XX,
"type": "response",
"name": "GetReplicaLogInfoResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+",
"about": "The epoch of the broker." },
{ "name": "HasMoreData", "type": "bool", "versions": "0+",
"about": "True if response does not include all the topic partitions requested. Only the first 1000 topic partitions are returned."},
{ "name": "TopicPartitionLogInfoList", "type": "[]TopicPartitionLogInfo", "versions": "0+",
"about": "The list of the partition log info.",
"fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "PartitionLogInfo", "type": "[]PartitionLogInfo", "versions": "0+", "about": "The log info of a partition.",
"fields": [
{ "name": "Partition", "type": "int32", "versions": "0+", "about": "The id for the partition." },
{ "name": "PartitionLeaderEpoch", "type": "int32", "versions": "0+", "about": "The current leader epoch for the partition from the broker point of view." },
{ "name": "LogEndOffset", "type": "int64", "versions": "0+", "about": "The log end offset for the partition." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The result error, or zero if there was no error."}
]}
]}
]
}
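Given the 1000-partition cap and the HasMoreData flag, a client should keep each request under the limit rather than rely on truncated responses. A minimal sketch of splitting an input partition set into compliant batches (the data layout is illustrative, not part of any Kafka API):

```python
# Illustrative sketch: split a flat list of (topic, partition) pairs into
# batches of at most 1000, the per-request limit for GetReplicaLogInfo.
# Staying under the cap avoids truncated responses (HasMoreData = true).

MAX_PARTITIONS_PER_REQUEST = 1000

def batch_partitions(topic_partitions, limit=MAX_PARTITIONS_PER_REQUEST):
    return [topic_partitions[i:i + limit]
            for i in range(0, len(topic_partitions), limit)]

batches = batch_partitions([("topic-a", p) for p in range(2500)])
```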
Proposed Changes
For the tool to work as intended, the GetReplicaLogInfo RPC and the designated leadership extension to the ElectLeaders RPC are needed. While it is not explicitly specified in KIP-966, designated leadership must have specific safety properties for the tool to operate safely. Designated leader elections should only be allowed if the partition is offline (i.e. has no leader). A broker may only be designated leader of a partition if it is in the replica set, is unfenced, and has an online replica. This behaviour is consistent with the prerequisites for the existing "unclean" leadership elections. A GetReplicaLogInfo request asks a broker for the length of its replica of a given partition. Note that GetReplicaLogInfo in this KIP is slightly different from the one specified in KIP-966.
kafka-unclean-recovery.sh will accept an input set of topic-partitions to attempt to recover. It will send GetReplicaLogInfoRequests to each of the brokers to figure out their log lengths, retrying these requests until it hits a configurable deadline. From the GetReplicaLogInfoResponses it has received, it will determine the best leader for each topic partition by selecting the replica with the highest epoch and then the longest log. After the best "potential" candidate has been selected, the tool will either send leader election requests to the controller or output a file which can be used as input to the kafka-elect-leaders.sh tool, with the intention that the operator can review/modify the requests and/or send them manually. GetReplicaLogInfo requests can be batched per broker, but this is an optimization and not required for correctness.
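The candidate-selection rule above, highest partition leader epoch first, then longest log end offset as tie-breaker, can be sketched as follows. The tuple layout is illustrative; in practice the values would come from successful GetReplicaLogInfoResponses:

```python
# Illustrative sketch of the "best candidate" rule: prefer the replica
# with the highest PartitionLeaderEpoch, then the longest log
# (LogEndOffset). Each candidate is a tuple of
# (broker_id, partition_leader_epoch, log_end_offset).

def pick_designated_leader(candidates):
    if not candidates:
        return None  # no usable log info received before the deadline
    # Tuple comparison orders by epoch first, then log end offset.
    return max(candidates, key=lambda c: (c[1], c[2]))[0]

# Broker 2 wins: it shares the highest epoch (5) and has the longest log.
leader = pick_designated_leader([(0, 4, 100), (1, 5, 90), (2, 5, 120)])
```

Comparing epoch before offset matters: a replica with a stale epoch may report a longer log that would be truncated anyway.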
kafka-unclean-recovery.sh will take a "best-effort" approach and try to complete as many elections as it can within a time limit (or until the operator sends a SIGTERM). It should be expected that not all partitions in the input set will be "recovered" successfully. Possible failure modes are that an ElectLeadersRequest fails, or that no successful GetReplicaLogInfoResponse was received in time for a given partition. Since designated leadership elections are idempotent (once a leader is elected, it is not possible to use designated leadership elections again until the partition is again offline), operators can confidently run the tool multiple times. A zero exit status will be reported only if all elections for the input set are successful. Any failed elections will be logged to stderr.
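The best-effort behaviour described above, retrying each election up to --recovery-election-attempts times and exiting non-zero if anything failed, might look like the following sketch. The elect callback is a hypothetical stand-in for sending an ElectLeadersRequest:

```python
# Illustrative sketch of best-effort elections with bounded retries.
# elect(partition) is a hypothetical stand-in for sending an
# ElectLeadersRequest; it returns True on success, False on a
# transient failure.

def run_elections(partitions, elect, attempts=3):
    failed = []
    for partition in partitions:
        # any() stops retrying as soon as one attempt succeeds.
        if not any(elect(partition) for _ in range(attempts)):
            failed.append(partition)  # would be logged to stderr
    return failed  # exit status is zero only when this list is empty

# Example: a flaky elect() that succeeds on its second call.
calls = {"n": 0}
def flaky(partition):
    calls["n"] += 1
    return calls["n"] % 2 == 0

failed = run_elections(["foo-1"], flaky)
```

Because designated elections are idempotent, re-running the whole loop over the original input set is safe: already-recovered partitions simply fail the precondition of being offline.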
There are some caveats to using a command line tool for this system.
The network path between the operator running kafka-unclean-recovery.sh and the cluster is likely to differ from the network internal to the cluster, between controllers and brokers. For example, the operator may receive a GetReplicaLogInfo response from a broker which is fenced by the controller. Since designated leader elections can only elect unfenced brokers as leaders, it is not a correctness concern if kafka-unclean-recovery.sh attempts to designate a leader which the controller still believes to be fenced. To aid the operator, the tool can have a "dry-run" mode which emits a JSON file compatible with kafka-elect-leaders.sh as well as a summary of the log lengths of the different replicas in the input set of partitions. This would allow the operator to manually alter the generated ElectLeadersRequest to include a suitable candidate based on known offline partitions.
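The dry-run output described here is simply the kafka-elect-leaders.sh input format populated with the tool's chosen candidates. A sketch of emitting it, assuming the tool's choices are held in a plain dict (an illustrative structure, not a defined interface):

```python
import json

# Illustrative sketch: serialize chosen designated leaders into the
# JSON format accepted by kafka-elect-leaders.sh --path-to-json-file,
# so the operator can review or edit the plan before submitting it.

def dry_run_output(choices):
    """choices: dict mapping (topic, partition) -> designated leader id."""
    return json.dumps({
        "partitions": [
            {"topic": t, "partition": p, "designatedLeader": leader}
            for (t, p), leader in sorted(choices.items())
        ]
    }, indent=2)

plan = dry_run_output({("foo", 1): 0, ("foobar", 2): 1})
```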
A second problem might be that, between the tool receiving a GetReplicaLogInfoResponse and sending the ElectLeadersRequest, an offline ELR replica may become online again and be elected leader. Since designated leader elections can only affect offline partitions, this does not pose a correctness concern; in this case the tool will report that it found a partition already online.
Compatibility, Deprecation, and Migration Plan
This approach is purely additive and does not conflict with any existing RPCs.
Test Plan
A system test could be set up to run in the following way:
- Spin up a test cluster with unclean.leader.election.enable=false, create a partition, and produce to it.
- Terminate all brokers in the ISR with unclean shutdowns (`kill -9`) and ensure that the entire ISR is offline.
- Assert that the test partition is offline.
- Record the replica set.
- Use kafka-unclean-recovery.sh to perform a designated leadership election using the --automated-recovery flag.
- Assert that the test partition is online and is using a broker from the non-ISR replica set.
Rejected Alternatives
An attempt was made to implement this feature within the controller as part of KIP-966. KIP-966 proposed adding a new "unclean.recovery.strategy" configuration to the controller, which contained two new "automated" election strategies, "Balanced" and "Aggressive". Both new strategies would allow recovery by discovering the replicas with the longest logs. The new configuration would operate in a similar fashion to "unclean.leader.election.enable", i.e. it would be initiated by the controller every 5 minutes. The active controller, being the source of truth of cluster metadata, is best positioned to automatically initiate unclean recoveries as it is immune to a network split between the cluster and operators. However, implementation within the controller carries some caveats.
In normal operating cases, failover to another controller will happen automatically. Unclean recovery is intended to be used during emergencies, where it is possible that the quorum may be partially degraded or the controller is extremely busy keeping track of chaos within the cluster. Adding new unclean recoveries to the controller adds complexity to the logic and operation of a sensitive aspect of cluster operation (leadership election) during emergencies. Only users running with unclean.leader.election.enable=true would see unclean.recovery.strategy as a significant enough improvement over the current "random" algorithm to justify the complexity. For unclean recovery which is not initiated by the controller, there is no obvious advantage, given the safety guarantees of designated leadership elections.
The KRaft controller operates in a "pull" model: Raft observers send fetch requests to the controller to receive up-to-date information. Handling unclean recovery in the controller would have required managing many GetReplicaLogInfo requests in a "push" model, which could raise concerns about memory usage and could affect quota calculations. Further, the ElectLeaders RPC is intended to be "synchronous" with respect to replication through KRaft. Allowing "longest log" recovery elections would have subtly changed this API contract, as it would now need to wait for more complex network requests sent out to brokers, in comparison to just waiting for "normal" KRaft replication.
KIP-966 proposed adding a new "unclean.recovery.strategy" configuration to the controller. The new "Balanced" and "Aggressive" election configurations add a larger API surface, with 3 additional election types becoming possible. The new configurations also add ambiguity to the unclean.leader.election.enable configuration, which would need to coexist with the new unclean.recovery.strategy. The command line tool approach only requires adding designated elections, and it moves both the question of how an election should work and the impetus to add additional recovery strategies outside of the controller. In theory, users can implement their own specific strategy using GetReplicaLogInfo and designated leadership elections; there is no need for the controller to pick or have a specific strategy in mind.
If the kafka-unclean-recovery.sh tool fails catastrophically then this has no effect on the liveness of the controller.