You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-7236

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The "min.insync.replicas" configuration specifies the minimum number of insync replicas required for a partition to accept messages from the producer. If the insync replica count of a partition falls under the specified "min.insync.replicas", then the broker will reject messages for producers using acks=all. These producers will suffer unavailability as they will see a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception.

We currently have an UnderMinIsrPartitionCount metric which is useful for identifying when partitions fall under "min.insync.replicas", however it is still difficult to identify which topic partitions are affected and need fixing.

We can leverage the describe topics command in TopicCommand to add an option "--under-minisr-partitions" to list out exactly which topic partitions are below "min.insync.replicas".

Public Interfaces

This change would add an additional flag "--under-minisr-partitions" to TopicCommand, but the output will follow the same format as the "under-replicated-partitions" and "offline-partitions" options.

Proposed Changes

The challenge with supporting this additional feature is that the "min.insync.replicas" configuration may be set at a broker or topic level.

We can use the `AdminClient.describeConfigs` on the topics as that the API call will give us the "computed" proper values for configurations (ConfigSource as "DYNAMIC_TOPIC_CONFIG", "DYNAMIC_BROKER_CONFIG", "DYNAMIC_DEFAULT_BROKER_CONFIG", "STATIC_BROKER_CONFIG", and "DEFAULT_CONFIG").

We can pre-fetch the "computed" topic configurations if "--under-minisr-partitions" option is specified to avoid making a separate AdminClient call per topic.

# Assuming we have an AdminClient instance
val adminClient = ...

// Pre-fetch and get "computed" topic configs for all specified topics
val computedTopicConfigs = if (reportUnderMinISRPartitions)
  Option(adminClient.describeConfigs(
    topics.map(topic => new ConfigResource(ConfigResource.Type.TOPIC, topic)).asJavaCollection).all().get()) else None

for (topic <- topics)
  ...
  if (describePartitions) {
    // Get "computed" topic "min.insync.replicas" for this topic
    val computedTopicMinISR = if (reportUnderMinISRPartitions)
      Option(computedTopicConfigs.get.get(new ConfigResource(ConfigResource.Type.TOPIC, topic))
      .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt) else None

    for ((partitionId, assignedReplicas) <- sortedPartitions) {
      ...

      // Print current topic partition if reportUnderMinISRPartitions and ISR count < "computed" min ISR
      if (... ||
        (reportUnderMinISRPartitions && inSyncReplicas.size < computedTopicMinISR.get) {
      ...


This means we need an additional flag "--bootstrap-server" to use AdminClient. KIP-377: TopicCommand to use AdminClient is already proposing a change to use AdminClient and introduce a "--bootstrap-server" option, so we can wait on KIP-377 to be completed first before merging this KIP's code.

Compatibility, Deprecation, and Migration Plan

As this change adds a new option instead of modifying existing ones, there will not be any compatibility issues or a migration.

Rejected Alternatives

None so far.

  • No labels