Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

The Kafka configuration can be applied at the topic, broker, or cluster level. If a config value is defined at multiple levels, Kafka uses the following order of precedence:

  • Dynamic topic config stored in the metadata log
  • Dynamic per-broker config stored in the metadata log
  • Dynamic cluster-wide default config stored in the metadata log
  • Static broker config from server.properties
  • Kafka default
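The precedence order above can be sketched as a layered lookup in which the first layer that defines a key wins. This is only an illustration of the resolution rule; the class and method names below are hypothetical, not Kafka code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of Kafka's config precedence: layers are ordered
// from highest priority (dynamic topic) to lowest (Kafka default), and
// the first layer that defines the key wins.
public class ConfigPrecedence {
    public static Optional<String> resolve(String key, Map<String, Map<String, String>> layers) {
        for (Map<String, String> layer : layers.values()) {
            if (layer.containsKey(key)) {
                return Optional.of(layer.get(key));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> layers = new LinkedHashMap<>();
        layers.put("dynamic-topic", Map.of());
        layers.put("dynamic-broker", Map.of("min.insync.replicas", "3"));
        layers.put("dynamic-cluster", Map.of("min.insync.replicas", "2"));
        layers.put("static-broker", Map.of("min.insync.replicas", "1"));
        layers.put("kafka-default", Map.of("min.insync.replicas", "1"));
        // The dynamic per-broker value shadows the cluster-wide default,
        // which is exactly the inconsistency this KIP wants to rule out.
        System.out.println(resolve("min.insync.replicas", layers).get()); // prints 3
    }
}
```

Note how a per-broker dynamic value silently overrides the cluster-wide default, which motivates the restrictions proposed below.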

This proposal is inspired by the way KIP-966 handles the min.insync.replicas config. Based on the configuration precedence described above, when ELR is enabled, KIP-966:

  1. disallows setting min.insync.replicas at the broker level
  2. automatically adds min.insync.replicas at the cluster level, if not present
  3. disallows removing min.insync.replicas at the cluster level

The reason for this is that if brokers disagree about which partitions are under min ISR, it breaks the KIP-966 replication invariants.

However, even when ELR is not enabled, having different min.insync.replicas values on different brokers is undesirable: if a partition leader moves to another broker, the partition's min.insync.replicas semantics change with it. It is therefore better to always enforce the rules above, regardless of whether ELR is enabled.

Beyond min.insync.replicas, there are other configurations for which the primary concern is consistency: topics should operate the same way across the cluster, rather than behaving differently depending on which broker hosts the leader.

This KIP enforces that the proposed configurations always have cluster-level settings, disallows their removal at the cluster level, and disallows setting them at the broker level, ensuring consistent behavior across the cluster.

Public Interfaces

Proposed Configurations to Enforce

  1. min.insync.replicas
  2. unclean.leader.election.enable
  3. message.max.bytes
  4. log.message.timestamp.type
  5. log.cleanup.policy
  6. compression.gzip.level
  7. compression.lz4.level
  8. compression.type
  9. compression.zstd.level
  10. log.cleaner.delete.retention.ms
  11. log.cleaner.max.compaction.lag.ms
  12. log.message.timestamp.after.max.ms
  13. log.message.timestamp.before.max.ms
  14. log.cleaner.min.compaction.lag.ms
  15. remote.log.copy.disable
  16. remote.log.delete.on.disable
  17. remote.storage.enable

Cluster's first startup

  • On the cluster's first startup, initialize proposed configurations at the cluster level with their static default values.

incrementalAlterConfigs and the deprecated AlterConfigs APIs

  • Disallow setting these configurations at the broker level via the incrementalAlterConfigs API or the deprecated AlterConfigs API; such requests are rejected with an INVALID_CONFIG error and a corresponding error message.
  • Disallow removing these configurations at the cluster level via the same APIs; such requests are rejected in the same way.
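The two rules above could be enforced with a check along these lines. This is a minimal sketch only: the class, method, and the subset of guarded config names are illustrative, and the real validation would live in the controller's config-handling path:

```java
import java.util.Set;

// Illustrative validation for incrementalAlterConfigs / alterConfigs.
// GUARDED stands in for the full set of proposed guarded config names.
public class GuardedConfigValidator {
    static final Set<String> GUARDED = Set.of(
        "min.insync.replicas",
        "unclean.leader.election.enable",
        "message.max.bytes");

    /**
     * Returns an INVALID_CONFIG error message for a disallowed alteration,
     * or null if the alteration is allowed.
     *
     * @param isBrokerLevel true if the target is a per-broker resource,
     *                      false if it is the cluster-wide default
     * @param isDelete      true if the operation removes the config
     */
    public static String validate(String configName, boolean isBrokerLevel, boolean isDelete) {
        if (!GUARDED.contains(configName)) {
            return null;
        }
        if (isBrokerLevel) {
            return "INVALID_CONFIG: " + configName + " may only be set at the cluster level";
        }
        if (isDelete) {
            return "INVALID_CONFIG: " + configName + " may not be removed at the cluster level";
        }
        return null;
    }
}
```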

Upgrade

  • When upgrading to a MetadataVersion that includes this KIP, any of the proposed configurations set at the broker level will be removed. At the cluster level, any that are not set will be initialized to their static default values.

Proposed Changes

Cluster's first startup

On the cluster's first startup, initialize the proposed configurations at the cluster level. Since dynamic configs have higher priority, any corresponding static broker configs will not take effect.

incrementalAlterConfigs and the deprecated AlterConfigs APIs

Disallow setting the proposed configurations at the broker level and removal of proposed configurations at the cluster level. The corresponding errors will be thrown if such requests are received.

Upgrade

When using the updateFeatures API to upgrade to a MetadataVersion that includes this KIP change:

  • Any proposed broker-level configurations will be removed.
  • If the cluster-level configurations are not set, default values will be applied.
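The two migration steps could be sketched as follows. This is a simplified illustration under the assumption that broker- and cluster-level configs are plain name/value maps; the actual logic would operate on config records in the controller, and the names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative upgrade migration: strip guarded configs from the
// broker-level map and backfill missing cluster-level entries with
// their static default values.
public class GuardedConfigUpgrade {
    // Stand-in for the guarded configs and their static defaults.
    static final Map<String, String> STATIC_DEFAULTS = Map.of(
        "min.insync.replicas", "1",
        "unclean.leader.election.enable", "false");

    public static void migrate(Map<String, String> brokerConfigs,
                               Map<String, String> clusterConfigs) {
        // 1. Remove any guarded configs set at the broker level.
        brokerConfigs.keySet().removeAll(STATIC_DEFAULTS.keySet());
        // 2. Initialize missing cluster-level entries to their defaults.
        STATIC_DEFAULTS.forEach(clusterConfigs::putIfAbsent);
    }
}
```

Note that cluster-level values that are already set are left untouched; only absent entries are backfilled.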

New enum structure

Introduce a new enum, GuardedBrokerConfig, that includes the proposed configurations and helper methods, to keep the code concise and extensible.

GuardedBrokerConfig.java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.ConfigDef;

// The config name constants (e.g. MIN_IN_SYNC_REPLICAS_CONFIG) are assumed to be
// statically imported from the existing server/topic config classes.
public enum GuardedBrokerConfig {
    MIN_IN_SYNC_REPLICAS(MIN_IN_SYNC_REPLICAS_CONFIG, ConfigDef.Type.INT),
    UNCLEAN_LEADER_ELECTION_ENABLE(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN),
    MESSAGE_MAX_BYTES(MESSAGE_MAX_BYTES_CONFIG, ConfigDef.Type.INT),
    LOG_MESSAGE_TIMESTAMP_TYPE(LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, ConfigDef.Type.STRING),
    LOG_CLEANUP_POLICY(LOG_CLEANUP_POLICY_CONFIG, ConfigDef.Type.STRING);
    // ... remaining configs elided for brevity ...

    private final String configName;
    private final ConfigDef.Type type;

    private static final Map<String, GuardedBrokerConfig> NAME_TO_CONFIG = new HashMap<>();

    static {
        for (GuardedBrokerConfig config : GuardedBrokerConfig.values()) {
            NAME_TO_CONFIG.put(config.configName, config);
        }
    }

    GuardedBrokerConfig(String name, ConfigDef.Type type) {
        this.configName = name;
        this.type = type;
    }

    // Looks up a guarded config by its name, e.g. "min.insync.replicas".
    public static Optional<GuardedBrokerConfig> fromName(String name) {
        return Optional.ofNullable(NAME_TO_CONFIG.get(name));
    }

    // ... remaining helper methods ...
}


Compatibility, Deprecation, and Migration Plan

  • For the incrementalAlterConfigs and AlterConfigs APIs, any disallowed request will be rejected immediately with an error. This may break existing user applications.

  • On upgrade, any broker-level settings for the proposed configurations will be removed. This may break users who set these configs at the broker level.

Test Plan

The typical suite of unit/integration tests will be added.

Rejected Alternatives

1. Disallow alter API requests with warnings and ignore setting

Instead of immediately rejecting disallowed requests with an error, only log a warning and ignore the setting. This would avoid breaking user applications until the next major release.

Reason for Rejection: Users may easily overlook warnings and not realize their configurations have not taken effect, which could lead to even more severe operational issues later.

2. Each proposed config with its own implementation logic

Instead of introducing a new enum structure, handle each configuration individually with its own logic (similar to how min.insync.replicas is currently implemented; see https://github.com/apache/kafka/pull/17952).

Reason for Rejection: This approach is harder to extend and more error-prone. Having a centralized structure provides a cleaner and more extensible solution.
