Document the state by adding a label to the FIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FIP-8: Support Cluster Rebalance, we introduced cluster rebalance, but that mechanism was designed specifically for Log Tables. PrimaryKey Tables, by contrast, face significant limitations during rebalance.

Key Limitations: Log Tables vs. PrimaryKey Tables

| Aspect | Log Tables | PrimaryKey Tables |
| --- | --- | --- |
| Replication | 3 replicas (high availability) | No built-in replica concept |
| Leader switch speed | Milliseconds (instantaneous) | Seconds or minutes (slow) |
| Recovery process | - | 1. Apply latest kv snapshot; 2. Start RocksDB instance; 3. Apply logs |
| User impact | - | Potentially significant downtime |

To ensure rebalancing does not disrupt user reads and writes on PrimaryKey Tables, we need a dedicated rebalance mechanism for them.

Public Interfaces

To support rebalance for PrimaryKey Tables, this FIP introduces the concept of a HotStandbyReplica. A HotStandbyReplica is a non-leader replica of a PrimaryKey Table that maintains RocksDB data identical to the leader's: it writes fetched logs directly into its active RocksDB instance. During a leader switch, it can seamlessly take over as the new leader and immediately provide read/write services with only milliseconds of interruption.


To support HotStandbyReplica and rebalance for PrimaryKey Tables, the following basic concepts and interfaces will be introduced:

  1. Add variables Integer hotStandbyReplica and List<Integer> iss in LeaderAndIsr and related ZkData.
  2. Add fields hot_standby_replica and iss in PbNotifyLeaderAndIsrReqForBucket of NotifyLeaderAndIsrRequest.
  3. Add field kv_applied_offset in PbFetchLogReqForBucket of FetchLogRequest.
  4. Add field new_iss in PbAdjustIsrReqForBucket of AdjustIsrRequest.
  5. Add fields hot_standby_replica and iss in PbAdjustIsrRespForBucket of AdjustIsrResponse.

1. ZkData change of LeaderAndIsr

Add variables Integer hotStandbyReplica and List<Integer> iss in LeaderAndIsr.

The original znode content example is as follows:

{
    "version": 1,
    "leader": 1,
    "leader_epoch": 10,
    "isr": [1, 2, 3],
    "coordinator_epoch": 1000
}

The new znode content example is as follows (iss means inSyncStandby replicas):

{
    "version": 2,
    "leader": 1,
    "leader_epoch": 10,
    "isr": [1, 2, 3],
    "coordinator_epoch": 1000,
    "hot_standby_replica": 1,  # The latest hotStandbyReplica.
    "iss": [1]  # The latest inSyncStandby replica collection.
}
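The version handling for the two znode schemas can be sketched in Java as follows. This is an illustrative model only — the class shape, field names, and the -1 sentinel are assumptions for this sketch, not the actual Fluss implementation:

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the extended LeaderAndIsr state mirroring the znode
// fields above; names and defaults are assumptions, not the real Fluss class.
public class LeaderAndIsr {
    static final int NO_HOT_STANDBY = -1; // assumed sentinel when the field is absent

    final int leader;
    final int leaderEpoch;
    final List<Integer> isr;
    final int coordinatorEpoch;
    // Fields added in version 2 of the znode schema.
    final int hotStandbyReplica;
    final List<Integer> iss;

    LeaderAndIsr(int leader, int leaderEpoch, List<Integer> isr,
                 int coordinatorEpoch, int hotStandbyReplica, List<Integer> iss) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        this.isr = isr;
        this.coordinatorEpoch = coordinatorEpoch;
        this.hotStandbyReplica = hotStandbyReplica;
        this.iss = iss;
    }

    // Version-1 znodes carry no standby fields; default them so state written
    // before the upgrade remains readable afterwards.
    static LeaderAndIsr fromVersion1(int leader, int leaderEpoch,
                                     List<Integer> isr, int coordinatorEpoch) {
        return new LeaderAndIsr(leader, leaderEpoch, isr, coordinatorEpoch,
                NO_HOT_STANDBY, Collections.emptyList());
    }
}
```

Defaulting the new fields when reading version-1 data is what keeps the znode change backward compatible.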

2. Fluss API changes

2.1 NotifyLeaderAndIsrRequest

PbNotifyLeaderAndIsrReqForBucket
message PbNotifyLeaderAndIsrReqForBucket {    
  required PbPhysicalTablePath physical_table_path = 1;
  required PbTableBucket table_bucket = 2;
  required int32 leader = 3;
  required int32 leader_epoch = 4;
  repeated int32 replicas = 5 [packed = true];
  repeated int32 isr = 6 [packed = true];
  required int32 bucket_epoch = 7;
  optional int32 hot_standby_replica = 8;  // newly added. The latest hotStandbyReplica.
  repeated int32 iss = 9 [packed = true];   // newly added. The latest inSyncStandby replica collection.
}

2.2 FetchLogRequest

PbFetchLogReqForBucket
message PbFetchLogReqForBucket {
  optional int64 partition_id = 1;
  required int32 bucket_id = 2;
  // TODO leader epoch
  required int64 fetch_offset = 3;
  required int32 max_fetch_bytes = 4;
  optional int64 kv_applied_offset = 5; // newly added. The offset indicates the kv applied endOffset.
}

2.3 AdjustIsrRequest

PbAdjustIsrReqForBucket
message PbAdjustIsrReqForBucket { 
  optional int64 partition_id = 1;
  required int32 bucket_id = 2;
  required int32 leader_epoch = 3;
  repeated int32 new_isr = 4 [packed = true];
  required int32 coordinator_epoch = 5;
  required int32 bucket_epoch = 6;
  // inSyncStandbyReplicas
  repeated int32 new_iss = 7 [packed = true]; // newly added. The new inSyncStandby replica collection.
}

2.4 AdjustIsrResponse

PbAdjustIsrRespForBucket
message PbAdjustIsrRespForBucket {
  optional int64 partition_id = 1;
  required int32 bucket_id = 2;
  optional int32 error_code = 3;
  optional string error_message = 4;
  optional int32 leader_id = 5;
  optional int32 leader_epoch = 6;
  repeated int32 isr = 7 [packed = true];
  optional int32 bucket_epoch = 8;
  optional int32 coordinator_epoch = 9;
  optional int32 hot_standby_replica = 10;  // newly added. The latest hotStandbyReplica.
  repeated int32 iss = 11 [packed = true]; // newly added. The latest inSyncStandby replica collection.
}

Proposed Changes

To support rebalance for PrimaryKey Tables, the execution process is illustrated with the following example:

In this case, the RebalanceManager generates a rebalance task to change the replica assignment of the PrimaryKey table with tableId=1 from [0,1,2] to [1,2,3], where the original leader is 0 and the new leader is 1. For details on how the rebalance task is generated, please refer to FIP-8: Support Cluster Rebalance. The diagram below illustrates the execution process of this rebalance task:

1. Submit task

In this step, the RebalanceManager registers a rebalance task with the CoordinatorProcessor. Upon registration, the system executes the first step: adjusting both the assignment and LeaderAndIsr. In our example, this means expanding the original assignment from [0,1,2] to [0,1,2,3] while keeping 0 as the leader, and marking 1 as the HotStandbyReplica. This approach allows node 3 to join as a regular follower while simultaneously promoting node 1 to HotStandbyReplica.
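The assignment adjustment in step 1 can be sketched as follows. This is a simplified illustration — the method names are hypothetical, and the convention that the first replica of the target assignment is the intended new leader is an assumption for this sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of step 1: merge the old and new assignments so the added
// replica can catch up while the old leader keeps serving reads and writes.
public class AssignmentStep {
    // Union of old and new assignments, preserving the old order first:
    // [0,1,2] merged with [1,2,3] yields [0,1,2,3].
    static List<Integer> expandedAssignment(List<Integer> oldAssign, List<Integer> newAssign) {
        List<Integer> merged = new ArrayList<>(oldAssign);
        for (int replica : newAssign) {
            if (!merged.contains(replica)) {
                merged.add(replica);
            }
        }
        return merged;
    }

    // Assumed convention for this sketch: the first replica of the target
    // assignment is the intended new leader, so it becomes the HotStandbyReplica.
    static int chooseHotStandby(List<Integer> newAssign) {
        return newAssign.get(0);
    }
}
```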

2. Send NotifyLeaderAndIsrRequest

Then the Coordinator sends a NotifyLeaderAndIsrRequest to the corresponding TabletServers: [0,1,2,3]. Here we focus on TabletServer-0 and TabletServer-1. In TabletServer-0, replica-0 remains the leader and does not need to perform any action. Meanwhile, replica-1 in TabletServer-1 enters the becomeHotStandby process.

3/4. BecomeHotStandby (Download KvSnapshot and apply remote/local log)

In this step, the replica fetches the latest kvSnapshot, starts RocksDB, and updates its kvAppliedOffset to match the LogEndOffset recorded in the latest kvSnapshot. It then applies logs up to the replica's current LogEndOffset, updating kvAppliedOffset with each LogSegment applied.
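The recovery loop in steps 3/4 can be sketched as follows, using an in-memory map as a stand-in for RocksDB. The class and method names are illustrative, and kvAppliedOffset here follows end-offset semantics (the next offset to apply), which is an assumption of this sketch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the becomeHotStandby recovery: apply the latest kv
// snapshot, then replay logs beyond it, advancing kvAppliedOffset as we go.
public class BecomeHotStandby {
    final Map<String, String> kvStore = new HashMap<>(); // stand-in for RocksDB
    long kvAppliedOffset = 0; // next log offset to apply (end-offset semantics)

    static class LogRecord {
        final long offset;
        final String key;
        final String value;
        LogRecord(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Step 1/2: apply the latest kv snapshot and (conceptually) start RocksDB;
    // the snapshot records the LogEndOffset it covers.
    void applySnapshot(Map<String, String> snapshot, long snapshotLogEndOffset) {
        kvStore.putAll(snapshot);
        kvAppliedOffset = snapshotLogEndOffset;
    }

    // Step 3: apply logs not yet covered by the snapshot, updating
    // kvAppliedOffset with each record applied.
    void applyLogs(List<LogRecord> log) {
        for (LogRecord r : log) {
            if (r.offset < kvAppliedOffset) {
                continue; // already covered by the snapshot
            }
            kvStore.put(r.key, r.value);
            kvAppliedOffset = r.offset + 1;
        }
    }
}
```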

5. FetchLog from leader with kv_applied_offset

While the HotStandbyReplica is loading the complete KV data during the becomeHotStandby process, each FetchLogRequest sent from the HotStandbyReplica to the leader also carries the updated kvAppliedOffset. This enables the leader to determine, based on the kvAppliedOffset, whether the HotStandbyReplica has caught up and can be added to the ISS (inSyncStandby) set. If eligible, the leader sends an AdjustIsrRequest to the Coordinator; if not, the replica is not included in the ISS set.

6. Apply fetched log

Compared to the previous logic, processing the FetchLogRequest response now includes an additional check to verify whether the HotStandbyReplica's RocksDB data has caught up with the current log data. If confirmed, the log data is written to RocksDB before being persisted to disk. This ensures the HotStandbyReplica's KV data and log data remain consistent with the leader's, preparing for a potential leader switchover.
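The apply order in step 6 can be sketched as follows, with in-memory stand-ins for RocksDB and the local log segment; names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of step 6: on a HotStandbyReplica, each fetched record is first
// applied to the kv store (RocksDB in the real path) and only then appended
// to the local log, keeping KV data and log data consistent for switchover.
public class ApplyFetchedLog {
    final Map<String, String> kvStore = new HashMap<>(); // stand-in for RocksDB
    final List<String> localLog = new ArrayList<>();     // stand-in for the log segment
    long kvAppliedOffset = 0;

    void applyFetched(long offset, String key, String value) {
        kvStore.put(key, value);         // 1. write into the active RocksDB instance
        localLog.add(key + "=" + value); // 2. then persist to the local log
        kvAppliedOffset = offset + 1;    // 3. advance the applied end offset
    }
}
```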

7. Send AdjustIsrRequest

When the HotStandbyReplica catches up with the leader, the leader sends an AdjustIsrRequest to the Coordinator. In addition to setting the ISR to [0,1,2,3], the ISS (inSyncStandby) is updated to [1]. Upon receiving this request, the Coordinator recognizes that the HotStandbyReplica is ready to become the new leader. The Coordinator then sends a new NotifyLeaderAndIsrRequest to promote replica 1 to leader, while simultaneously sending a StopReplicaRequest to replica 0 to take it offline. This completes the full rebalance process for the PrimaryKey Table.
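The Coordinator's reaction in step 7 can be sketched as follows. This is a simplified decision function — the real logic spans AdjustIsr handling and the replica state machine, and the string-encoded actions are purely illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of step 7: once the HotStandbyReplica appears in the ISS, the
// Coordinator promotes it to leader and stops the old leader if it is not
// part of the target assignment.
public class LeaderSwitch {
    static List<String> onAdjustIsr(int oldLeader, int hotStandbyReplica,
                                    List<Integer> iss, List<Integer> targetAssignment) {
        List<String> actions = new ArrayList<>();
        if (iss.contains(hotStandbyReplica)) {
            // Promote the in-sync standby to leader...
            actions.add("NotifyLeaderAndIsr(leader=" + hotStandbyReplica + ")");
            // ...and take the old leader offline if the target assignment drops it.
            if (!targetAssignment.contains(oldLeader)) {
                actions.add("StopReplica(replica=" + oldLeader + ")");
            }
        }
        return actions;
    }
}
```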

Compatibility, Deprecation, and Migration Plan

The proposed change is backward compatible.

What's Not Included in This FIP?

  1. This FIP only introduces the concept of hotStandby for rebalance of PrimaryKey Tables, and does not involve the design of a general-purpose KV backup solution.

Future Work

  1. Introducing a generic KV hot standby backup solution that maintains compatibility with existing implementations.

Test Plan

Unit tests and IT tests will be added to validate the changes. Cross-testing on real clusters and publishing the test report will be completed before the release of Fluss-0.8.

Rejected Alternatives

N/A
