...

  • StreamsMetadata

Proposed Changes

In the current code, queries in cases t0 and t1 are served from the active (Running) partition. For case t2, we plan to return a List<StreamsMetadata> of the form <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the replica on B can serve the data once serving from replicas is enabled. This still does not solve cases t3 and t4: B has been promoted to active but is in Restoring state, catching up to A’s last committed position, and we do not serve from an active in Restoring state; meanwhile, the new replica on R is building itself from scratch. Both cases can be solved if we also serve from the Restoring state of the active, since its state is almost equivalent to that of the previous active.

There could be a case where all replicas of a partition become unavailable, so that the active and all replicas of that partition are building themselves from scratch. In this case the state in the active is far behind even though it is in Restoring state. To avoid serving from such a state, we can either add another state before Restoring or check the difference between the last committed offset and the current position. Only if that difference is within a permissible range (say 10000) will we serve from the Restoring state of the active.
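The offset-difference check described above could look roughly like the following. This is only a minimal sketch: the class name, method name, and parameters are hypothetical and not part of Kafka Streams, and the default of 10000 is simply the example figure mentioned above.

```java
public final class RestoringLagCheck {

    // Hypothetical default, taken from the "say 10000" figure in the text.
    static final long DEFAULT_MAX_LAG = 10_000L;

    /**
     * Returns true if a Restoring active is close enough to the last
     * committed offset to serve queries.
     *
     * All parameter names here are assumptions made for illustration.
     */
    static boolean canServeFromRestoring(long lastCommittedOffset,
                                         long currentPosition,
                                         long maxLag) {
        if (maxLag < 0) {
            throw new IllegalArgumentException("maxLag must be non-negative");
        }
        // Clamp at zero in case the position is already at or past the committed offset.
        long lag = Math.max(0L, lastCommittedOffset - currentPosition);
        return lag <= maxLag;
    }

    public static void main(String[] args) {
        // Lag of 5000: within the permissible range.
        System.out.println(canServeFromRestoring(50_000L, 45_000L, DEFAULT_MAX_LAG)); // prints true
        // Store rebuilding from scratch: lag of 50000, too far behind.
        System.out.println(canServeFromRestoring(50_000L, 0L, DEFAULT_MAX_LAG));      // prints false
    }
}
```

Whether the bound should be inclusive, and whether it should be configurable per store, is left open here.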

  • AssignmentInfo changes to include Map<HostInfo, Set<TopicPartition>> replicaPartitionsByHost, so that each machine knows which machine holds which replica partitions, in addition to the active partitions (which it already knows).

  • Changing the signature of setPartitionsByHostState(partitionsByHostState, replicaPartitionsByHost), and correspondingly onChange() and rebuildMetadata(), to add Set<TopicPartition> replicaTopicPartitions to StreamsMetadata. This will add the replica partitions to the metadata.

  • To serve queries from replicas, we will add a flag in StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer) so that users can indicate whether they are okay with serving from replicas; the default would be false. Then it is just a matter of getting the host via getHostWithPartitionForKey(). This function returns a list of StreamsMetadata which contains the active first and then all the replicas in the system for the partition. In allMetadataForKey we would have access to allMetadata, which contains activePartitions as well as ReplicaPartitions. If a user is okay with serving from replicas, the choice between a replica and the active can be made randomly to share the load across machines (let me know if you have better thoughts).
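The lookup described in the bullets above can be sketched with simplified stand-in types. This is only an illustration, not the Kafka Streams implementation: HostMetadata is a hypothetical stand-in for StreamsMetadata, hosts are plain strings, partitions are plain integers, and mapping a key to its partition via the serializer is skipped entirely.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;

public final class ReplicaMetadataSketch {

    /** Hypothetical, simplified stand-in for StreamsMetadata. */
    static final class HostMetadata {
        final String host;
        final Set<Integer> activePartitions;
        final Set<Integer> replicaPartitions;

        HostMetadata(String host, Set<Integer> activePartitions, Set<Integer> replicaPartitions) {
            this.host = host;
            this.activePartitions = activePartitions;
            this.replicaPartitions = replicaPartitions;
        }
    }

    /** Returns the active host first, followed by replica hosts if replica serving is enabled. */
    static List<HostMetadata> allMetadataForPartition(boolean enableReplicaServing,
                                                      int partition,
                                                      Collection<HostMetadata> allMetadata) {
        List<HostMetadata> result = new ArrayList<>();
        for (HostMetadata m : allMetadata) {
            if (m.activePartitions.contains(partition)) {
                result.add(m);
            }
        }
        if (enableReplicaServing) {
            for (HostMetadata m : allMetadata) {
                if (m.replicaPartitions.contains(partition)) {
                    result.add(m);
                }
            }
        }
        return result;
    }

    /** Picks one candidate uniformly at random to spread query load across hosts. */
    static HostMetadata pickHost(List<HostMetadata> candidates, Random random) {
        if (candidates.isEmpty()) {
            throw new NoSuchElementException("no host holds the partition");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<HostMetadata> all = List.of(
            new HostMetadata("A", Set.of(0), Set.of()),    // active for partition 0
            new HostMetadata("B", Set.of(1), Set.of(0)));  // replica for partition 0

        List<HostMetadata> candidates = allMetadataForPartition(true, 0, all);
        for (HostMetadata m : candidates) {
            System.out.println(m.host);
        }
        System.out.println("chosen: " + pickHost(candidates, new Random()).host);
    }
}
```

With enableReplicaServing set to false the list contains only the active host, which matches the proposed default. Random selection is only one option; a caller could instead always try the active first and fall back to replicas on failure.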

Open Questions

  • What if num.standby.replicas=0 or >1

...

This KIP affects StreamsMetadata and AssignmentInfo. Currently, StreamsMetadata does not follow any versioning, so we might have to upgrade the version for it. Also, since this KIP changes AssignmentInfo to add replicaPartitionsByHost, we would need to upgrade the AssignmentInfo version as well.

Rejected Alternatives

  • Serving data from Active machine during Restoration State. None so far.