Status
Current state: Accepted
Discussion thread: here
Vote: here
JIRA: KAFKA-17200
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Note: The original name of the KIP was changed from "Make the replication of internal topics configurable" to "Allow the replication of user internal topics" as it better describes the latest proposal and the goal in general.
Motivation
In the current MirrorMaker 2 (MM2) implementation, topics ending in ".internal" or "-internal" cannot be replicated because they are treated as Connect / MM2 internal topics. As a result, business topics that happen to end in ".internal" or "-internal" are excluded from replication as well. There are two causes:
(1) The ReplicationPolicy interface explicitly excludes topics that look internal from replication, based on the following rules:
/** Internal topics are never replicated. */
default boolean isInternalTopic(String topic) {
    boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
    boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
    return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
}
(2) The topic filter excludes internal topics from replication by default:
public static final String TOPICS_EXCLUDE_CONFIG_ALIAS = "topics.blacklist";
private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";
public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
While the exclude list of the topic filter is configurable, the ReplicationPolicy interface cannot be configured to allow replicating such topics. Currently, if a user already has business topics ending in, for example, ".internal" or "-internal", the only option is to implement a custom replication policy and override the isInternalTopic method. The goal of this proposal is to make this behaviour configurable.
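To make the problem concrete, the rule quoted in (1) can be reproduced in isolation. The following is a minimal standalone sketch, not the Kafka class itself. It assumes that the current isMM2InternalTopic simply checks for the ".internal" suffix, and the topic names "orders.internal" and "billing-internal" are hypothetical business topics.

```java
// Standalone reproduction of the current rule; not the Kafka class itself.
class LegacyInternalTopicRule {

    // Assumption: before this KIP, any topic ending in ".internal" counts as MM2-internal.
    static boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(".internal");
    }

    // Mirrors the isInternalTopic logic quoted in (1).
    static boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
    }

    public static void main(String[] args) {
        // The first two names are hypothetical business topics that get caught by the suffix rule.
        String[] topics = {"orders.internal", "billing-internal", "__consumer_offsets", "orders"};
        for (String topic : topics) {
            System.out.println(topic + " -> internal? " + isInternalTopic(topic));
        }
    }
}
```

Under these assumptions only the plain "orders" topic is eligible for replication; both business topics are rejected by the policy regardless of how the topic filter is configured.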
Public Interfaces
This KIP proposes to change the ReplicationPolicy interface, and make the filtering of internal topics more precise as described in the "Proposed Changes" section.
Because the default implementation in the interface changes, the implementing classes are affected as well:
- DefaultReplicationPolicy
- IdentityReplicationPolicy
The default exclude list of the DefaultTopicFilter will use the same, more specific pattern.
Proposed Changes
This KIP proposes to make the condition for internal topics more specific in the ReplicationPolicy:
public interface ReplicationPolicy {
    ...
    default boolean isMM2InternalTopic(String topic) {
        // With this change, we only consider a topic to be MM2-internal if it
        // 1. starts with "mm2" and ends with ".internal", or
        // 2. is a checkpoints topic (by default it ends with ".checkpoints.internal",
        //    but implementing classes can override this)
        return (topic.startsWith("mm2") && topic.endsWith(".internal")) || isCheckpointsTopic(topic);
    }

    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
The implementation of the isMM2InternalTopic method in the DefaultReplicationPolicy would also be modified to use the configurable internal suffix:
public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable {
    ...
    @Override
    public boolean isMM2InternalTopic(String topic) {
        return (topic.startsWith("mm2") && topic.endsWith(internalSuffix())) || isCheckpointsTopic(topic);
    }
}
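The effect of the narrower condition can be checked with a small standalone sketch. It is not the Kafka class: it assumes that the internal suffix is the configured separator followed by "internal", and that checkpoints topics end with the separator, "checkpoints", and that suffix. The cluster alias "backup" and the business topic names are hypothetical.

```java
// Standalone sketch of the proposed rule; method names mirror the KIP, the class is hypothetical.
class ProposedInternalTopicRule {
    private final String separator;

    ProposedInternalTopicRule(String separator) {
        this.separator = separator;
    }

    // Assumption: the internal suffix is the separator followed by "internal".
    String internalSuffix() {
        return separator + "internal";
    }

    // Assumption: checkpoints topics end with "<separator>checkpoints<internal suffix>".
    boolean isCheckpointsTopic(String topic) {
        return topic.endsWith(separator + "checkpoints" + internalSuffix());
    }

    boolean isMM2InternalTopic(String topic) {
        return (topic.startsWith("mm2") && topic.endsWith(internalSuffix())) || isCheckpointsTopic(topic);
    }

    boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }

    public static void main(String[] args) {
        ProposedInternalTopicRule policy = new ProposedInternalTopicRule(".");
        String[] topics = {
            "orders.internal",                  // hypothetical business topic
            "billing-internal",                 // hypothetical business topic
            "mm2-offset-syncs.backup.internal", // MM2 topic for a hypothetical "backup" alias
            "backup.checkpoints.internal",
            "__consumer_offsets"
        };
        for (String topic : topics) {
            System.out.println(topic + " -> internal? " + policy.isInternalTopic(topic));
        }
    }
}
```

With the default "." separator the two business topics are no longer classified as internal, while the MM2, checkpoints, and Kafka-internal topics still are. Constructing the sketch with another separator, for example "_", moves the suffix check to "_internal" accordingly.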
The exclude list of the DefaultTopicFilter would also require some modification to reflect the new pattern:
public static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*internal, .*\\.replica, __.*";
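The difference between the exclude patterns can be illustrated with plain java.util.regex. This is a rough sketch under two assumptions: that the pre-KIP default is ".*[\\-\\.]internal, .*\\.replica, __.*", and that the filter trims the comma-separated entries, joins them into a single alternation, and excludes a topic when its whole name matches. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper that compares the old and the proposed default exclude lists.
class ExcludeListSketch {
    // Assumption: the default exclude list before this KIP.
    static final String OLD_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
    // Default exclude list proposed by this KIP.
    static final String NEW_DEFAULT = "mm2.*internal, .*\\.replica, __.*";

    // Assumption: entries are trimmed and joined with "|" into one pattern.
    static Pattern compile(String list) {
        String joined = Arrays.stream(list.split(","))
                .map(String::trim)
                .collect(Collectors.joining("|"));
        return Pattern.compile(joined);
    }

    // A topic is excluded when its full name matches one of the entries.
    static boolean isExcluded(String list, String topic) {
        return compile(list).matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {"orders.internal", "mm2-offset-syncs.backup.internal", "orders.replica", "__consumer_offsets"};
        for (String topic : topics) {
            System.out.println(topic
                    + " old=" + isExcluded(OLD_DEFAULT, topic)
                    + " new=" + isExcluded(NEW_DEFAULT, topic));
        }
    }
}
```

In this sketch the hypothetical business topic "orders.internal" is excluded only by the old list, while the MM2, ".replica", and "__" topics are excluded by both.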
Compatibility, Deprecation, and Migration Plan
Backward Compatibility Considerations:
- Anyone who relies on the current behaviour to block the replication of existing user topics ending in ".internal" or "-internal" might need to update the TopicFilter configuration, because with this change these topics are no longer excluded by default.
- Anyone who uses a custom ReplicationPolicy implementation might need to update their source code to get the same behaviour.
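For the second case, a custom policy that depended on the inherited isInternalTopic could restore the previous suffix rule with an explicit override. The sketch below is only an illustration: PolicySketch is a hypothetical stand-in for the proposed ReplicationPolicy defaults so that the example compiles without Kafka on the classpath, and LegacySuffixPolicy is a hypothetical custom policy.

```java
// Hypothetical custom policy that keeps treating the legacy suffixes as internal.
class LegacySuffixPolicy implements PolicySketch {
    @Override
    public boolean isInternalTopic(String topic) {
        boolean hasLegacySuffix = topic.endsWith("-internal") || topic.endsWith(".internal");
        // Fall back to the (proposed) default rule for everything else.
        return hasLegacySuffix || PolicySketch.super.isInternalTopic(topic);
    }

    public static void main(String[] args) {
        PolicySketch proposedDefault = new PolicySketch() { };
        PolicySketch legacy = new LegacySuffixPolicy();
        String topic = "orders.internal"; // hypothetical business topic
        System.out.println("proposed default: " + proposedDefault.isInternalTopic(topic));
        System.out.println("legacy override:  " + legacy.isInternalTopic(topic));
    }
}

// Stand-in for the proposed ReplicationPolicy defaults (assumption, not the Kafka interface).
interface PolicySketch {
    default boolean isCheckpointsTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    default boolean isMM2InternalTopic(String topic) {
        return (topic.startsWith("mm2") && topic.endsWith(".internal")) || isCheckpointsTopic(topic);
    }

    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
```

A real implementation would extend or implement the actual Kafka classes instead of the stand-in; the override itself would look the same.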
Test Plan
Besides unit tests, this change can be tested on two Kafka clusters by setting up replication between them.
Rejected Alternatives
Already existing "workarounds":
1. Use non-conflicting names for user topics
This could only be a feasible option for newly created topics. In some cases it might cause too much overhead as the business / internal applications rely on this topic naming.
2. Use the replication.policy.separator to use a non-conflicting separator character
This is only an option for new setups, as already existing setups without setting this config would use the default .internal convention. The default implementation of the ReplicationPolicy would still filter out topics ending in '.internal' or '-internal' as it is hardcoded.
3. Use a custom ReplicationPolicy that overrides this behavior
This would be a feasible option for existing workloads too, but in my opinion this requires too much work for something that could possibly be controlled by a configuration property.
Solution proposals:
1. Add a new configuration property "internal.topic.replication.capability.enabled" that could control if the ReplicationPolicy should filter internal topics or not
In this solution, we would override the isInternalTopic method of the DefaultReplicationPolicy based on the mentioned new configuration property. If set to true, the policy would not filter internal topics, and the isInternalTopic method would return false, leaving it to the TopicFilter to filter out any topics that should not be replicated. If set to false, the original behavior is preserved.
Risks:
- May allow replication cycles
- Higher chance of Connect / MM2 internal topics being replicated because of a misconfiguration.
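A rough sketch of what this rejected option would have looked like shows where the risk comes from. The class and field names are hypothetical, and the "original behaviour" branch is simplified to the prefix and suffix checks.

```java
// Sketch of the rejected flag-based option; class and field names are hypothetical.
class FlagBasedPolicySketch {
    private final boolean internalTopicReplicationEnabled;

    FlagBasedPolicySketch(boolean internalTopicReplicationEnabled) {
        this.internalTopicReplicationEnabled = internalTopicReplicationEnabled;
    }

    boolean isInternalTopic(String topic) {
        if (internalTopicReplicationEnabled) {
            // The policy stops filtering entirely; only the TopicFilter is left
            // to keep MM2's own topics out of the replication flow.
            return false;
        }
        // Original behaviour, simplified: Kafka-internal prefixes plus the legacy suffixes.
        return topic.startsWith("__") || topic.startsWith(".")
                || topic.endsWith("-internal") || topic.endsWith(".internal");
    }

    public static void main(String[] args) {
        String mm2Topic = "mm2-offset-syncs.backup.internal"; // "backup" is a hypothetical alias
        System.out.println("flag off: " + new FlagBasedPolicySketch(false).isInternalTopic(mm2Topic));
        System.out.println("flag on:  " + new FlagBasedPolicySketch(true).isInternalTopic(mm2Topic));
    }
}
```

With the flag enabled, the policy no longer protects any topic, including MM2's own, so a too-permissive exclude list is enough to replicate them.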
2. Make the internal topics configurable with a regex.
In this solution, the ReplicationPolicy would consider a topic to be internal, if it matches a specific regex. The default regex would result in the same behavior as before, something like "__.*|\\..*|.*-internal|.*\\.internal". We would make this regex configurable by the user using a new configuration property called replication.policy.internal.topics. When users specify a value for this property, it overwrites the default behavior.
Risks:
- Easier to make mistakes resulting in inconsistent behavior
- It becomes possible for users to configure the DefaultReplicationPolicy class in a way that for some topic T, isInternalTopic(T) returns false, but isCheckpointsTopic(T), isHeartbeatsTopic(T), or isMM2InternalTopic(T) return true.
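The inconsistency described in the last risk can be demonstrated with a small sketch. It assumes a hypothetical policy class whose isInternalTopic is driven purely by the configured regex, while isCheckpointsTopic keeps its default suffix check; the user-supplied regex "__.*" is an example of an incomplete configuration.

```java
import java.util.regex.Pattern;

// Sketch of the rejected regex-based option; the class is hypothetical.
class RegexBasedPolicySketch {
    // Default regex quoted in the proposal text.
    static final String DEFAULT_INTERNAL_TOPICS = "__.*|\\..*|.*-internal|.*\\.internal";

    private final Pattern internalTopics;

    RegexBasedPolicySketch(String internalTopicsRegex) {
        this.internalTopics = Pattern.compile(internalTopicsRegex);
    }

    // A topic is internal only if its full name matches the configured regex.
    boolean isInternalTopic(String topic) {
        return internalTopics.matcher(topic).matches();
    }

    // Assumption: the checkpoints check stays independent of the regex.
    boolean isCheckpointsTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    public static void main(String[] args) {
        String topic = "backup.checkpoints.internal"; // "backup" is a hypothetical cluster alias
        RegexBasedPolicySketch misconfigured = new RegexBasedPolicySketch("__.*");
        System.out.println("isInternalTopic:    " + misconfigured.isInternalTopic(topic));
        System.out.println("isCheckpointsTopic: " + misconfigured.isCheckpointsTopic(topic));
    }
}
```

With the default regex both methods agree for this topic; with the narrower user regex the same topic is a checkpoints topic but no longer an internal one.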