KIP-409: Allow creating under-replicated topics and partitions

co-authored-by: Mickael Maison <mickael.maison@gmail.com>

co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>

Status

Current state: Draft

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when handling topic or partition creation requests, Kafka requires every replica to be created before the request can succeed. While other functionality (produce/consume) is fault tolerant and can cope with some brokers being down, topic and partition creation stops working as soon as there are not enough brokers available to host the replicas. In small clusters, when one node is unavailable, for example while a broker is being restarted, there may not be enough live brokers to satisfy a topic or partition creation request.

This is obvious in a few scenarios:

  • In a 3 node cluster, while a rolling restart is happening, users can't create topics with replication factor 3. 

  • In a 4 node cluster, while a node is down, a rolling restart also prevents topics with replication factor 3 from being created.

  • In a 9 node cluster with 3 nodes in each zone, if 1 zone were to go offline, the cluster would still contain enough nodes (6) to host a topic with replication factor 3. However, in some environments it may still be preferable to assign only 2 replicas to currently alive nodes (in 2 zones) and assign the last replica to a broker in the unavailable zone that is expected to come back online later.

The same scenarios also apply to adding partitions to existing topics (CreatePartitions API).

Proposed Changes

We would like to allow users to create topics even when the available number of brokers is less than the number of requested replicas, as long as the expected size of the cluster under normal conditions is large enough. As soon as brokers re-join the cluster, the missing replicas will be created.

To allow that, we propose to keep track of brokers that are part of the cluster in ZooKeeper even after they disconnect ("observed brokers"), and to use that information to determine an assignment (or to determine whether a given assignment is feasible) when handling a topic creation or partition creation request.

Even when the number of available brokers is sufficient and no assignment is specified, creating an under-replicated topic/partition may be preferred. For example, take replication factor 3 in a 3-zone region with 2 brokers per zone, where one zone is down.
In such a case the create request could make use of all observed brokers rather than only the currently online ones.
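
The choice between live and observed brokers can be illustrated with a minimal sketch. The class `ObservedBrokerAssignment`, its method, and the `preferObserved` flag are hypothetical names introduced here for illustration. The selection simply takes brokers in id order, whereas a real assignment would also account for racks and partition spread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper; names are illustrative and not part of Kafka. */
final class ObservedBrokerAssignment {

    /**
     * Picks replicationFactor broker ids for a single partition.
     * With preferObserved=false, live brokers come first and offline observed
     * brokers only fill the gap. With preferObserved=true, liveness is ignored
     * and all known brokers are treated alike.
     * Returns an empty list when fewer than replicationFactor brokers are known.
     */
    static List<Integer> chooseReplicas(Set<Integer> live,
                                        Set<Integer> observed,
                                        int replicationFactor,
                                        boolean preferObserved) {
        // A live broker is always a known broker, even if not yet recorded as observed.
        Set<Integer> known = new TreeSet<>(observed);
        known.addAll(live);

        List<Integer> candidates = new ArrayList<>();
        if (preferObserved) {
            candidates.addAll(known);
        } else {
            candidates.addAll(new TreeSet<>(live));
            for (int id : known) {
                if (!live.contains(id)) {
                    candidates.add(id);
                }
            }
        }
        if (candidates.size() < replicationFactor) {
            return List.of();
        }
        return new ArrayList<>(candidates.subList(0, replicationFactor));
    }
}
```

As an illustration, assume zones hold brokers {1, 4}, {2, 5} and {3, 6}, and the first zone is down. With live brokers {2, 3, 5, 6} and replication factor 3, the live-first variant selects brokers 2, 3 and 5, while the prefer-observed variant selects 1, 2 and 3, leaving the replica on broker 1 to be created once that broker rejoins.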

We propose a protocol upgrade to CreateTopicsRequest and CreatePartitionsRequest to specify a minimum number of replicas to be available at creation to satisfy the request. 
A new error code will be added to indicate a topic/partition creation was completed without the full replication factor.

A broker configuration will be added to select the desired behavior when handling a create request that accepts under-replication.
Topic policies could be used to add more complex validations, for example to allow creation with as few as min-isr replicas but no fewer.
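
A minimal sketch of how the requested minimum and the broker setting could combine into a creation decision is given here. The class, enum and method names are hypothetical. The `REJECTED` outcome stands in for whichever existing error the broker would return, which this proposal does not specify, and the range check on the minimum is likewise an assumption.

```java
/** Hypothetical broker-side check; class, enum and method names are illustrative only. */
final class UnderReplicatedCreationCheck {

    enum Outcome { CREATED, CREATED_UNDER_REPLICATED, REJECTED }

    /**
     * Decides the outcome of a create request for one partition.
     * Plain ints are used for brevity; the protocol fields are INT16.
     *
     * @param liveReplicas                 assigned replicas hosted on live brokers
     * @param replicationFactor            full replication factor requested
     * @param minRequiredReplicationFactor minimum replicas that must be live at creation
     * @param underReplicationEnabled      true unless the broker policy is "disabled"
     */
    static Outcome evaluate(int liveReplicas,
                            int replicationFactor,
                            int minRequiredReplicationFactor,
                            boolean underReplicationEnabled) {
        // Assumption: a minimum outside [1, replicationFactor] is a malformed request.
        if (minRequiredReplicationFactor < 1
                || minRequiredReplicationFactor > replicationFactor) {
            throw new IllegalArgumentException(
                "min required replication factor must be between 1 and " + replicationFactor);
        }
        if (liveReplicas >= replicationFactor) {
            return Outcome.CREATED;
        }
        if (underReplicationEnabled && liveReplicas >= minRequiredReplicationFactor) {
            return Outcome.CREATED_UNDER_REPLICATED;
        }
        return Outcome.REJECTED;
    }
}
```

For instance, `evaluate(2, 3, 2, true)` yields `CREATED_UNDER_REPLICATED`, while the same request against a broker with the policy disabled yields `REJECTED`, matching today's behavior.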

Public Interfaces

Network protocol and API behavior


CreateTopicsRequest
CreateTopics Request (Version: 4) => [create_topic_requests] timeout validate_only 
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries] min_required_replication_factor
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition [replicas] 
      partition => INT32
      replicas => INT32
    config_entries => config_name config_value 
      config_name => STRING
      config_value => NULLABLE_STRING
    min_required_replication_factor => INT16  //new field
  timeout => INT32
  validate_only => BOOLEAN


CreateTopicsResponse (Version: 4) //same schema as version 3



CreatePartitionsRequest
CreatePartitions Request (Version: 2) => [topic_partitions] timeout validate_only 
  topic_partitions => topic new_partitions 
    topic => STRING
    new_partitions => count [assignment] min_required_replication_factor
      count => INT32
      assignment => ARRAY(INT32)
      min_required_replication_factor => INT16  //new field
  timeout => INT32
  validate_only => BOOLEAN


CreatePartitionsResponse (Version: 2) //same schema as version 1

Error Codes

| ERROR | CODE | RETRIABLE | DESCRIPTION |
|---|---|---|---|
| CREATED_UNDER_REPLICATED | 79 | False | The server created a topic or partitions without the full set of replicas being active. |


Errors
package org.apache.kafka.common.protocol;
public enum Errors {
...
    CREATED_UNDER_REPLICATED(79, "The server created a topic or partitions without the full set of replicas being active.",
        CreatedUnderReplicatedException::new);
}
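
Because the new code is not retriable and signals that the topic or partitions were in fact created, a client would likely treat it as a success with a warning rather than as a failure. The sketch below illustrates that reading. The class `CreateResultHandling` and its `Result` enum are hypothetical, and only the values 0 (no error) and 79 (the code proposed above) are taken as given.

```java
/** Hypothetical client-side interpretation of per-topic error codes. */
final class CreateResultHandling {

    enum Result { SUCCESS, SUCCESS_WITH_WARNING, FAILURE }

    static final short NONE = 0;                       // existing "no error" code
    static final short CREATED_UNDER_REPLICATED = 79;  // value proposed in this KIP

    /** Maps an error code from a create response to a coarse result. */
    static Result classify(short errorCode) {
        if (errorCode == NONE) {
            return Result.SUCCESS;
        }
        if (errorCode == CREATED_UNDER_REPLICATED) {
            // The topic exists; missing replicas are expected once brokers rejoin.
            return Result.SUCCESS_WITH_WARNING;
        }
        return Result.FAILURE;
    }
}
```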

AdminClient APIs

The AdminClient will be updated to expose the new fields. 

NewTopic
// Keep existing constructors but add a getter/setter pair 
// to specify the minimum required replication factor if different from full 
package org.apache.kafka.clients.admin;
public class NewTopic {
    ....   
    // return `this` for chaining 
    public NewTopic minRequiredReplicationFactor(short requiredReplicationFactor) {}
    public short minRequiredReplicationFactor() {}
}


NewPartitions
// Keep existing factory methods and add a getter/setter pair (like NewTopic)
// to specify the minimum required replication factor if different from full
package org.apache.kafka.clients.admin;
public class NewPartitions {
    ....
    // return `this` for chaining
    public NewPartitions minRequiredReplicationFactor(short requiredReplicationFactor) {}
    public short minRequiredReplicationFactor() {}
}
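
The getter/setter pair proposed for both classes can be sketched with a stand-in class. `NewTopicSketch` is a hypothetical stand-in, not the real `NewTopic`, and it assumes that the minimum defaults to the full replication factor when the setter is never called.

```java
/** Hypothetical stand-in for the proposed NewTopic additions. */
final class NewTopicSketch {
    private final String name;
    private final int numPartitions;
    private final short replicationFactor;
    private short minRequiredReplicationFactor;

    NewTopicSketch(String name, int numPartitions, short replicationFactor) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        // Assumption: without an explicit minimum, full replication is required.
        this.minRequiredReplicationFactor = replicationFactor;
    }

    /** Setter returning this, so calls can be chained. */
    NewTopicSketch minRequiredReplicationFactor(short requiredReplicationFactor) {
        this.minRequiredReplicationFactor = requiredReplicationFactor;
        return this;
    }

    /** Getter for the minimum number of replicas that must be live at creation. */
    short minRequiredReplicationFactor() {
        return minRequiredReplicationFactor;
    }

    @Override
    public String toString() {
        return "(name=" + name + ", numPartitions=" + numPartitions
            + ", replicationFactor=" + replicationFactor
            + ", minRequiredReplicationFactor=" + minRequiredReplicationFactor + ")";
    }
}
```

A caller would then write something like `new NewTopicSketch("orders", 6, (short) 3).minRequiredReplicationFactor((short) 2)` to accept creation with only two live replicas.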

Broker Configuration

| NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE | DYNAMIC UPDATE MODE |
|---|---|---|---|---|---|---|
| under.replicated.creation.policy | Behavior on Topic or Partition creation without the full set of replicas being active. | STRING | disabled | disabled, enabled, enabled_prefer_observed | Medium | cluster-wide |

Administrators can disable (disabled) or enable (enabled) the creation of topics/partitions without the full replication factor. When enabled, a sub-option (enabled_prefer_observed) is to always prefer using all known brokers and their racks, accepting some under-replication, rather than using only the currently online brokers.
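
One way to model the three values of under.replicated.creation.policy is an enum carrying two flags. This is only a sketch: the enum name, its fields and `fromConfig` are hypothetical, and treating a missing value as the default `disabled` is an assumption.

```java
/** Hypothetical model of the under.replicated.creation.policy values. */
enum UnderReplicatedCreationPolicy {
    DISABLED(false, false),
    ENABLED(true, false),
    ENABLED_PREFER_OBSERVED(true, true);

    /** Whether creation may succeed without the full replication factor. */
    final boolean allowsUnderReplication;
    /** Whether assignment uses all observed brokers instead of favoring live ones. */
    final boolean prefersObservedBrokers;

    UnderReplicatedCreationPolicy(boolean allowsUnderReplication,
                                  boolean prefersObservedBrokers) {
        this.allowsUnderReplication = allowsUnderReplication;
        this.prefersObservedBrokers = prefersObservedBrokers;
    }

    /** Parses the configured string; only the three documented values are accepted. */
    static UnderReplicatedCreationPolicy fromConfig(String value) {
        if (value == null) {
            return DISABLED; // assumption: an unset value means the documented default
        }
        switch (value) {
            case "disabled":
                return DISABLED;
            case "enabled":
                return ENABLED;
            case "enabled_prefer_observed":
                return ENABLED_PREFER_OBSERVED;
            default:
                throw new IllegalArgumentException(
                    "Invalid under.replicated.creation.policy: " + value);
        }
    }
}
```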

Command line tools and arguments

New optional argument to the kafka-topics script:

--min-required-replication-factor <Integer:  min-required-replication-factor>
   The minimum required replication factor for topic/partition creation

ZooKeeper changes:

New non-ephemeral node under /brokers called observed_ids.
Upon starting, brokers will create/update the non-ephemeral node for their current id, containing the same information as the existing ephemeral /brokers/ids node.

The data in these new nodes (including rack if available) will be used on topic/partition creation when the requested number of replicas (or the requested broker ids, if an assignment is specified) is not currently available.
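
The difference in lifecycle between the ephemeral and the non-ephemeral nodes can be sketched with an in-memory model. No ZooKeeper client is involved: `BrokerRegistrySketch` and its methods are hypothetical, and the two maps merely stand in for /brokers/ids and /brokers/observed_ids.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the two broker registries. */
final class BrokerRegistrySketch {
    // Stands in for the ephemeral nodes under /brokers/ids.
    private final Map<Integer, String> ids = new HashMap<>();
    // Stands in for the non-ephemeral nodes under /brokers/observed_ids.
    private final Map<Integer, String> observedIds = new HashMap<>();

    /** On startup a broker registers in both places with the same information. */
    void brokerStarted(int brokerId, String info) {
        ids.put(brokerId, info);
        observedIds.put(brokerId, info);
    }

    /** Losing the session removes only the ephemeral entry. */
    void sessionLost(int brokerId) {
        ids.remove(brokerId);
    }

    /** Manual clean-up for a broker id that will never be reused. */
    void removeObserved(int brokerId) {
        observedIds.remove(brokerId);
    }

    Set<Integer> liveBrokers() {
        return new TreeSet<>(ids.keySet());
    }

    Set<Integer> observedBrokers() {
        return new TreeSet<>(observedIds.keySet());
    }
}
```

After a broker's session is lost it disappears from `liveBrokers()` but stays in `observedBrokers()`, which is what lets a later create request still assign replicas to it.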



Compatibility, Deprecation, and Migration Plan

  • For compatibility, the default behavior remains unaltered: the number of active brokers must be equal to or greater than the full replication factor for creation to succeed.

  • No migration is necessary.

  • The Decommissioning brokers section in the documentation will be updated to mention that if a broker id is never to be reused, its corresponding node under /brokers/observed_ids in ZooKeeper will need to be removed manually.

Rejected Alternatives

  • Async replica assignment: Instead of relying on previously known brokers, we considered creating replicas on the available brokers at topic/partition create time and saving some information in ZooKeeper to asynchronously create the missing replicas on the next suitable brokers to join the cluster. Such a process would require the controller to check for missing replicas whenever a broker joins the cluster. This alternative felt too complicated. Also, the next broker to join may not be in the rack preferred for the assignment.
  • Instead of specifying a minimum replication factor on creation, we could have used a boolean to allow creation at min-isr size only. This approach looked a bit less flexible but not significantly simpler.

  • Update schema of CreateTopicsResponse and CreatePartitionsResponse to contain the actual replication factor at creation. Like assignments, it's not data the client can act on. We think the error message associated with the new error code can suffice.

