...

In the current code, t0 and t1 serve queries from the Active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> of the form <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the standby on B can serve the data once serving from replicas is enabled. This still does not solve cases t3 and t4: B has been promoted to active but sits in the Restoring state while it catches up to A's last committed position (and we don't serve from an active in the Restoring state), and the new replica on R is building its state from scratch. Both cases can be solved if we also start serving from the Restoring state of the active, since its state is almost equivalent to that of the previous active.

There could, however, be a case where all standbys of a partition become unavailable while the active and all replicas of that partition are building themselves up from scratch; here the state on the active is far behind even though it is in the Restoring state. To make sure we don't serve from this state, we can either add another state before Restoring, or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say, 10000 records) will we serve from the Restoring state of the active.
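
A minimal sketch of that permissible-range check, assuming a hypothetical helper that is handed the changelog's last committed offset and the task's current restored position (RestoringServingPolicy, PERMISSIBLE_LAG and both parameters are illustrative names, not existing Streams APIs):

    // Hypothetical check: serve IQ from a Restoring active only when its restored
    // position is close enough to the changelog's last committed offset.
    public final class RestoringServingPolicy {

        // Assumed permissible lag, matching the "say 10000" figure above.
        private static final long PERMISSIBLE_LAG = 10_000L;

        /**
         * @param committedEndOffset last committed offset of the changelog partition
         * @param currentPosition    offset up to which this task has restored so far
         * @return true if the Restoring active is fresh enough to serve queries
         */
        public static boolean canServeFromRestoring(final long committedEndOffset,
                                                    final long currentPosition) {
            final long lag = committedEndOffset - currentPosition;
            return lag >= 0 && lag <= PERMISSIBLE_LAG;
        }

        private RestoringServingPolicy() { }
    }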

The new plan is to extend the query-serving capability to include standbys as well, with minimal changes to the core Streams code. Initially, when a query comes in, the user can request the StreamsMetadata holding the active task (where the key resides) and then query that host for the key; the response comes back along with the record lag (0 in the case of Running tasks) and the time lag for the specific store being queried. If such a query fails due to rebalancing or node unavailability, the user can decide to request the StreamsMetadata holding standby tasks for the partition where the key resides. This simply returns a list of all available standbys in the system, and the user can issue the IQ query against any of those nodes, which again returns the response along with the record lag and time lag. Based on these, the user can decide whether to return the response to the client or to call another standby.
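
Put concretely, the user-facing flow might look roughly like the sketch below. It assumes the allMetadataForKey overload proposed later in this document is surfaced on KafkaStreams, while QueryResponse and queryHost(...) stand in for whatever RPC layer the application already uses for remote IQ; none of these names are existing Streams APIs.

    import java.util.List;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    // Hypothetical client-side fallback loop; QueryResponse and queryHost(...)
    // stand in for the application's own remote-IQ transport.
    class HighlyAvailableIqClient {

        private final KafkaStreams streams;
        private final long acceptableLag;  // max record lag the caller will tolerate

        HighlyAvailableIqClient(final KafkaStreams streams, final long acceptableLag) {
            this.streams = streams;
            this.acceptableLag = acceptableLag;
        }

        QueryResponse query(final String storeName, final String key) {
            // enableReplicaServing = true: active host first, then all standbys.
            // Assumes the proposed overload is exposed on KafkaStreams.
            final List<StreamsMetadata> candidates = streams.allMetadataForKey(
                true, storeName, key, Serdes.String().serializer());

            for (final StreamsMetadata candidate : candidates) {
                try {
                    final QueryResponse response = queryHost(candidate.hostInfo(), storeName, key);
                    if (response.recordLag() <= acceptableLag) {
                        return response;  // fresh enough; hand it back to the caller
                    }
                    // Too stale: fall through and try the next standby.
                } catch (final RuntimeException rebalancingOrDown) {
                    // Host is rebalancing or unreachable: try the next candidate.
                }
            }
            throw new IllegalStateException("No live instance could serve key " + key);
        }

        // Placeholder for the application's transport (e.g. HTTP) to a remote host.
        private QueryResponse queryHost(final HostInfo host, final String storeName, final String key) {
            throw new UnsupportedOperationException("application-specific RPC");
        }

        // Placeholder response carrying the value plus the lags added by this KIP.
        interface QueryResponse {
            String value();
            long recordLag();
            long timeLag();
        }
    }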

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost, so that each machine knows which hosts hold which standby partitions, in addition to the active partitions (which they already know today).

  • Changing the signature of setPartitionsByHostState(partitionsByHostState, standbyPartitionsByHost), and correspondingly of onChange() and rebuildMetadata(), to add a Set<TopicPartition> standbyTopicPartitions to StreamsMetadata. This adds the standby partitions to the metadata.

  • Adding StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer), whose flag lets users declare that they are okay with serving from standbys; the default is false. A helper, getStandbyMetadataListForKey(), returns a list of StreamsMetadata that contains the active first, followed by all the standbys available in the system for the partition. Inside allMetadataForKey we have access to allMetadata, which contains the active partitions as well as the replica partitions held in StreamsMetadataState. A sketch of these changes follows this list.
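
A minimal sketch of how these pieces could fit together inside StreamsMetadataState, with the lookup bodies stubbed out (the real class carries much more state, and the stubs are illustrative only):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    // Illustrative shape only; the real StreamsMetadataState lives in
    // org.apache.kafka.streams.processor.internals and carries far more state.
    class StreamsMetadataStateSketch {

        // Rebuilt from AssignmentInfo on every rebalance: standby partitions per
        // host, alongside the existing active partitionsByHost map.
        private Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
        private List<StreamsMetadata> allMetadata;  // each entry now carries standbyTopicPartitions

        public <K> List<StreamsMetadata> allMetadataForKey(final boolean enableReplicaServing,
                                                           final String storeName,
                                                           final K key,
                                                           final Serializer<K> keySerializer) {
            final List<StreamsMetadata> result = new ArrayList<>();
            result.add(activeMetadataForKey(storeName, key, keySerializer));  // active first
            if (enableReplicaServing) {
                result.addAll(getStandbyMetadataListForKey(storeName, key, keySerializer));
            }
            return result;
        }

        // Existing lookup of the host owning the active task for this key.
        private <K> StreamsMetadata activeMetadataForKey(final String storeName,
                                                         final K key,
                                                         final Serializer<K> keySerializer) {
            // ... partition the key and look up partitionsByHost ...
            throw new UnsupportedOperationException("sketch only");
        }

        // New helper: all hosts whose standbyTopicPartitions cover this key's partition.
        private <K> List<StreamsMetadata> getStandbyMetadataListForKey(final String storeName,
                                                                       final K key,
                                                                       final Serializer<K> keySerializer) {
            // ... partition the key and filter allMetadata by standby coverage ...
            throw new UnsupportedOperationException("sketch only");
        }
    }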

We need to distinguish between genuine state restoration (when a replica becomes active) and building up state from scratch, since we can serve data in the former case but not the latter. Let's discuss this on a case-by-case basis.

...

  • num.standby.replicas = 0: The system should be unavailable for reads after even a single failure, since the affected partition now has to recreate its whole state from scratch.

  • num.standby.replicas = 1: In case of a single failure, we can serve from the available replica (if it does not lag too much) until that replica is promoted to active, and from that point on we can serve from the new active (which will transition from Restoring to Running). A minimal configuration for this case is sketched below.
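
For reference, num.standby.replicas is an existing Streams config; a minimal setup for the second case might look like this (the application id and bootstrap servers are placeholder values):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StandbyConfigExample {
        public static Properties config() {
            final Properties props = new Properties();
            // Placeholder application id and bootstrap servers.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iq-demo-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // One standby per task so reads can fail over, per the second case above.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            return props;
        }
    }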

  • Updating the corresponding .java classes with the above changes.

With this KIP, the onus is on the user to decide how much lag they are willing to serve queries with. After this KIP, Streams can serve from a restoring active as well as from running standby tasks, and each response returns the lag along with the actual value, so the user can decide either to discard the response or to return it to the client.

...

Future Enhancements

Once KIP-441 is completed, the lag of each standby will be available in the assignment itself. This would enable us to include in the metadata only those standbys whose lag behind the active is below a threshold (say, 10000 records), and to return a sorted list with the active node first, followed by the standbys ordered by the freshness of their data, as in the sketch below.
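
A rough sketch of that filtering and ordering, assuming a per-host lag map of the kind KIP-441 would provide (the shape of lagByHost and the threshold are assumptions, not part of either KIP's final API):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    // Hypothetical post-KIP-441 ordering: drop standbys lagging beyond a threshold,
    // then sort the survivors by freshness. lagByHost is assumed to come from the
    // assignment once KIP-441 lands; it is not an existing API.
    class StandbyOrderingSketch {

        private static final long MAX_ACCEPTABLE_LAG = 10_000L;

        static List<StreamsMetadata> orderedCandidates(final StreamsMetadata active,
                                                       final List<StreamsMetadata> standbys,
                                                       final Map<HostInfo, Long> lagByHost) {
            final List<StreamsMetadata> ordered = standbys.stream()
                .filter(s -> lagByHost.getOrDefault(s.hostInfo(), Long.MAX_VALUE) <= MAX_ACCEPTABLE_LAG)
                .sorted(Comparator.comparingLong((StreamsMetadata s) -> lagByHost.get(s.hostInfo())))
                .collect(Collectors.toList());
            ordered.add(0, active);  // the active node always comes first
            return ordered;
        }
    }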

Compatibility, Deprecation, and Migration Plan

...