Status
Current state: Under Discussion
Discussion thread: here
JIRA: TODO
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The RemoteClusterUtils API lets users perform some operations on their MirrorMaker environment. One method, translateOffsets(), lets users manually translate consumer group offsets. This is useful when MirrorCheckpointConnector is not configured to sync offsets automatically, or when dealing with a disaster and an unplanned failover. The method works by reading the entire checkpoints topic, which is populated by MirrorCheckpointConnector, so a call can be expensive and can take a long time depending on the size of the topic. It also takes a single consumer group Id as input, which makes it unsuitable when the offsets of several consumer groups have to be translated, because each call re-reads the full topic.
Since the full topic is read anyway, we should have a method to translate the committed offsets of multiple consumer groups at the same time.
Public Interfaces
A new translateOffsets() method in RemoteClusterUtils that takes a set of consumer group Ids:
/**
 * Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
 * renamed according to the configured {@link ReplicationPolicy}.
 * @param properties Map of properties to instantiate a {@link MirrorClient}
 * @param remoteClusterAlias The alias of the remote cluster
 * @param consumerGroupIds The set of consumer group Ids
 * @param timeout The maximum time to block when consuming from the checkpoints topic
 */
public static Map<String, Map<TopicPartition, OffsetAndMetadata>> translateOffsets(Map<String, Object> properties, String remoteClusterAlias, Set<String> consumerGroupIds, Duration timeout) {
}
The matching method in MirrorClient:
/**
 * Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
 * renamed according to the ReplicationPolicy.
 * @param consumerGroupIds The set of consumer group Ids
 * @param remoteClusterAlias The alias of the remote cluster
 * @param timeout The maximum time to block when consuming from the checkpoints topic
 */
public Map<String, Map<TopicPartition, OffsetAndMetadata>> remoteConsumerOffsets(Set<String> consumerGroupIds, String remoteClusterAlias, Duration timeout) {
}
For both methods, if a group has no offsets, the returned Map still contains an entry with the group Id as the key, and the value is an empty Map.
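To illustrate the shape of the returned Map, here is a minimal sketch of translating several groups in a single pass over the checkpoints. It is not the MirrorClient implementation: the class MultiGroupTranslationSketch, the Checkpoint stand-in, the plain String topic-partition keys and the Long offsets are all hypothetical simplifications, and the sketch assumes that a later checkpoint for the same group and partition replaces an earlier one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names belong to the Kafka API.
class MultiGroupTranslationSketch {

    // Simplified stand-in for a checkpoint record read from the checkpoints topic.
    static final class Checkpoint {
        final String group;
        final String topicPartition;
        final long downstreamOffset;

        Checkpoint(String group, String topicPartition, long downstreamOffset) {
            this.group = group;
            this.topicPartition = topicPartition;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Translates offsets for every requested group in one pass over the checkpoints.
    static Map<String, Map<String, Long>> translate(List<Checkpoint> checkpoints, Set<String> groupIds) {
        Map<String, Map<String, Long>> result = new HashMap<>();
        // Pre-populate so that groups without checkpoints still get an (empty) entry.
        for (String groupId : groupIds) {
            result.put(groupId, new HashMap<>());
        }
        for (Checkpoint checkpoint : checkpoints) {
            Map<String, Long> offsets = result.get(checkpoint.group);
            if (offsets != null) {
                // Assumption: the most recent checkpoint for a partition wins.
                offsets.put(checkpoint.topicPartition, checkpoint.downstreamOffset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Checkpoint> checkpoints = Arrays.asList(
            new Checkpoint("group-a", "orders-0", 10L),
            new Checkpoint("group-b", "orders-0", 7L),
            new Checkpoint("group-a", "orders-0", 42L));
        Set<String> groupIds = new HashSet<>(Arrays.asList("group-a", "group-c"));
        // group-a maps orders-0 to 42, group-c maps to an empty Map, group-b is not requested.
        System.out.println(translate(checkpoints, groupIds));
    }
}
```

In this sketch the cost of scanning the checkpoints is paid once regardless of how many group Ids are passed in, which is the point of accepting a Set.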
Proposed Changes
The existing MirrorClient.remoteConsumerOffsets() will invoke the new method with a Set containing just a single consumer group Id.
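The delegation could look roughly like the sketch below. It is only an illustration under simplifying assumptions: DelegationSketch is a hypothetical class, the remoteClusterAlias and timeout parameters are omitted, topic partitions are plain Strings, offsets are Longs, and the multi-group method returns canned data instead of reading the checkpoints topic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the single-group method delegating to the multi-group one.
class DelegationSketch {

    // Stand-in for the new multi-group method; returns canned data for illustration.
    static Map<String, Map<String, Long>> remoteConsumerOffsets(Set<String> consumerGroupIds) {
        Map<String, Map<String, Long>> result = new HashMap<>();
        for (String groupId : consumerGroupIds) {
            Map<String, Long> offsets = new HashMap<>();
            if (groupId.equals("group-a")) {
                offsets.put("orders-0", 42L);
            }
            // Every requested group gets an entry, even when it has no offsets.
            result.put(groupId, offsets);
        }
        return result;
    }

    // Stand-in for the existing single-group method: wrap the Id in a Set and unwrap the result.
    static Map<String, Long> remoteConsumerOffsets(String consumerGroupId) {
        return remoteConsumerOffsets(Collections.singleton(consumerGroupId)).get(consumerGroupId);
    }

    public static void main(String[] args) {
        System.out.println(remoteConsumerOffsets("group-a"));
        System.out.println(remoteConsumerOffsets("group-z"));
    }
}
```

Because the multi-group result always contains an entry for each requested group, the single-group wrapper in this sketch can return the looked-up value without a null check.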
Compatibility, Deprecation, and Migration Plan
- This KIP only adds new methods. There are no changes to existing methods or logic.
Test Plan
The new methods will be tested using unit tests. This will require some refactoring in MirrorClient to make the remoteConsumerOffsets() method testable (we currently don't have tests for that method).
Rejected Alternatives
- Deprecate the existing RemoteClusterUtils.translateOffsets() and MirrorClient.remoteConsumerOffsets() methods: Internally these will use the same logic as the new methods, so we can keep them.