...
Code Block |
---|
package org.apache.kafka.streams; public class StoreLagInfo { // Estimate of number messages the store is behind by from the tail of the changelog private long messageLagEstimateoffsetLagEstimate; // Estimate of number of ms the store is behind from the tail of the changelog. private long timeLagEstimateMs; // Store name private String storeName; // Changelog topic partition private TopicPartition topicPartition; ... // standard getters/setters will be added } |
...
Code Block |
---|
// Global map containing latest lag information across hosts. This is collected by the Streams application instances outside of Streams, just relying on the local lag APIs in each. // Exact communication mechanism is left to the application. Some options : a gossip protocol, maintain another single partition Kafka topic to implement lag broadcast final Map<HostInfo, List<StoreLagInfo>> globalLagInformation; // Key which needs to be routed final K key; // Store to be queried final String storeName; // Fetch the metadata related to the key KeyQueryMetadata queryMetadata = allMetadataForKey(store, key, serializer); // Acceptable lag for the query final long acceptableMessageLagacceptableOffsetLag = 10000; // Stream of active + standby metadata Stream<StreamsMetadata> allHostMetadata = Stream.concat( Stream.of(queryMetadata.activeMetadata()).stream(), queryMetadata.standbyMetadata().stream()) // filter out all the hosts with more than acceptable lag for the TopicPartition where the key resides using queryMetadata.topicPartition() List<StreamsMetdata> inSyncHostMetadata = hostMetadata.filter(metadata -> { StoreLagInfo lagForHostPartition = globalLagInforation.get(metadata.hostInfo()).stream() .filter(lagInfo -> lagInfo.storeName().equals(storeName) && lagInfo.topicPartition().equals(queryMetadata.topicPartition())) .findAny().get() return lagForHostPartition.messageLagEstimateoffsetLagEstimate() < acceptableMessageLagacceptableOffsetLag; }).collect(Collectors.toList()); // Proceed to query, as documented today query(store, key, inSyncHostMedata); |
...