Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-17885

Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-899 added support for clients to rebootstrap when all nodes in the last known metadata are unavailable. As mentioned in the motivation for KIP-899, rebootstrapping is useful when metadata in the client is stale and doesn't contain any of the currently active brokers in the cluster. Typically, periodic metadata refresh is sufficient for clients to obtain the latest set of brokers, since new metadata is obtained from one of the brokers in the current metadata. But in some cases described in that KIP and the associated tickets, all the brokers in the client's current metadata may have been shut down, which prevents the client from obtaining any further metadata.

The conditions under which rebootstrapping is triggered by KIP-899 are very limited, making this feature difficult to use in practice. A node is considered unavailable if there is no connection to the node and connections cannot be established to it because of backoff from a previous connection attempt. Backoff for reconnection attempts is configured using the pair of exponential backoff configs reconnect.backoff.ms and reconnect.backoff.max.ms, with default values of 50ms and 1000ms respectively. So, in order to trigger rebootstrap, clients have to fail connections to all nodes in the current metadata within a second. For each connection, the default connection setup timeout is an exponential timeout between 10s and 30s, configured using socket.connection.setup.timeout.ms and socket.connection.setup.timeout.max.ms. So if a connection attempt is stuck for over a second, other nodes that were in reconnection backoff state will no longer be in backoff, and as a result rebootstrap will never be attempted. We cannot recommend arbitrarily increasing reconnect backoff intervals since this affects typical reconnection times after disconnections, which are fairly common in Cloud environments. We also cannot recommend arbitrarily decreasing connection setup timeouts since this can result in unnecessary reconnections. So this KIP proposes a couple of other mechanisms to trigger rebootstrap in clients.

Since 4.0 is a major release, it seems like the right time to enable rebootstrapping by default. Clients are unlikely to know in advance whether rebootstrapping will be needed. By changing the default, all clients from 4.0 onwards will benefit from this feature.

Public Interfaces

Configuration

We propose to introduce a new rebootstrap trigger configuration for clients that will initiate rebootstrap if new metadata cannot be obtained within the configured timeout. The timeout interval will be calculated from the time a metadata request is first attempted. Rebootstrap will be attempted only if metadata.recovery.strategy=rebootstrap.

  • Config name: metadata.recovery.rebootstrap.trigger.ms

  • Config type: Long

  • Default value: 300000 (5 minutes)

  • Description: If a client that requires new metadata is unable to obtain it for this interval from any of the nodes in the current metadata, the client repeats the bootstrap process using the bootstrap.servers configuration.

The default value of metadata.recovery.strategy, introduced in KIP-899, will be changed from none to rebootstrap. The change applies to producers, consumers and admin clients. Clients may disable rebootstrapping by setting metadata.recovery.strategy=none.

In addition to producers, consumers and admin clients, the rebootstrap strategy and trigger configuration will be added to share consumers, Kafka Streams and Connect worker clients with the same defaults.
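The sketch below shows how a client could set these options explicitly. It is illustrative only: the broker addresses are placeholders, the values shown match the proposed defaults, and metadata.recovery.rebootstrap.trigger.ms is only available once this KIP is implemented.

RebootstrapConfigExample.java (illustrative)
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebootstrapConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder addresses
        props.put("group.id", "example-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Proposed default in 4.0; set to "none" to disable rebootstrapping.
        props.put("metadata.recovery.strategy", "rebootstrap");
        // Rebootstrap if no new metadata can be obtained for 5 minutes (proposed default).
        props.put("metadata.recovery.rebootstrap.trigger.ms", "300000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // normal consumer usage
        }
    }
}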

Protocol: Metadata Request

We propose to introduce a new error code REBOOTSTRAP_REQUIRED for metadata responses to enable Kafka-protocol-aware proxies to request rebootstrap in clients when brokers have changed. KIP-559 has already made the Kafka protocol more proxy-friendly, and the new error code will help proxies recover clients with stale metadata without a client restart. The Metadata request version will be bumped from 12 to 13 and a new top-level error code will be added to the metadata response. This will enable proxies to return the new error code only to clients using version 13 and above.

The metadata request format remains unchanged apart from the version bump:

Metadata Request
{
  "apiKey": 3,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "MetadataRequest",
  "validVersions": "0-13",
  "deprecatedVersions": "0-3",
  "flexibleVersions": "9+",
  "fields": [
    // In version 0, an empty array indicates "request metadata for all topics."  In version 1 and
    // higher, an empty array indicates "request metadata for no topics," and a null array is used to
    // indicate "request metadata for all topics."
    //
    // Version 2 and 3 are the same as version 1.
    //
    // Version 4 adds AllowAutoTopicCreation.
    //
    // Starting in version 8, authorized operations can be requested for cluster and topic resource.
    //
    // Version 9 is the first flexible version.
    //
    // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server.
    // Versions 10 and 11 should not use the topicId field or set topic name to null.
    //
    // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
    // by the DescribeCluster API (KIP-700).
    // Version 12 supports topic Id.
    // Version 13 supports top-level error code in the response.
    { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
      "about": "The topics to fetch metadata for.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "nullableVersions": "10+",
        "about": "The topic name." }
    ]},
    { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
      "about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
    { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
      "about": "Whether to include cluster authorized operations." },
    { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
      "about": "Whether to include topic authorized operations." }
  ]
}


The metadata response includes a new top-level error code:

Metadata Response
{
  "apiKey": 3,
  "type": "response",
  "name": "MetadataResponse",
  // Version 1 adds fields for the rack of each broker, the controller id, and
  // whether or not the topic is internal.
  //
  // Version 2 adds the cluster ID field.
  //
  // Version 3 adds the throttle time.
  //
  // Version 4 is the same as version 3.
  //
  // Version 5 adds a per-partition offline_replicas field. This field specifies
  // the list of replicas that are offline.
  //
  // Starting in version 6, on quota violation, brokers send out responses before throttling.
  //
  // Version 7 adds the leader epoch to the partition metadata.
  //
  // Starting in version 8, brokers can send authorized operations for topic and cluster.
  //
  // Version 9 is the first flexible version.
  //
  // Version 10 adds topicId.
  //
  // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
  // by the DescribeCluster API (KIP-700).
  // Version 12 supports topicId.
  // Version 13 supports top-level error code in the response.
  "validVersions": "0-13",
  "flexibleVersions": "9+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
      "about": "A list of brokers present in the cluster.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
        "about": "The broker ID." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The broker hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The broker port." },
      { "name": "Rack", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, "default": "null",
        "about": "The rack of the broker, or null if it has not been assigned to a rack." }
    ]},
    { "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null",
      "about": "The cluster ID that responding broker belongs to." },
    { "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "entityType": "brokerId",
      "about": "The ID of the controller broker." },
    { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
      "about": "Each topic in the response.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The topic error, or 0 if there was no error." },
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "12+",
        "about": "The topic name. Null for non-existing topics queried by ID. This is never null when ErrorCode is zero. One of Name and TopicId is always populated." },
      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
        "about": "The topic id. Zero for non-existing topics queried by name. This is never zero when ErrorCode is zero. One of Name and TopicId is always populated." },
      { "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
        "about": "True if the topic is internal." },
      { "name": "Partitions", "type": "[]MetadataResponsePartition", "versions": "0+",
        "about": "Each partition in the topic.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The partition error, or 0 if there was no error." },
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the leader broker." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
          "about": "The leader epoch of this partition." },
        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of all nodes that host this partition." },
        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
          "about": "The set of nodes that are in sync with the leader for this partition." },
        { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, "entityType": "brokerId",
          "about": "The set of offline replicas of this partition." }
      ]},
      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this topic." }
    ]},
    { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
      "about": "32-bit bitfield to represent authorized operations for this cluster." },
    { "name": "ErrorCode", "type": "int16", "versions": "13+", "ignorable": true,                         <====== NEW
      "about": "The top-level error code, or 0 if there was no error." }
  ]
}


On receiving the REBOOTSTRAP_REQUIRED error, clients will repeat the bootstrap process using the bootstrap.servers configuration. A new exception class RebootstrapRequiredException will be associated with the REBOOTSTRAP_REQUIRED error.

RebootstrapRequiredException.java
package org.apache.kafka.common.errors;

/**
 * Thrown when a metadata response returns the top-level error code
 * REBOOTSTRAP_REQUIRED, indicating that the client should repeat the
 * bootstrap process using its bootstrap.servers configuration.
 */
public class RebootstrapRequiredException extends ApiException {
    private static final long serialVersionUID = 1L;

    public RebootstrapRequiredException(String message) {
        super(message);
    }

    public RebootstrapRequiredException(String message, Throwable cause) {
        super(message, cause);
    }
}


Proposed Changes

In NetworkClient.DefaultMetadataUpdater and AdminMetadataManager, we will track the start time when a metadata request is first attempted. This time will be cleared when a metadata response containing one or more brokers is received. If no such response is received for an interval greater than the rebootstrap trigger timeout, for example due to connectivity issues, the client will close all its current connections and perform rebootstrap using its bootstrap.servers configuration. Rebootstrapping will be performed only if the client has configured metadata.recovery.strategy=rebootstrap (the new default). A sketch of this tracking logic is shown below.
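The following is a minimal sketch of the proposed timeout tracking; the class and method names are illustrative and do not correspond to the actual NetworkClient.DefaultMetadataUpdater or AdminMetadataManager internals.

MetadataRecoveryTracker.java (illustrative)
// Models the proposed timeout tracking only; the actual change will be made in
// NetworkClient.DefaultMetadataUpdater and AdminMetadataManager.
public class MetadataRecoveryTracker {
    private final long rebootstrapTriggerMs; // metadata.recovery.rebootstrap.trigger.ms
    private long firstAttemptMs = -1L;       // -1 indicates no outstanding metadata attempt

    public MetadataRecoveryTracker(long rebootstrapTriggerMs) {
        this.rebootstrapTriggerMs = rebootstrapTriggerMs;
    }

    // Record the time when a metadata request is first attempted.
    public void onMetadataRequestAttempt(long nowMs) {
        if (firstAttemptMs < 0)
            firstAttemptMs = nowMs;
    }

    // Clear the timer when a metadata response containing one or more brokers is received.
    public void onUsableMetadataResponse() {
        firstAttemptMs = -1L;
    }

    // True if the client should close its connections and rebootstrap,
    // assuming metadata.recovery.strategy=rebootstrap.
    public boolean shouldRebootstrap(long nowMs) {
        return firstAttemptMs >= 0 && nowMs - firstAttemptMs >= rebootstrapTriggerMs;
    }
}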

If a metadata response is received with the error REBOOTSTRAP_REQUIRED, e.g. from a proxy, the client will close all its current connections and perform rebootstrap using its bootstrap.servers configuration. Rebootstrapping will be performed only if the client has configured metadata.recovery.strategy=rebootstrap.

If a client configures metadata.recovery.strategy=none, rebootstrapping will not be performed under any condition.
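A minimal sketch of the error-driven path follows. It is illustrative only: the numeric value of REBOOTSTRAP_REQUIRED is not defined by this KIP and is passed in as a placeholder, and the Runnable hook stands in for the client's actual close-and-rebootstrap logic.

RebootstrapErrorHandling.java (illustrative)
// Illustrative only: models the decision made on receiving the proposed top-level
// error code in a version 13 metadata response. The error-code value and the
// Runnable hook are placeholders, not actual client internals.
public class RebootstrapErrorHandling {
    private final short rebootstrapRequiredCode;            // assigned when the KIP is implemented
    private final boolean rebootstrapEnabled;                // metadata.recovery.strategy=rebootstrap
    private final Runnable closeConnectionsAndRebootstrap;   // hypothetical hook

    public RebootstrapErrorHandling(short rebootstrapRequiredCode,
                                    boolean rebootstrapEnabled,
                                    Runnable closeConnectionsAndRebootstrap) {
        this.rebootstrapRequiredCode = rebootstrapRequiredCode;
        this.rebootstrapEnabled = rebootstrapEnabled;
        this.closeConnectionsAndRebootstrap = closeConnectionsAndRebootstrap;
    }

    // Called with the top-level ErrorCode from a metadata response (version 13+).
    public void onMetadataResponse(short topLevelErrorCode) {
        if (topLevelErrorCode == rebootstrapRequiredCode && rebootstrapEnabled)
            closeConnectionsAndRebootstrap.run();
    }
}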

Compatibility, Deprecation, and Migration Plan

The default value of metadata.recovery.strategy is currently none. Since 4.0 is a major release, we are proposing to change the default to rebootstrap so that clients can always connect either through their current metadata or by rebootstrapping. Clients may configure metadata.recovery.strategy=none to disable rebootstrapping.

For clients that have not explicitly configured metadata.recovery.strategy=none, client behaviour will change when brokers in the current metadata are unavailable. Clients will rebootstrap if any of the following conditions are met:

  1. None of the nodes in the current metadata is available. This is based on the KIP-899 definition of availability where a node is considered unavailable if no connections exist and new connections are not allowed due to reconnection backoff.

  2. Rebootstrap may be performed when no metadata has been obtained for the configured metadata.recovery.rebootstrap.trigger.ms, set by default to 5 minutes. This gives sufficient time to connect to multiple existing brokers with default connection timeouts. Clients may set the rebootstrap trigger to higher values to avoid rebootstrapping based on timeout.

  3. Rebootstrap may be performed if requested by a server or Kafka-protocol-aware proxy using the new error code REBOOTSTRAP_REQUIRED. We are bumping the Metadata request version to ensure that proxies only send the new error to clients supporting the new version. Older clients can use KIP-899 as-is or require a restart instead of rebootstrap, as they do today.

Test Plan

Existing rebootstrap integration tests will be updated to also run with metadata.recovery.rebootstrap.trigger.ms instead of the increased reconnection backoff times currently used in the tests.

Unit tests will be added to verify handling of REBOOTSTRAP_REQUIRED error code.
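As one illustrative example, a unit test along the following lines could cover the new exception class itself; the test class name and JUnit 5 wiring are assumptions, not part of this KIP, and tests for the error-code handling in the clients will depend on the final implementation.

RebootstrapRequiredExceptionTest.java (illustrative)
import org.apache.kafka.common.errors.RebootstrapRequiredException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

public class RebootstrapRequiredExceptionTest {
    @Test
    public void testMessageAndCauseArePreserved() {
        Throwable cause = new RuntimeException("no brokers reachable");
        RebootstrapRequiredException exception =
            new RebootstrapRequiredException("rebootstrap required", cause);
        assertEquals("rebootstrap required", exception.getMessage());
        assertSame(cause, exception.getCause());
    }
}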

Documentation Plan

Documentation will be added for the new configuration option metadata.recovery.rebootstrap.trigger.ms and the new default value of metadata.recovery.strategy.

Rejected Alternatives

Use higher reconnect backoff and lower connection setup timeouts instead of introducing new rebootstrap trigger time config

If we don't add a rebootstrap timeout, clients that potentially require rebootstrap will need to set higher reconnect.backoff.ms and reconnect.backoff.max.ms to ensure that nodes stay in the backoff state long enough to trigger rebootstrap. For a cluster with a large number of nodes, or a cluster where connection setup is stuck, backoff times will need to be very large to trigger rebootstrap. Higher backoff times could impact reconnections during normal maintenance operations such as cluster rolls, as well as availability during transient connectivity issues. To handle stuck connections, clients will also need to lower socket.connection.setup.timeout.ms and socket.connection.setup.timeout.max.ms. Setting connection timeouts too low could result in connection failures and reconnections during normal operations. So this KIP proposes a separate rebootstrap timeout to enable clients to use connection setup and backoff intervals that suit their environments and still have the ability to rebootstrap.

Disable rebootstrap trigger time by default even when metadata.recovery.strategy=rebootstrap

The KIP proposes to enable the rebootstrap timeout by default, with a value of 5 minutes, for clients that have configured metadata.recovery.strategy=rebootstrap. We could instead require an explicit rebootstrap timeout setting to retain the existing behaviour of the rebootstrap strategy. Since KIP-899 was only introduced in Apache Kafka 3.8 and its current usage requires changing five configs (metadata.recovery.strategy, reconnect.backoff.ms, reconnect.backoff.max.ms, socket.connection.setup.timeout.ms and socket.connection.setup.timeout.max.ms), similar to how we set this up in the integration tests, it seems better to have a default timeout value that simplifies client configuration for this feature. With the proposed timeout defaulting to 5 minutes and the change of the default metadata recovery strategy to rebootstrap, clients will automatically rebootstrap without additional configuration changes.


