Status

Current state: Voting

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-932: Queues for Kafka introduced queuing semantics to Apache Kafka for consumers using share groups. It describes the concepts of delivery counts and failed delivery attempts, but it does not provide any automated processing for records which have reached their delivery attempt limit or which have been explicitly rejected as unprocessable.

When a record is repeatedly delivered to a share group without being successfully acknowledged, it may be because the record has some kind of serialisation or semantic problem and is a so-called "poison message". The delivery attempt limit is enforced to prevent an endless loop of failed processing attempts, even in cases where the consuming application does not have adequate error handling and simply fails on each attempt. A well-written consuming application can capture a failing record and explicitly reject it, thus ending its delivery attempts more efficiently. These undeliverable records are candidates for placing on a dead-letter queue (DLQ) topic for inspection or special processing, rather than simply being moved directly into the Archived state as described in KIP-932.

Apache Kafka already has dead-letter queues for Kafka Connect and Kafka Streams. Although a dead-letter queue in Kafka is actually a topic, “dead-letter queue” is the term already established in the industry. This KIP extends the idea to share groups.

Proposed Changes

In Kafka, a "dead-letter queue" is a topic onto which records whose delivery failed are automatically written. A share group can be configured with the name of a topic to be used as the group's DLQ topic. For a share group with a DLQ topic, when a record's delivery is rejected by the consumer or the record reaches its maximum number of delivery attempts, the record transitions to the Archiving state, a DLQ record is written onto the DLQ topic, and then the record transitions into the Archived state. The DLQ record contains the context of the undeliverable record (topic name, partition, offset and so on) as record headers and, optionally, the content of the undeliverable record itself.
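The transition flow described above can be sketched as a small state model. This is a hypothetical illustration, not the broker's implementation: the class and method names are invented, the numeric ids for the existing states are assumed from KIP-932's numbering, and only the value 3 for Archiving comes from this KIP.

```java
// Hypothetical model of the delivery-state flow for an acquired record.
// Ids for existing states are assumed from KIP-932; 3 is the value this
// KIP reserves for ARCHIVING.
enum DeliveryState {
    AVAILABLE((byte) 0),
    ACQUIRED((byte) 1),
    ACKNOWLEDGED((byte) 2),
    ARCHIVING((byte) 3),
    ARCHIVED((byte) 4);

    final byte id;

    DeliveryState(byte id) {
        this.id = id;
    }
}

final class DlqTransitions {
    private DlqTransitions() {}

    /**
     * Next state for an acquired record whose delivery did not succeed.
     *
     * @param rejected            true if the consumer explicitly rejected the record
     * @param deliveryCount       number of times the record has been delivered so far
     * @param maxDeliveryAttempts the group's delivery attempt limit
     * @param dlqTopic            the group's DLQ topic name, or "" if none is configured
     */
    static DeliveryState onFailedDelivery(boolean rejected, int deliveryCount,
                                          int maxDeliveryAttempts, String dlqTopic) {
        boolean undeliverable = rejected || deliveryCount >= maxDeliveryAttempts;
        if (!undeliverable) {
            // Attempts remain, so the record can be delivered again.
            return DeliveryState.AVAILABLE;
        }
        // With a DLQ topic the record waits in ARCHIVING while the DLQ record is written;
        // without one it goes straight to ARCHIVED, as in KIP-932.
        return dlqTopic.isEmpty() ? DeliveryState.ARCHIVED : DeliveryState.ARCHIVING;
    }

    /** Once the DLQ write has finished, successfully or not, the record is archived. */
    static DeliveryState onDlqWriteFinished(DeliveryState current) {
        if (current != DeliveryState.ARCHIVING) {
            throw new IllegalStateException("Not archiving: " + current);
        }
        return DeliveryState.ARCHIVED;
    }

    public static void main(String[] args) {
        DeliveryState state = onFailedDelivery(true, 1, 5, "dlq.PAYMENTS");
        System.out.println(state + " (" + state.id + ") -> " + onDlqWriteFinished(state));
    }
}
```

The intermediate Archiving state is what lets the broker distinguish "undeliverable, DLQ write pending" from "finished", which matters because the DLQ write and the share-group state write are separate operations.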

The following context information headers are included in the DLQ record, overwriting existing headers of the same name if they already exist in the original record. To avoid clashing with headers from the original record, all error context information headers start with "__dlq.errors.":

Header name | Description
--- | ---
__dlq.errors.topic | The name of the topic that contained the original record.
__dlq.errors.partition | The partition that contains the original record (encoded as a UTF-8 string).
__dlq.errors.offset | The offset of the original record (encoded as a UTF-8 string).
__dlq.errors.group | If known, the group ID of the delivery of the original record.
__dlq.errors.delivery.count | If known, the delivery count of the original record (encoded as a UTF-8 string).

Additionally, the broker can include the headers, key and value of the original record in the DLQ record, but only when configured to do so, because copying the original record duplicates its content, which may be undesirable for performance or security reasons.
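The header table and the optional copying of the original record can be illustrated together. This is a minimal sketch under stated assumptions: records are modelled by a hypothetical SimpleRecord class with headers as a Map (Kafka headers can actually repeat a key), and the choice of a null key and value when the record is not copied is an assumption, since the KIP does not say what they are.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Kafka record: headers as a map, key and value as bytes.
final class SimpleRecord {
    final Map<String, byte[]> headers;
    final byte[] key;
    final byte[] value;

    SimpleRecord(Map<String, byte[]> headers, byte[] key, byte[] value) {
        this.headers = headers;
        this.key = key;
        this.value = value;
    }
}

final class DlqRecordBuilder {
    static final String PREFIX = "__dlq.errors.";

    private DlqRecordBuilder() {}

    /**
     * Builds the DLQ record for an undeliverable record.
     * groupId and deliveryCount may be null when not known.
     * copyRecord mirrors errors.deadletterqueue.copy.record.enable.
     */
    static SimpleRecord build(SimpleRecord original, String topic, int partition, long offset,
                              String groupId, Integer deliveryCount, boolean copyRecord) {
        // Without copying, the DLQ record carries only the context headers.
        Map<String, byte[]> headers = copyRecord
                ? new LinkedHashMap<>(original.headers)
                : new LinkedHashMap<>();
        // put() replaces any original header of the same name.
        headers.put(PREFIX + "topic", utf8(topic));
        headers.put(PREFIX + "partition", utf8(Integer.toString(partition)));
        headers.put(PREFIX + "offset", utf8(Long.toString(offset)));
        if (groupId != null) {
            headers.put(PREFIX + "group", utf8(groupId));
        }
        if (deliveryCount != null) {
            headers.put(PREFIX + "delivery.count", utf8(deliveryCount.toString()));
        }
        // Assumption: key and value are null when the record is not copied.
        return copyRecord
                ? new SimpleRecord(headers, original.key, original.value)
                : new SimpleRecord(headers, null, null);
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SimpleRecord original = new SimpleRecord(new LinkedHashMap<>(), utf8("k"), utf8("v"));
        SimpleRecord dlq = build(original, "T1", 3, 42L, "PAYMENTS", 5, false);
        dlq.headers.forEach((k, v) -> System.out.println(k + "=" + new String(v, StandardCharsets.UTF_8)));
    }
}
```

With Kafka's real Headers interface, the "overwrite" semantics could be achieved by calling remove(key) before add(key, value), since add alone appends another header with the same name.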

The broker may batch the writing of records onto the DLQ topic for efficiency. In rare situations, more than one DLQ record could be written for a particular undeliverable record, because the broker does not guarantee to write the DLQ record onto the DLQ topic atomically with the associated write to the internal share-group state topic.
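Because duplicates are possible, an application reading a DLQ topic may want to discard repeats. One approach, sketched here under the assumption that the consumer can afford an in-memory set, is to key each DLQ record on its context headers. DlqDeduplicator is a hypothetical name, and headers are modelled as a Map rather than Kafka's Headers type.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical duplicate filter for an application consuming a DLQ topic.
final class DlqDeduplicator {
    // Unbounded for simplicity; a real consumer would need to bound or persist this.
    private final Set<List<String>> seen = new HashSet<>();

    /** Returns true the first time an original record is seen, false for later duplicates. */
    boolean firstOccurrence(Map<String, byte[]> headers) {
        // The group is part of the key because a cluster-wide DLQ topic may receive
        // records from several groups that consumed the same original record.
        List<String> key = Arrays.asList(
                header(headers, "__dlq.errors.group"),
                header(headers, "__dlq.errors.topic"),
                header(headers, "__dlq.errors.partition"),
                header(headers, "__dlq.errors.offset"));
        return seen.add(key);
    }

    private static String header(Map<String, byte[]> headers, String name) {
        byte[] value = headers.get(name);
        // __dlq.errors.group is only present if known; treat a missing header as "".
        return value == null ? "" : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("__dlq.errors.topic", "T1".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.errors.partition", "0".getBytes(StandardCharsets.UTF_8));
        headers.put("__dlq.errors.offset", "7".getBytes(StandardCharsets.UTF_8));
        DlqDeduplicator dedup = new DlqDeduplicator();
        System.out.println(dedup.firstOccurrence(headers) + " " + dedup.firstOccurrence(headers));
    }
}
```

Using a List as the set element avoids building a delimited string key, which could be ambiguous because group IDs are arbitrary strings.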

If the broker fails to write to the DLQ topic, it retries errors which can be resolved by retrying, such as stale metadata or a leadership change. For other errors, the broker logs an error and the record progresses into the Archived state regardless, without a DLQ record having been written.
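The retry behaviour can be sketched as follows. This is a hypothetical illustration: the DlqWriteResult categories are stand-ins rather than Kafka's actual error codes, and the maxAttempts bound is an assumption, since this KIP does not specify a retry limit.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Illustrative error categories; these are not Kafka's actual error codes.
enum DlqWriteResult { SUCCESS, STALE_METADATA, LEADERSHIP_CHANGE, OTHER_ERROR }

final class DlqWriteRetry {
    /** What happened to the DLQ record; the original record ends up Archived either way. */
    enum Outcome { DLQ_RECORD_WRITTEN, DLQ_RECORD_NOT_WRITTEN }

    private DlqWriteRetry() {}

    static boolean isRetriable(DlqWriteResult result) {
        return result == DlqWriteResult.STALE_METADATA
                || result == DlqWriteResult.LEADERSHIP_CHANGE;
    }

    /**
     * Attempts the DLQ write, retrying retriable errors up to maxAttempts attempts in total.
     * The attempt limit is an assumption; the KIP does not specify one.
     */
    static Outcome write(Supplier<DlqWriteResult> attempt, int maxAttempts) {
        DlqWriteResult last = null;
        for (int i = 0; i < maxAttempts; i++) {
            last = attempt.get();
            if (last == DlqWriteResult.SUCCESS) {
                return Outcome.DLQ_RECORD_WRITTEN;
            }
            if (!isRetriable(last)) {
                break; // non-retriable: stop immediately
            }
            // A real broker would refresh metadata and back off before retrying.
        }
        System.err.println("Failed to write DLQ record (" + last + "); archiving without it");
        return Outcome.DLQ_RECORD_NOT_WRITTEN;
    }

    public static void main(String[] args) {
        Iterator<DlqWriteResult> results =
                Arrays.asList(DlqWriteResult.LEADERSHIP_CHANGE, DlqWriteResult.SUCCESS).iterator();
        System.out.println(write(results::next, 3));
    }
}
```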

Security considerations

The records are written onto a DLQ topic by the broker, which is intrinsically capable of writing to all topics. However, this has security implications. If an application writes a record onto a topic T1 for which it has permission, and then causes the record to be treated as undeliverable by a share group with a DLQ topic configured (perhaps by rejecting it explicitly or by making it hit the maximum number of delivery attempts), the broker will write a DLQ record onto the DLQ topic. The application which wrote the original record onto T1 does not necessarily have permission to write onto the DLQ topic itself. Now suppose that the configured DLQ topic is a critical business topic: the original application would then have an indirect way to write onto this critical topic. This is why the naming of DLQ topics can be controlled, and why topics must be specifically configured to permit their use as DLQ topics.

In order to be used as a DLQ topic for share groups, and potentially for other kinds of group in the future, a topic must have the topic configuration errors.deadletterqueue.group.enable set to true. This means that the administrator of the topic controls whether the topic can be used as a DLQ topic.

A broker configuration errors.deadletterqueue.topic.name.prefix contains the prefix of permitted DLQ topic names. By default, this is "dlq.", meaning that DLQ topic names must start with this prefix, such as "dlq.T1". This makes it immediately clear to consuming applications that the records on such a topic were written to it indirectly. The context information headers also indicate that these are potentially DLQ records.
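Taken together with the "__" restriction on errors.deadletterqueue.topic.name in the group configuration, the rules above amount to a small set of checks. The following is a hypothetical sketch of them; DlqTopicValidator and its messages are invented, and this KIP does not specify when the broker applies the checks (for example, when the group configuration is altered or when a DLQ record is written), so the ordering is illustrative.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical checks on a group's configured DLQ topic.
final class DlqTopicValidator {
    static final String GROUP_ENABLE_CONFIG = "errors.deadletterqueue.group.enable";

    private DlqTopicValidator() {}

    /**
     * Returns a reason if the topic may not be used as a DLQ topic, or empty if it may.
     *
     * @param dlqTopic     non-empty value of the group's errors.deadletterqueue.topic.name
     * @param prefix       value of errors.deadletterqueue.topic.name.prefix ("" = no restriction)
     * @param topicConfigs configuration of the DLQ topic, or null if the topic does not exist
     */
    static Optional<String> validate(String dlqTopic, String prefix, Map<String, String> topicConfigs) {
        if (dlqTopic.startsWith("__")) {
            return Optional.of("DLQ topic name must not start with \"__\"");
        }
        if (!prefix.isEmpty() && !dlqTopic.startsWith(prefix)) {
            return Optional.of("DLQ topic name must start with \"" + prefix + "\"");
        }
        if (topicConfigs == null) {
            return Optional.of("DLQ topic " + dlqTopic + " does not exist");
        }
        // Boolean.parseBoolean treats a missing value as false, matching the config default.
        if (!Boolean.parseBoolean(topicConfigs.get(GROUP_ENABLE_CONFIG))) {
            return Optional.of("Topic " + dlqTopic + " does not have " + GROUP_ENABLE_CONFIG + "=true");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> enabled = Collections.singletonMap(GROUP_ENABLE_CONFIG, "true");
        System.out.println(validate("dlq.PAYMENTS", "dlq.", enabled)); // allowed
        System.out.println(validate("payments", "dlq.", enabled));     // wrong prefix
    }
}
```

The two controls are independent: the prefix is set by the cluster administrator, while errors.deadletterqueue.group.enable is set by whoever administers the topic, so both must agree before a topic can receive DLQ records.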

The broker does not automatically create DLQ topics by default, because that would again provide an indirect way of getting the broker to create a topic with a particular name.

In addition, the DLQ records do not need to contain the actual data from the original records. The copying of the original record data into the DLQ record is controlled by the group configuration errors.deadletterqueue.copy.record.enable, which is false by default.

DLQ topic conventions

There are two obvious conventions for the use of DLQ topics.

First, a DLQ topic could be associated with a particular application. In this case, it would make sense to have a DLQ topic per application, probably aligned with the application's share group. If the application's share group is called "PAYMENTS", then its DLQ topic could be "dlq.PAYMENTS". In this situation, the records on a DLQ topic would likely have predictable schemas.

Second, a DLQ topic could be considered as a cluster-wide topic. In this case, there would be a single DLQ topic on the cluster with a well-known name. It is entirely possible that the records on the central DLQ topic would have multiple schemas because the records could have originated in a variety of applications.

While it would be a good convention for DLQ topic names to begin with the prefix "dlq." in all cases, some clusters implement a simple form of multi-tenancy by employing a prefix naming convention such as "emea.sales" together with prefixed ACLs. To accommodate naming conventions such as these, the cluster administrator can choose their own DLQ topic name prefix, or even choose not to have a prefix at all, thus allowing DLQ topics to fit in with a pre-existing topic naming convention.

Public Interfaces

Configuration

Cluster configuration

These are dynamic configurations defined at the cluster level.

Configuration | Description | Values
--- | --- | ---
errors.deadletterqueue.auto.create.topics.enable | Whether to create DLQ topics automatically. Note that automatic creation of DLQ topics is independent of the auto.create.topics.enable configuration. When a DLQ topic is created automatically, its topic configuration errors.deadletterqueue.group.enable is set to true. | Type: boolean, default: false
errors.deadletterqueue.topic.name.prefix | The prefix of permitted dead-letter queue topic names. If this is set to "", there is no restriction on the names used for dead-letter queue topics. | Type: string, default: "dlq."

For example, the following command would enable auto-creation of DLQ topics for the cluster.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config errors.deadletterqueue.auto.create.topics.enable=true

If KIP-1210: Disallow Configurations at the Broker Level is adopted, these configurations will be added to the list of those which are not permitted at the individual broker level.
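The interaction between auto-creation and the topic-level enable flag can be sketched as a single decision. This is a hypothetical illustration: DlqAutoCreate is an invented name, and the nameAllowed parameter assumes that the DLQ topic name has already passed the naming checks (prefix and no leading "__"), which this KIP implies but does not spell out for auto-creation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical decision logic for errors.deadletterqueue.auto.create.topics.enable.
final class DlqAutoCreate {
    static final String GROUP_ENABLE_CONFIG = "errors.deadletterqueue.group.enable";

    private DlqAutoCreate() {}

    /**
     * Returns the topic configs to create a missing DLQ topic with, or empty if the
     * topic should not be created. auto.create.topics.enable is deliberately not
     * consulted, because DLQ auto-creation is independent of it.
     *
     * @param topicExists   whether the DLQ topic already exists
     * @param dlqAutoCreate value of errors.deadletterqueue.auto.create.topics.enable
     * @param nameAllowed   assumed result of the DLQ topic name checks
     */
    static Optional<Map<String, String>> topicConfigsForCreation(boolean topicExists,
                                                                 boolean dlqAutoCreate,
                                                                 boolean nameAllowed) {
        if (topicExists || !dlqAutoCreate || !nameAllowed) {
            return Optional.empty();
        }
        // An auto-created DLQ topic is immediately usable as a DLQ topic for groups.
        return Optional.of(Collections.singletonMap(GROUP_ENABLE_CONFIG, "true"));
    }

    public static void main(String[] args) {
        System.out.println(topicConfigsForCreation(false, true, true));
    }
}
```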

Group configuration

Note that these configuration property names do not begin with "share" even though they are initially only relevant to the behavior of share groups. This is because the concept is likely to be applicable to other group types in the future.

Configuration | Description | Values
--- | --- | ---
errors.deadletterqueue.topic.name | The name of the topic to be used as the dead-letter queue (DLQ) for undeliverable records. The topic name is blank by default, which means that there is no DLQ topic for this group. | Type: string, default: "", must not start with a double underscore "__" to ensure that internal topics are not mistakenly used as DLQ topics
errors.deadletterqueue.copy.record.enable | Whether to copy the record onto the dead-letter queue topic, or just to write a record containing the context information headers. | Type: boolean, default: false

Topic configuration

Configuration | Description | Values
--- | --- | ---
errors.deadletterqueue.group.enable | Whether the topic can be used as a DLQ topic for groups. | Type: boolean, default: false

Records

This KIP defines a new DeliveryState of Archiving with the value 3, which indicates a record that is in the process of being written to the dead-letter queue topic. As the details of the implementation develop, it will become clear whether this value is written to the share-group state topic or is just an in-memory state like Acquired. The KIP reserves it for potential use in the existing share coordinator record schemas.

Metrics

Broker metrics

The following new broker metrics are added:

Metric name | Type | Group | Tags | Description | JMX beans
--- | --- | --- | --- | --- | ---
DeadLetterQueueRecordCount | Meter | ShareGroupMetrics | group: <group_id> | The number of records written to the dead-letter queue topic. | kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueRecordCount and kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueRecordCount,group={group_id}
DeadLetterQueueTotalProduceRequestsPerSec | Meter | ShareGroupMetrics | group: <group_id> | The rate of produce requests writing records to the dead-letter queue topic. | kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueTotalProduceRequestsPerSec and kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueTotalProduceRequestsPerSec,group={group_id}
DeadLetterQueueFailedProduceRequestsPerSec | Meter | ShareGroupMetrics | group: <group_id> | The rate of failed produce requests writing records to the dead-letter queue topic. | kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueFailedProduceRequestsPerSec and kafka.server:type=ShareGroupMetrics,name=DeadLetterQueueFailedProduceRequestsPerSec,group={group_id}
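An operator monitoring these metrics over JMX needs their bean names. The following sketch builds them with the standard javax.management.ObjectName class, including a property-value pattern that matches the per-group bean of every group (usable, for instance, with MBeanServer.queryNames). DlqMetricNames is a hypothetical helper, and it assumes group IDs contain no characters that would need quoting in an ObjectName.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for building the JMX names listed in the metrics table.
final class DlqMetricNames {
    private static final String PREFIX = "kafka.server:type=ShareGroupMetrics,name=";

    private DlqMetricNames() {}

    /** Bean without the group tag. */
    static ObjectName withoutGroup(String metric) {
        return parse(PREFIX + metric);
    }

    /** Bean for one group; assumes groupId has no ',', '=', ':', '*', '?' or '"' characters. */
    static ObjectName forGroup(String metric, String groupId) {
        return parse(PREFIX + metric + ",group=" + groupId);
    }

    /** Pattern matching the per-group bean of every group. */
    static ObjectName allGroups(String metric) {
        return parse(PREFIX + metric + ",group=*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(name, e);
        }
    }

    public static void main(String[] args) {
        ObjectName pattern = allGroups("DeadLetterQueueRecordCount");
        System.out.println(pattern.apply(forGroup("DeadLetterQueueRecordCount", "PAYMENTS")));
    }
}
```

Because the pattern is not a property-list pattern (it has no trailing ",*"), it matches only beans with exactly the type, name and group keys, so the bean without a group tag is excluded.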

Compatibility, Deprecation, and Migration Plan

Kafka Broker Migration

DLQ topics for share groups are enabled by the share.version feature at version 2 (share.version=2). This means that DLQ topics can only be enabled once all brokers support the feature.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing to ensure that the performance impact of DLQ topics is minimal, and that the cluster is not overwhelmed when the number of undeliverable records is high.

Rejected Alternatives

None.
