Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka powers real-time data processing, and a Kafka failure could severely impact the many businesses that depend on it. Among the different types of Kafka failures, one notable category is deployment-caused failure. Based on the failed component, the problem can be classified into:

  1. Kafka service failure - Deploying a bad broker build could break both message production and consumption, causing delayed message processing or even data loss
  2. Kafka producer/consumer service failure - Deploying a bad producer or consumer service could cause data corruption or delayed message processing

Even though the majority of code bugs can be caught by various testing methods, there can still be unknowns, such as bad configuration or uncovered scenarios, that lead to failure after deployment. A service that requires a 3-4 nines SLA needs extra layers of protection to minimize the impact of deployment-caused failures.

Kafka canary isolation is proposed to confine the impact of a deployment-caused failure to a small portion of traffic (canary traffic), and to detect and mitigate such a failure before it develops into a catastrophic outage.

For Kafka user services that have a canary environment, Kafka lacks a solution for end-to-end canary isolation between producer and consumer services, making it hard to limit the blast radius of a bad producer/consumer service deployment.

Non-Goal

Canary isolation is not hard isolation. There are situations where the system is in a sub-optimal state, and Kafka will prioritize availability over canary isolation.

E.g. if a topic has no canary partitions, producers in the canary environment will still be able to produce messages to non-canary partitions.

Proposed Changes

Canary kafka broker

Canary brokers are the subset of Kafka brokers that serve canary traffic. They are identified by a new piece of broker metadata, pod, introduced to mark a subset of brokers. The pod value canary-broker identifies canary brokers, while the default value is broker.

With canary brokers, deployment of a Kafka cluster should always start from the canary brokers. Add baking time between the deployment of canary brokers and non-canary brokers, and pause or roll back the deployment if a failure is detected after the canary broker deployment.

The minimum number of canary brokers in a cluster depends on the maximum replication factor. E.g. if the maximum replication factor across topics is 3, then the cluster needs at least 3 canary brokers.

Canary data

Canary data, a.k.a. canary partitions, are topic partitions that are placed on canary brokers. As a general guideline, canary data should be a small amount of data; however, what counts as small should be defined by the Kafka service owners. E.g. if the service owner defines the small amount as about 3%, 1 out of every 32 partitions should be a canary partition. A topic must then have more than 32 partitions to qualify for canary, and for a topic with 128 partitions, partitions 0, 32, 64 and 96 would be the canary partitions.

To make the amount of canary data configurable, a new Kafka configuration, percent.canary.partition, will be introduced, with 0.0 as the default value. Topic creation and partition expansion will use this configuration value to determine the canary partitions.
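For illustration only, the sketch below shows one possible way a broker could translate percent.canary.partition into canary partition indices at topic creation or partition expansion time. The class and method names are hypothetical, and the value is assumed to be expressed as a fraction (e.g. 0.03125 ≈ 3%); the real placement logic is an implementation detail of this KIP.

import java.util.ArrayList;
import java.util.List;

public class CanaryPartitionSelector {

    /** Returns the partition ids that would be treated as canary partitions. */
    public static List<Integer> canaryPartitions(int partitionCount, double percentCanaryPartition) {
        List<Integer> canary = new ArrayList<>();
        if (percentCanaryPartition <= 0.0) {
            return canary; // canary disabled (the default value 0.0)
        }
        // With ~3% (1/32), every 32nd partition becomes a canary partition.
        int step = (int) Math.round(1.0 / percentCanaryPartition);
        for (int p = 0; p < partitionCount; p += step) {
            canary.add(p);
        }
        return canary;
    }

    public static void main(String[] args) {
        // 128 partitions at 0.03125 (~3%) -> [0, 32, 64, 96], matching the example above.
        System.out.println(canaryPartitions(128, 0.03125));
    }
}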

Canary environment of kafka user services

For Kafka user services such as producer and consumer services that already have a canary environment, Kafka canary can align with the user service canary environment to contain a bad deployment of a Kafka user service within that canary environment. Producer services and consumer services can achieve canary isolation independently. Note that how a Kafka user service defines its canary environment is out of scope for this KIP.

Canary producer

Canary producers are producer service instances running in a canary environment, and the Kafka traffic generated by canary producers is canary traffic. The goal is to isolate canary traffic to canary partitions.

To achieve producer canary isolation, the producer needs to leverage the PartitionInfo in the topic metadata: the Node in PartitionInfo carries the pod value, which lets the producer identify canary partitions.

The producer service needs to implement a custom Partitioner to route canary traffic to canary partitions. Take the following code as an example.
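The sketch below shows what such a custom Partitioner could look like. It assumes Node exposes the new pod metadata via a pod() accessor (as proposed in this KIP) and that whether the producer instance runs in the canary environment is passed in through a hypothetical canary.producer client config; it is a sketch under those assumptions, not a reference implementation.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CanaryAwarePartitioner implements Partitioner {

    private boolean isCanaryProducer;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical client config marking this instance as a canary producer.
        this.isCanaryProducer = Boolean.parseBoolean(String.valueOf(configs.get("canary.producer")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        // Keep only partitions whose leader's pod matches this producer's environment.
        // Node.pod() is the new accessor assumed to be added by this KIP.
        List<PartitionInfo> candidates = partitions.stream()
            .filter(p -> p.leader() != null
                && isCanaryProducer == "canary-broker".equals(p.leader().pod()))
            .collect(Collectors.toList());

        // Availability over isolation: fall back to all partitions if the topic
        // has no partition in the desired pod (see Non-Goal).
        if (candidates.isEmpty()) {
            candidates = partitions;
        }

        // Pick one candidate; keyed topics would hash the key instead (see below).
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())).partition();
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}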

For keyed topics, there is a contract that the partition is decided by the key hash. Canary isolation then requires (a sketch follows this list):

  1. A set of keys that is isolated to the canary environment, called canary keys
  2. A consistent hash algorithm that assigns only canary keys to canary partitions
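A minimal sketch of requirement 2, assuming the canary and non-canary partition lists have already been derived from the pod metadata as above, and that deciding whether a key is a canary key is up to the service owner (the isCanaryKey flag and the class name are hypothetical):

import java.util.List;

import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public final class CanaryKeyedPartitioning {

    /**
     * Hashes canary keys only onto canary partitions and all other keys only onto
     * non-canary partitions, so the key -> partition contract stays deterministic
     * and never crosses the canary boundary.
     */
    public static int partitionForKey(byte[] keyBytes,
                                      List<PartitionInfo> canaryPartitions,
                                      List<PartitionInfo> nonCanaryPartitions,
                                      boolean isCanaryKey) {
        List<PartitionInfo> pool = isCanaryKey ? canaryPartitions : nonCanaryPartitions;
        // Same murmur2 hash the default partitioner uses, applied within the chosen pool.
        int hash = Utils.toPositive(Utils.murmur2(keyBytes));
        return pool.get(hash % pool.size()).partition();
    }

    private CanaryKeyedPartitioning() { }
}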

Canary consumer

Similar to canary producers, canary consumers are consumer service instances running in a canary environment. Canary isolation for consumers means that canary consumers only consume from canary partitions.

To achieve consumer canary isolation, the consumer needs to implement the assign function of ConsumerPartitionAssignor and make sure canary partitions are assigned to canary consumer members.
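A sketch of such an assignor is shown below. It assumes canary members can be recognized by a "canary" marker in their member id (see the next paragraph) and that Node exposes the pod metadata proposed in this KIP; it also assumes all members subscribe to the same topics, and omits stickiness and other production concerns.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class CanaryAwareAssignor implements ConsumerPartitionAssignor {

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();

        // Split members into canary and non-canary based on the member id
        // (the clientId / groupInstanceId prefix encodes "canary", see below).
        List<String> canaryMembers = new ArrayList<>();
        List<String> normalMembers = new ArrayList<>();
        for (String memberId : new TreeSet<>(subscriptions.keySet())) {
            (memberId.contains("canary") ? canaryMembers : normalMembers).add(memberId);
        }

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(m -> assignment.put(m, new ArrayList<>()));

        Set<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));

        int canaryIdx = 0, normalIdx = 0;
        for (String topic : topics) {
            for (PartitionInfo p : metadata.partitionsForTopic(topic)) {
                // Node.pod() is the new accessor assumed to be added by this KIP.
                boolean canaryPartition = p.leader() != null && "canary-broker".equals(p.leader().pod());
                TopicPartition tp = new TopicPartition(topic, p.partition());
                if (canaryPartition && !canaryMembers.isEmpty()) {
                    assignment.get(canaryMembers.get(canaryIdx++ % canaryMembers.size())).add(tp);
                } else if (!normalMembers.isEmpty()) {
                    assignment.get(normalMembers.get(normalIdx++ % normalMembers.size())).add(tp);
                }
            }
        }

        Map<String, Assignment> groupAssignment = new HashMap<>();
        assignment.forEach((member, partitions) -> groupAssignment.put(member, new Assignment(partitions)));
        return new GroupAssignment(groupAssignment);
    }

    @Override
    public String name() {
        return "canary-aware";
    }
}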

It is the consumer service owner's responsibility to identify canary consumer instances. In the consumer rebalance protocol, each consumer instance is identified by a member id. The member id has 2 components: clientId + UUID if the consumer is a dynamic member, or groupInstanceId + UUID if the consumer is a static member. Consumer service owners can encode "canary" into the clientId or groupInstanceId so that the consumer group leader can identify canary consumer instances and achieve canary isolation.
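For illustration, a canary consumer instance might mark itself as follows; the "-canary-" naming and the service name are conventions chosen by the service owner, not something Kafka interprets.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class CanaryConsumerConfigExample {

    public static Properties canaryConsumerProps() {
        Properties props = new Properties();
        // Dynamic member: encode "canary" into the client id ...
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "orders-service-canary-0");
        // ... or static member: encode it into the group instance id.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "orders-service-canary-0");
        // Use the canary-aware assignor sketched above.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CanaryAwareAssignor.class.getName());
        return props;
    }
}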

 

Public Interfaces

New Configurations

New broker Configs

percent.canary.partition

Percentage of partitions that are canary partitions.

Kafka brokers use this percentage to determine which partitions should be placed on canary brokers. It is used during topic creation and partition expansion.

Default value: 0.0


Protocol Changes

{
 "apiKey": 3,
 "type": "response",
 "name": "MetadataResponse",
 "validVersions": "0-12",
 "flexibleVersions": "9+",
 "fields": [
   { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
     "about": "Each broker in the response.", "fields": [
       // start of new field
       { "name": "Pod", "type": "string", "versions": "12+", "about": "The broker pod" }
       // end of new field
     ]}
 ]
}


{
 "apiKey": 60,
 "type": "response",
 "name": "DescribeClusterResponse",
 "validVersions": "0-1",
 "flexibleVersions": "0+",
 "fields": [
   …
   // start of new field
   { "name": "Pod", "type": "string", "versions": "1+", "about": "The broker pod" }
   // end of new field
 ]
}


{
 "apiKey": 6,
 "type": "request",
 "listeners": ["zkBroker"],
 "name": "UpdateMetadataRequest",
 "validVersions": "0-8",
 "flexibleVersions": "6+",
 "fields": [
   { "name": "LiveBrokers", "type": "[]UpdateMetadataBroker", "versions": "0+", "fields": [
     // start of new field
     { "name": "Pod", "type": "string", "versions": "8+", "about": "The broker pod" }
     // end of new field
   ]}
 ]
}


{
 "apiKey": 0,
 "type": "metadata",
 "name": "RegisterBrokerRecord",
 "validVersions": "0-1",
 "fields": [
   // start of new field
   { "name": "Pod", "type": "string", "versions": "1+", "about": "The broker pod" }
   // end of new field
 ]
}


{
 "apiKey": 62,
 "type": "request",
 "listeners": ["controller"],
 "name": "BrokerRegistrationRequest",
 "validVersions": "0-1",
 "flexibleVersions": "0+",
 "fields": [
   // start of new field
   { "name": "Pod", "type": "string", "versions": "1+", "about": "The broker pod" }
   // end of new field
 ]
}


Zookeeper data schema changes

Version 6 JSON schema for a broker is:
{
  "version": 6,
  "host": "localhost",
  //start of new field
  "pod": "broker",
  //end of new field
}




Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
