Status

Current state: Under Discussion

Discussion thread: here

Vote: here

JIRA: KAFKA-17200 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note: The KIP was renamed from "Make the replication of internal topics configurable" to "Allow the replication of user internal topics", as the new name better describes the latest proposal and the overall goal.

Motivation

In the current MirrorMaker 2 (MM2) implementation, topics ending in ".internal" or "-internal" cannot be replicated because they are treated as Connect / MM2 internal topics. As a result, users who have business topics ending in ".internal" or "-internal" find them excluded from replication as well. There are two reasons for this:

(1) The ReplicationPolicy interface explicitly excludes from replication any topic that looks internal, based on the following rules:

ReplicationPolicy#isInternalTopic
    /** Internal topics are never replicated. */
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
    }


(2) The topic filter excludes internal topics from replication by default:

DefaultTopicFilter
    public static final String TOPICS_EXCLUDE_CONFIG_ALIAS = "topics.blacklist";
    private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";     


While the exclude list of the topic filter is configurable, the ReplicationPolicy interface cannot be configured to allow replicating such topics. Currently, if a user already has business topics ending in, for example, ".internal" or "-internal", the only option is to implement a custom replication policy and override the isInternalTopic method. The goal of this proposal is to make this behaviour configurable.

Public Interfaces

This KIP proposes to change the ReplicationPolicy interface and make the filtering of internal topics more precise, as described in the "Proposed Changes" section.

Because the default methods of the interface change, the implementing classes are affected as well:

  • DefaultReplicationPolicy
  • IdentityReplicationPolicy

The default exclude list of the DefaultTopicFilter will use the same, more specific pattern.

Proposed Changes

This KIP proposes to make the condition for internal topics more specific in the ReplicationPolicy:

ReplicationPolicy.java
public interface ReplicationPolicy {
    ...

    default boolean isMM2InternalTopic(String topic) {
        // With this change, a topic is considered MM2 internal only if it
        // 1. starts with "mm2" and ends with ".internal", or
        // 2. is a checkpoints topic (by default it ends with ".checkpoints.internal", but implementing classes can override this)
        return (topic.startsWith("mm2") && topic.endsWith(".internal")) || isCheckpointsTopic(topic);
    }

    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }

}

In addition, the implementation of the isMM2InternalTopic method in the DefaultReplicationPolicy would be modified to use the configurable internal suffix:

DefaultReplicationPolicy
public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable { 
    ...      

    @Override
    public boolean isMM2InternalTopic(String topic) {
        return (topic.startsWith("mm2") && topic.endsWith(internalSuffix())) || isCheckpointsTopic(topic);
    }    
}
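The effect of the narrower condition can be illustrated with a small, self-contained sketch. SketchReplicationPolicy below is a hypothetical stand-in, not the real org.apache.kafka.connect.mirror.ReplicationPolicy. It assumes the default checkpoints suffix ".checkpoints.internal" mentioned in the comment above, and it approximates the current rule by the prefix and suffix checks quoted in the Motivation section. The configurable internalSuffix() of the DefaultReplicationPolicy is not modelled.

```java
import java.util.List;

// Hypothetical stand-in for the proposed interface defaults (not the Kafka class).
interface SketchReplicationPolicy {

    // Assumption: the default checkpoints naming convention described in this KIP.
    default boolean isCheckpointsTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    // Proposed rule: "mm2" prefix plus ".internal" suffix, or a checkpoints topic.
    default boolean isMM2InternalTopic(String topic) {
        return (topic.startsWith("mm2") && topic.endsWith(".internal")) || isCheckpointsTopic(topic);
    }

    // Proposed rule: the generic "-internal" / ".internal" suffix check is gone.
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }

    // Approximation of the current rule, for comparison only. The MM2-specific
    // term is left out on the assumption that the suffix checks already cover it.
    static boolean legacyIsInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
        return isKafkaInternalTopic || isDefaultConnectTopic;
    }
}

final class InternalTopicSketch {
    public static void main(String[] args) {
        SketchReplicationPolicy policy = new SketchReplicationPolicy() { };
        List<String> topics = List.of(
                "mm2-offset-syncs.source.internal", // example MM2 topic name
                "source.checkpoints.internal",      // checkpoints topic
                "__consumer_offsets",               // Kafka internal topic
                "orders.internal",                  // user topic
                "billing-internal");                // user topic
        for (String topic : topics) {
            System.out.printf("%-34s legacy=%b proposed=%b%n",
                    topic,
                    SketchReplicationPolicy.legacyIsInternalTopic(topic),
                    policy.isInternalTopic(topic));
        }
    }
}
```

Under these assumptions, only the two user topics change classification: they are internal under the current rule and not internal under the proposed one.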

The exclude list of the DefaultTopicFilter would also require some modification to reflect the new pattern:

DefaultTopicFilter
public static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*internal, .*\\.replica, __.*";
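As a rough illustration of what the new default would match, the sketch below evaluates the proposed exclude list against a few topic names. It assumes that the comma-separated entries are trimmed, joined with "|" into a single regular expression, and matched against the whole topic name. The class ExcludeListSketch and its helper methods are hypothetical and are not part of the DefaultTopicFilter.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper that mimics an exclude-list check; not the Kafka class.
final class ExcludeListSketch {

    // Proposed default from this KIP.
    static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*internal, .*\\.replica, __.*";

    // Assumption: entries are trimmed and OR-ed together into one pattern.
    static Pattern toPattern(String commaSeparatedList) {
        String regex = Arrays.stream(commaSeparatedList.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.joining("|"));
        return Pattern.compile(regex);
    }

    // A topic is excluded when the whole name matches one of the entries.
    static boolean isExcluded(String topic) {
        return toPattern(TOPICS_EXCLUDE_DEFAULT).matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {
            "mm2-offset-syncs.source.internal",
            "__consumer_offsets",
            "orders.replica",
            "orders.internal",
            "billing-internal"
        };
        for (String topic : topics) {
            System.out.printf("%-34s excluded=%b%n", topic, isExcluded(topic));
        }
    }
}
```

With this reading of the list, user topics such as "orders.internal" and "billing-internal" are no longer excluded by the default pattern, while names starting with "mm2" and ending in "internal" still are.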

Compatibility, Deprecation, and Migration Plan

Backward Compatibility Considerations:

  1. Anyone who relies on the current behaviour to block the replication of existing user topics ending in ".internal" or "-internal" might need to update the TopicFilter configuration, because with this change these topics will no longer be explicitly excluded.
  2. Anyone who uses a custom ReplicationPolicy implementation might need to update their source code to keep the same behaviour.

Test Plan

Besides unit tests, this change can be tested on two Kafka clusters by setting up replication between them.

Rejected Alternatives

Existing "workarounds":

1. Use non-conflicting names for user topics

    This could only be a feasible option for newly created topics. In some cases it might cause too much overhead as the business / internal applications rely on this topic naming.

2. Use the replication.policy.separator to use a non-conflicting separator character

    This is only an option for new setups, as existing setups that do not set this config use the default ".internal" convention. The default implementation of the ReplicationPolicy would still filter out topics ending in ".internal" or "-internal", as this is hardcoded.

3. Use a custom ReplicationPolicy that overrides this behavior

    This would be a feasible option for existing workloads too, but in my opinion this requires too much work for something that could possibly be controlled by a configuration property.

Solution proposals:

1. Add a new configuration property "internal.topic.replication.capability.enabled" that controls whether the ReplicationPolicy filters internal topics

   In this solution, we would override the isInternalTopic method of the DefaultReplicationPolicy based on the mentioned new configuration property. If set to true, the policy would not filter internal topics, and the isInternalTopic method would return false, leaving it to the TopicFilter to filter out any topics that should not be replicated. If set to false, the original behavior is preserved.

Risks:

  • May allow replication cycles
  • Higher chance of Connect / MM2 internal topics being replicated because of a wrong configuration

2. Make the internal topics configurable with a regex.

    In this solution, the ReplicationPolicy would consider a topic to be internal, if it matches a specific regex. The default regex would result in the same behavior as before, something like "__.*|\\..*|.*-internal|.*\\.internal". We would make this regex configurable by the user using a new configuration property called replication.policy.internal.topics. When users specify a value for this property, it overwrites the default behavior.

Risks:

  • Easier to make mistakes resulting in inconsistent behavior
  • It becomes possible for users to configure the DefaultReplicationPolicy class in a way that for some topic T, isInternalTopic(T) returns false, but isCheckpointsTopic(T), isHeartbeatsTopic(T), or isMM2InternalTopic(T) return true.
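The second risk can be made concrete with a short sketch. RegexInternalTopicSketch is a hypothetical class, not a proposed API. It assumes that the regex must match the whole topic name and that checkpoints topics are recognised by the default ".checkpoints.internal" suffix.

```java
import java.util.regex.Pattern;

// Hypothetical model of the rejected regex-based alternative.
final class RegexInternalTopicSketch {

    // Default regex suggested in the text above.
    static final String DEFAULT_INTERNAL_TOPICS_REGEX = "__.*|\\..*|.*-internal|.*\\.internal";

    private final Pattern internalTopics;

    RegexInternalTopicSketch(String regex) {
        this.internalTopics = Pattern.compile(regex);
    }

    // Internal if the whole topic name matches the configured regex.
    boolean isInternalTopic(String topic) {
        return internalTopics.matcher(topic).matches();
    }

    // Assumption: default checkpoints naming, independent of the regex.
    boolean isCheckpointsTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    public static void main(String[] args) {
        String checkpoints = "source.checkpoints.internal";

        RegexInternalTopicSketch defaults = new RegexInternalTopicSketch(DEFAULT_INTERNAL_TOPICS_REGEX);
        System.out.println("default:  internal=" + defaults.isInternalTopic(checkpoints)
                + " checkpoints=" + defaults.isCheckpointsTopic(checkpoints));

        // A user override that only keeps the "__" prefix rule.
        RegexInternalTopicSketch overridden = new RegexInternalTopicSketch("__.*");
        System.out.println("override: internal=" + overridden.isInternalTopic(checkpoints)
                + " checkpoints=" + overridden.isCheckpointsTopic(checkpoints));
    }
}
```

With the override, the same topic is a checkpoints topic but is no longer reported as internal, which is the inconsistency described in the risk above.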


   


