Motivation
In FIP-8: Support Cluster Rebalance, we introduced cluster rebalance, but that mechanism was designed specifically for Log Tables. For PrimaryKey Tables, rebalance still carries significant limitations.
Key Differences between Log Tables and PrimaryKey Tables
| Aspect | Log Tables | PrimaryKey Tables |
|---|---|---|
| Replication | 3 replicas (high availability) | No built-in replica concept |
| Leader Switch Speed | Milliseconds (near-instantaneous) | Seconds or minutes (slow) |
| Recovery Process | - | 1. Apply latest kv snapshot 2. Start RocksDB instance 3. Apply logs |
| User impact | - | Potentially significant downtime |
To ensure rebalancing does not disrupt user reads and writes on PrimaryKey Tables, we need a dedicated rebalance mechanism for PrimaryKey Tables.
Public Interfaces
To support rebalance for PrimaryKey Tables, this FIP introduces the concept of a HotStandbyReplica. A HotStandbyReplica is a non-leader replica of a PrimaryKey Table that maintains RocksDB data identical to the leader's: it writes fetched logs directly into its active RocksDB instance. During a leader switch, it can seamlessly take over as the new leader and immediately provide read/write services with only milliseconds of interruption.
To support HotStandbyReplica and rebalance for PrimaryKey Table, the following basic concepts and interfaces will be introduced:
- Add variables Integer hotStandbyReplica and List<Integer> iss in LeaderAndIsr and related ZkData.
- Add fields hot_standby_replica and iss in PbNotifyLeaderAndIsrReqForBucket of NotifyLeaderAndIsrRequest.
- Add field kv_applied_offset in PbFetchLogReqForBucket of FetchLogRequest.
- Add field new_iss in PbAdjustIsrReqForBucket of AdjustIsrRequest.
- Add fields hot_standby_replica and iss in PbAdjustIsrRespForBucket of AdjustIsrResponse.
1. ZkData change of LeaderAndIsr
Add variables Integer hotStandbyReplica and List<Integer> iss in LeaderAndIsr
The original znode content example is as follows:
{
"version":1,
"leader":1,
"leader_epoch":10,
"isr":[1,2,3],
"coordinator_epoch":1000
}
The new znode content example is as follows, where iss denotes the inSyncStandby replica collection:
{
"version":2,
"leader":1,
"leader_epoch":10,
"isr":[1,2,3],
"coordinator_epoch":1000,
"hot_standby_replica":1, # The latest hotStandbyReplica.
"iss":[1] # The latest inSyncStandby replica collection.
}
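The extended LeaderAndIsr state can be modeled roughly as follows. This is an illustrative sketch, not Fluss's actual class: the class name, constructor, and the canFastFailoverTo helper are made up here to show how the two new fields relate to fast failover.

```java
import java.util.List;

// Illustrative model of the version-2 LeaderAndIsr znode content above.
// Field names mirror the JSON keys; the class itself is hypothetical.
class LeaderAndIsrV2 {
    final int leader;
    final int leaderEpoch;
    final List<Integer> isr;
    final int coordinatorEpoch;
    final Integer hotStandbyReplica; // nullable: no hot standby designated yet
    final List<Integer> iss;         // in-sync standby replica collection

    LeaderAndIsrV2(int leader, int leaderEpoch, List<Integer> isr,
                   int coordinatorEpoch, Integer hotStandbyReplica,
                   List<Integer> iss) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.isr = isr;
        this.coordinatorEpoch = coordinatorEpoch;
        this.hotStandbyReplica = hotStandbyReplica;
        this.iss = iss;
    }

    // A replica can take over leadership within milliseconds only if it is
    // the designated hot standby AND currently in the in-sync standby set.
    boolean canFastFailoverTo(int replica) {
        return hotStandbyReplica != null
                && hotStandbyReplica == replica
                && iss.contains(replica);
    }
}
```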
2. FlussApis change
2.1 NotifyLeaderAndIsrRequest
message PbNotifyLeaderAndIsrReqForBucket {
required PbPhysicalTablePath physical_table_path = 1;
required PbTableBucket table_bucket = 2;
required int32 leader = 3;
required int32 leader_epoch = 4;
repeated int32 replicas = 5 [packed = true];
repeated int32 isr = 6 [packed = true];
required int32 bucket_epoch = 7;
  optional int32 hot_standby_replica = 8; // newly added. The latest hotStandbyReplica.
repeated int32 iss = 9 [packed = true]; // newly added. The latest inSyncStandby replica collection.
}
2.2 FetchLogRequest
message PbFetchLogReqForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
// TODO leader epoch
required int64 fetch_offset = 3;
required int32 max_fetch_bytes = 4;
optional int64 kv_applied_offset = 5; // newly added. The offset indicates the kv applied endOffset.
}
2.3 AdjustIsrRequest
message PbAdjustIsrReqForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
required int32 leader_epoch = 3;
repeated int32 new_isr = 4 [packed = true];
required int32 coordinator_epoch = 5;
required int32 bucket_epoch = 6;
// inSyncStandbyReplicas
  repeated int32 new_iss = 7 [packed = true]; // newly added. The new inSyncStandby replica collection.
}
2.4 AdjustIsrResponse
message PbAdjustIsrRespForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int32 error_code = 3;
optional string error_message = 4;
optional int32 leader_id = 5;
optional int32 leader_epoch = 6;
repeated int32 isr = 7 [packed = true];
optional int32 bucket_epoch = 8;
optional int32 coordinator_epoch = 9;
  optional int32 hot_standby_replica = 10; // newly added. The latest hotStandbyReplica.
  repeated int32 iss = 11 [packed = true]; // newly added. The new inSyncStandby replica collection.
}
Proposed Changes
To support rebalance for PrimaryKey Tables, an example execution process is illustrated in the following:
In this case, the RebalanceManager generates a rebalance task to change the replica assignment of the PrimaryKey table with tableId=1 from [0,1,2] to [1,2,3], where the original leader is 0 and the new leader is 1. For details on how the rebalance task is generated, please refer to FIP-8: Support Cluster Rebalance. The diagram below illustrates the execution process of this rebalance task:
1. Submit task
In this step, the RebalanceManager registers a rebalance task with the CoordinatorProcessor. Upon registration, the system executes the first step: adjusting both the assignment and LeaderAndIsr. Using the previous example, this involves expanding the original assignment from [0,1,2] to [0,1,2,3] while keeping 0 as the leader, and designating 1 as the HotStandbyReplica. This approach allows node 3 to join as a regular follower while node 1 is simultaneously promoted to HotStandbyReplica.
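The intermediate assignment in this step is simply the union of the old and new assignments, keeping the old ordering first so the current leader is untouched during the transition. A minimal sketch (the helper name is made up, not Fluss's API):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Illustrative helper: merge the old and new replica assignments for the
// transition phase, preserving the old ordering so the current leader
// (first replica of the old assignment) keeps serving reads/writes.
class AssignmentUnion {
    static List<Integer> merge(List<Integer> oldAssignment, List<Integer> newAssignment) {
        LinkedHashSet<Integer> merged = new LinkedHashSet<>(oldAssignment);
        merged.addAll(newAssignment); // only appends replicas not already present
        return new ArrayList<>(merged);
    }
}
```

With the example above, merging [0,1,2] and [1,2,3] yields the transitional assignment [0,1,2,3].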
2. Send NotifyLeaderAndIsrRequest
Then, the Coordinator sends a NotifyLeaderAndIsrRequest to the corresponding TabletServers: [0,1,2,3]. Here we focus on TabletServer-0 and TabletServer-1. In TabletServer-0, replica-0 remains the leader and needs no action. Meanwhile, replica-1 in TabletServer-1 enters the becomeHotStandby process.
3/4. BecomeHotStandby (Download KvSnapshot and apply remote/local log)
In this step, the replica fetches the latest kvSnapshot, starts RocksDB, and sets its kvAppliedOffset to the LogEndOffset recorded in that snapshot. It then applies logs up to the replica's current LogEndOffset, updating kvAppliedOffset as each LogSegment is applied.
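The bootstrap above can be sketched as follows. This is a simplified illustration under stated assumptions: a TreeMap stands in for the RocksDB instance, and the class and method names are invented for this sketch.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of BecomeHotStandby: restore from the latest kv
// snapshot, then replay log records, advancing kvAppliedOffset as we go.
class HotStandbyBootstrap {
    final NavigableMap<String, String> kv = new TreeMap<>(); // stand-in for RocksDB
    long kvAppliedOffset = -1;

    // Download the latest kvSnapshot and set kvAppliedOffset to the
    // LogEndOffset recorded in that snapshot.
    void restoreFromSnapshot(NavigableMap<String, String> snapshot, long snapshotLogEndOffset) {
        kv.putAll(snapshot);
        kvAppliedOffset = snapshotLogEndOffset;
    }

    // Apply one log record; records at or before the snapshot point are
    // already covered and skipped, and kvAppliedOffset advances otherwise.
    void applyRecord(long offset, String key, String value) {
        if (offset <= kvAppliedOffset) {
            return; // already reflected in the snapshot
        }
        kv.put(key, value);
        kvAppliedOffset = offset;
    }
}
```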
5. FetchLog from leader with kv_applied_offset
While the HotStandbyReplica is loading the complete KV data during the becomeHotStandby process, each FetchLogRequest sent from the HotStandbyReplica to the leader also carries the updated kvAppliedOffset. This enables the leader to determine, based on the kvAppliedOffset, whether the HotStandbyReplica has caught up and can be added to the ISS (InSyncStandby) set. If eligible, the leader sends an AdjustIsrRequest to the Coordinator; otherwise, the replica is not included in the ISS set.
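The leader-side eligibility check might look like the following sketch. The lag threshold and all names here are assumptions for illustration; the FIP does not specify the exact catch-up criterion.

```java
// Illustrative leader-side check: a hot standby qualifies for the ISS set
// once the kvAppliedOffset it reports in FetchLogRequest has caught up to
// the leader's log end offset. The maxLagRecords threshold is an assumption.
class IssEligibility {
    static boolean caughtUp(long standbyKvAppliedOffset, long leaderLogEndOffset, long maxLagRecords) {
        return leaderLogEndOffset - standbyKvAppliedOffset <= maxLagRecords;
    }
}
```

Only when `caughtUp` holds would the leader issue an AdjustIsrRequest proposing the replica for the ISS set.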
6. Apply fetched log
Compared to the previous logic, handling the FetchLogRequest response now includes an additional check to verify whether the HotStandbyReplica's RocksDB data has caught up with the current log data. If so, the fetched log data is written to RocksDB before being persisted to disk. This keeps the HotStandbyReplica's KV data and log data consistent with the leader's, preparing for a potential leader switchover.
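A minimal sketch of this apply path, assuming a TreeMap as the RocksDB stand-in and an in-memory list as the local log (all names are illustrative, not Fluss's actual classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative fetch-response handling on a HotStandbyReplica: once the KV
// data has caught up, each fetched record is applied to RocksDB (TreeMap
// here) before being appended to the local log, keeping both in lockstep.
class StandbyApplyPath {
    final TreeMap<String, String> kv = new TreeMap<>();
    final List<String> log = new ArrayList<>();
    long kvAppliedOffset = -1;
    boolean kvCaughtUp; // set once becomeHotStandby has finished

    void onFetchedRecord(long offset, String key, String value) {
        if (kvCaughtUp) {
            kv.put(key, value);      // write into the active RocksDB first
            kvAppliedOffset = offset;
        }
        log.add(key + "=" + value);  // then persist to the local log
    }
}
```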
7. Send AdjustIsrRequest
When the HotStandbyReplica catches up with the leader, the leader sends an AdjustIsrRequest to the Coordinator. In addition to setting the ISR to [0,1,2,3], the ISS (InSyncStandby) is updated to [1]. Upon receiving this request, the Coordinator recognizes that the HotStandbyReplica is ready to become the new leader. The Coordinator then sends a new NotifyLeaderAndIsrRequest to promote replica 1 to leader, while simultaneously sending a StopReplicaRequest to replica 0 to take it offline. This completes the full rebalance process for the PrimaryKey Table.
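The Coordinator's decision in this final step reduces to checking whether the designated hot standby now appears in the reported ISS. A sketch (method and class names invented for illustration):

```java
import java.util.List;

// Illustrative Coordinator-side decision on receiving AdjustIsrRequest:
// once the designated hot standby appears in the reported ISS, the
// Coordinator can promote it (NotifyLeaderAndIsrRequest) and stop the old
// leader (StopReplicaRequest).
class CoordinatorDecision {
    // Returns the replica to promote, or -1 if the switch is not ready yet.
    static int pickNewLeader(int hotStandbyReplica, List<Integer> reportedIss) {
        return reportedIss.contains(hotStandbyReplica) ? hotStandbyReplica : -1;
    }
}
```

In the running example, once the ISS becomes [1], `pickNewLeader(1, iss)` returns 1 and the Coordinator promotes replica 1.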
Compatibility, Deprecation, and Migration Plan
The proposed change is backward compatible.
What's Not Included in This FIP?
- This FIP only introduces the concept of hotStandby for rebalance of PrimaryKey Tables, and does not involve the design of a general-purpose KV backup solution.
Future Work
- Introducing a generic KV hot standby backup solution that maintains compatibility with existing implementations.
Test Plan
Unit tests and IT tests will be added to validate the changes. Cross-testing on real clusters and publishing the test report will be completed before the release of Fluss-0.8.
Rejected Alternatives
N/A
