Status
Current state: Voting
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-932: Queues for Kafka introduced queuing semantics to Apache Kafka for consumers using share groups. It describes the concepts of delivery counts and failed delivery attempts. However, it does not provide any automated processing for records that have reached their delivery attempt limit or that have been rejected as unprocessable.
When a record is repeatedly delivered to a share group without being successfully acknowledged, it may be because the record has some kind of serialisation or semantic problem and is a so-called "poison message". The delivery attempt limit is enforced to prevent an endless loop of failed processing attempts, even in cases where the consuming application does not have adequate error handling and simply fails on each attempt. A well-written consuming application can catch a failing record and explicitly reject it, which ends its delivery attempts more efficiently. These undeliverable records are candidates for placing on a dead-letter queue (DLQ) topic for inspection or special processing, as opposed to simply being put directly into the Archived state as described in KIP-932.
Apache Kafka already has dead-letter queues for Kafka Connect and Kafka Streams. Strictly speaking, a dead-letter queue in Kafka is a topic, but “dead-letter queue” is the term already established in the industry. This KIP extends the idea to share groups.
Proposed Changes
In Kafka, a "dead-letter queue" is a topic onto which records whose delivery failed are automatically written. A share group can be configured with the name of a topic to be used as the group's DLQ topic. For a share group with a DLQ topic, when a record's delivery is rejected by the consumer or the record reaches its maximum number of delivery attempts, it transitions to the Archiving state, a record is written onto the DLQ topic, and then the record transitions into the Archived state. This DLQ record contains the context of the undeliverable record (topic name, partition, offset and so on) as record headers, and optionally the content of the undeliverable record itself.
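The state flow above can be sketched in Java as follows. This is a minimal, hypothetical model. The `RecordState` enum, the method names and the parameters are illustrative only and are not the broker's actual code. It assumes, as in KIP-932, that a record released or timed out below the delivery limit returns to Available.

```java
// Hypothetical sketch of the record-state flow for a share group with an optional DLQ topic.
final class DlqTransitions {
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVING, ARCHIVED }

    // Next state for an acquired record whose delivery ended without acknowledgement,
    // either because the consumer rejected it or because it was released or timed out.
    static RecordState afterFailedDelivery(boolean rejected, int deliveryCount,
                                           int maxDeliveryAttempts, String dlqTopicName) {
        boolean undeliverable = rejected || deliveryCount >= maxDeliveryAttempts;
        if (!undeliverable) {
            // Still within the delivery limit: make the record available for redelivery.
            return RecordState.AVAILABLE;
        }
        // Without a DLQ topic the record is archived directly, as in KIP-932.
        // With one, it waits in ARCHIVING while the DLQ record is written.
        return dlqTopicName.isEmpty() ? RecordState.ARCHIVED : RecordState.ARCHIVING;
    }

    // Called once the DLQ write has finished, whether or not it succeeded.
    static RecordState completeArchiving(RecordState current) {
        if (current != RecordState.ARCHIVING) {
            throw new IllegalStateException("Expected ARCHIVING but was " + current);
        }
        return RecordState.ARCHIVED;
    }
}
```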
The following context information headers are included in the DLQ record, overwriting any headers of the same name in the original record. To avoid clashing with headers from the original record, all error context information headers start with "__dlq.errors.":
| Header name | Description |
|---|---|
__dlq.errors.topic | The name of the topic that contained the original record. |
__dlq.errors.partition | The partition that contains the original record (encoded as a UTF-8 string). |
__dlq.errors.offset | The offset of the original record (encoded as a UTF-8 string). |
__dlq.errors.group | If known, the ID of the group to which the original record was delivered. |
__dlq.errors.delivery.count | If known, the delivery count of the original record (encoded as a UTF-8 string). |
Additionally, the broker can include the headers, key and value of the original record in the DLQ record. This requires additional configuration because copying the original record duplicates its content, which may be undesirable for performance or security reasons.
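Below is a minimal sketch of how the DLQ record headers could be assembled. To keep the example self-contained, headers are modelled as an insertion-ordered `Map<String, byte[]>` rather than Kafka's `Headers` type. `DlqHeaders.build` is a hypothetical helper, and encoding the topic and group names as UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: building the headers of a DLQ record.
final class DlqHeaders {
    static final String PREFIX = "__dlq.errors.";

    static Map<String, byte[]> build(Map<String, byte[]> originalHeaders, boolean copyRecord,
                                     String topic, int partition, long offset,
                                     String groupId, Integer deliveryCount) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        // Original headers are only carried over when record copying is enabled.
        if (copyRecord) {
            headers.putAll(originalHeaders);
        }
        // Context headers overwrite any same-named headers copied from the original record.
        headers.put(PREFIX + "topic", utf8(topic));
        headers.put(PREFIX + "partition", utf8(Integer.toString(partition)));
        headers.put(PREFIX + "offset", utf8(Long.toString(offset)));
        // Group and delivery count are only written "if known".
        if (groupId != null) {
            headers.put(PREFIX + "group", utf8(groupId));
        }
        if (deliveryCount != null) {
            headers.put(PREFIX + "delivery.count", utf8(deliveryCount.toString()));
        }
        return headers;
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

A map gives the overwrite semantics directly. Real Kafka headers permit duplicate keys, so an implementation using `Headers` would need to remove any existing headers with these names before adding the context headers.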
The broker may batch writes of records onto the DLQ topic for efficiency. In rare situations, more than one DLQ record could be written for a particular undeliverable record. This is because the broker does not guarantee that the DLQ record is written onto the DLQ topic atomically with the associated write to the internal share-group state topic.
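Duplicates are possible, so a consumer of a DLQ topic that must handle each undeliverable record only once could deduplicate on the context headers. The sketch below is hypothetical. `DlqDeduplicator` keeps an unbounded in-memory set, whereas a real application would need bounded or persistent storage. Headers are again modelled as a `Map<String, byte[]>`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: skipping duplicate DLQ records for the same original record.
final class DlqDeduplicator {
    private final Set<List<String>> seen = new HashSet<>();

    // Returns true the first time a given original record is seen, false for duplicates.
    boolean firstSighting(Map<String, byte[]> headers) {
        List<String> key = List.of(
                header(headers, "__dlq.errors.group"),
                header(headers, "__dlq.errors.topic"),
                header(headers, "__dlq.errors.partition"),
                header(headers, "__dlq.errors.offset"));
        return seen.add(key);
    }

    private static String header(Map<String, byte[]> headers, String name) {
        byte[] value = headers.get(name);
        // The group header is optional ("if known"), so a missing header maps to "".
        return value == null ? "" : new String(value, StandardCharsets.UTF_8);
    }
}
```

The group is included in the key so that, on a cluster-wide DLQ topic, records from different groups that dead-letter the same original record remain distinct.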
If the broker fails to write to the DLQ topic, it retries errors that retrying can resolve, such as stale metadata or a leadership change. For other errors, the broker logs an error and the record progresses into the Archived state regardless.
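The failure-handling policy can be sketched as follows. The `WriteResult` values and the `maxAttempts` bound are assumptions for illustration. The KIP does not specify how many times the broker retries, and the error categories here are hypothetical stand-ins for the broker's real error codes.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry DLQ writes on retriable errors, otherwise log and give up.
final class DlqWriteRetry {
    enum WriteResult { SUCCESS, STALE_METADATA, NOT_LEADER, OTHER_ERROR }

    static boolean isRetriable(WriteResult r) {
        return r == WriteResult.STALE_METADATA || r == WriteResult.NOT_LEADER;
    }

    // Performs the write, retrying up to maxAttempts in total for retriable errors.
    // Whatever the outcome, the caller then moves the record from Archiving to Archived.
    static WriteResult writeWithRetry(Supplier<WriteResult> attempt, int maxAttempts) {
        WriteResult result = attempt.get();
        int attempts = 1;
        while (isRetriable(result) && attempts < maxAttempts) {
            result = attempt.get();
            attempts++;
        }
        if (result != WriteResult.SUCCESS) {
            System.err.println("DLQ write failed with " + result + "; archiving record anyway");
        }
        return result;
    }
}
```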
Security considerations
The records are written onto a DLQ topic by a broker, which is intrinsically capable of writing to all topics. However, this has security implications. Suppose an application writes a record onto a topic T1 for which it has permission, and then causes the record to be treated as undeliverable by a share group with a DLQ topic configured, perhaps by rejecting it explicitly or by letting it reach the maximum number of delivery attempts. The broker then writes a DLQ record onto the DLQ topic, even though the application that wrote the original record onto T1 does not necessarily have permission to write onto the DLQ topic itself. If the configured DLQ topic were a critical business topic, the original application would have an indirect way to write onto that critical topic. This is why the naming of DLQ topics can be controlled, and why topics must be specifically configured to permit their use as DLQ topics.
In order to be used as a DLQ topic for share groups, and potentially other kinds of group in the future, the topic configuration errors.deadletterqueue.group.enable must be true. This means the administrator of the topic controls whether the topic can be used as a DLQ topic.
A broker configuration errors.deadletterqueue.topic.name.prefix contains the prefix of permitted DLQ topic names. By default, this is "dlq.", meaning that DLQ topics must start with this prefix, such as "dlq.T1". This instantly tells consuming applications that the records they see were indirectly written to this topic. The context information headers also indicate that these are potentially DLQ records.
By default, the broker does not automatically create DLQ topics, because that would again give an application an indirect way of getting the broker to create a topic with a particular name.
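The naming and permission checks in this section can be combined into a single validation function, together with the group configuration rule that a DLQ topic name must not start with a double underscore. `DlqTopicValidator.rejectionReason` and its parameters are hypothetical. The KIP does not state at which point each check is applied, for example when the group is configured or when a DLQ record is written.

```java
import java.util.Optional;

// Hypothetical sketch: deciding whether a topic may be used as a group's DLQ topic.
final class DlqTopicValidator {
    // Returns a reason for rejecting the topic, or empty if it is permitted.
    static Optional<String> rejectionReason(String dlqTopicName, String requiredPrefix,
                                            boolean topicGroupEnable) {
        // Internal topics such as __consumer_offsets must never be used as DLQ topics.
        if (dlqTopicName.startsWith("__")) {
            return Optional.of("DLQ topic name must not start with \"__\"");
        }
        // errors.deadletterqueue.topic.name.prefix; an empty prefix always matches.
        if (!dlqTopicName.startsWith(requiredPrefix)) {
            return Optional.of("DLQ topic name must start with \"" + requiredPrefix + "\"");
        }
        // The topic administrator must opt in via the topic configuration.
        if (!topicGroupEnable) {
            return Optional.of("topic does not set errors.deadletterqueue.group.enable=true");
        }
        return Optional.empty();
    }
}
```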
In addition, the DLQ records do not need to contain the actual data from the original records. The copying of the original record data into the DLQ record is controlled by a group configuration errors.deadletterqueue.copy.record.enable which is false by default.
DLQ topic conventions
There are a couple of obvious conventions for the use of DLQ topics.
First, a DLQ topic could be associated with a particular application. In this case, it would make sense to have a DLQ topic per application, probably aligned with the application's share group. If the application's share group was called "PAYMENTS", then its DLQ topic could be "dlq.PAYMENTS". In this situation, it is likely that the records on a DLQ topic would have predictable schemas.
Second, a DLQ topic could be considered as a cluster-wide topic. In this case, there would be a single DLQ topic on the cluster with a well-known name. It is entirely possible that the records on the central DLQ topic would have multiple schemas because the records could have originated in a variety of applications.
While it would be a good convention for DLQ topics to begin with the prefix "dlq." in all cases, some clusters implement a simple form of multi-tenancy by employing a prefix naming convention such as "emea.sales" and using prefix ACLs. In order to accommodate naming conventions such as these, the cluster administrator can choose their own DLQ topic name prefix, or even choose not to have a prefix at all thus allowing the DLQ topics to fit in with a pre-existing topic naming convention.
Public Interfaces
Configuration
Cluster configuration
These are dynamic configurations defined at the cluster level.
| Configuration | Description | Values |
|---|---|---|
errors.deadletterqueue.auto.create.topics.enable | Whether to create DLQ topics automatically. Note that automatic creation of the DLQ topics is independent of the auto.create.topics.enable configuration. When a DLQ topic is created automatically, the topic configuration errors.deadletterqueue.group.enable will be set to true. | Type: boolean, default: false |
errors.deadletterqueue.topic.name.prefix | The prefix of permitted dead-letter queue topic names. If this is set to "", there is no restriction on the names used for dead-letter queue topics. | Type: string, default: "dlq." |
For example, the following command would enable auto-creation of DLQ topics for the cluster.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config errors.deadletterqueue.auto.create.topics.enable=true
If KIP-1210: Disallow Configurations at the Broker Level is adopted, these configurations will be added to the list of those which are not permitted at the individual broker level.
Group configuration
Note that these configuration property names do not begin with "share." even though they are initially only relevant to the behavior of share groups. This is because the concept is likely to be applicable to other group types in the future.
| Configuration | Description | Values |
|---|---|---|
errors.deadletterqueue.topic.name | The name of the topic to be used as the dead-letter queue (DLQ) for undeliverable records. The topic name is blank by default, which means that there is no DLQ topic for this group. | Type: string, default: "", must not start with a double underscore ("__") so that internal topics are not mistakenly used as DLQ topics |
errors.deadletterqueue.copy.record.enable | Whether to copy the record onto the dead-letter queue topic, or just to write a record containing the context information headers. | Type: boolean, default: false |
Topic configuration
| Configuration | Description | Values |
|---|---|---|
errors.deadletterqueue.group.enable | Whether the topic can be used as a DLQ topic for groups. | Type: boolean, default: false |
Records
This KIP defines a new DeliveryState of Archiving with the value of 3 which indicates a record which is in the process of being written to the dead-letter queue topic. As the details of the implementation develop, it will become clear whether this value is written to the share-group state topic, or just a state in memory like Acquired. The KIP reserves it for potential use in the existing share coordinator record schemas.
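Here is a sketch of how the delivery states could map to byte codes in the share-group state records. The value 3 for Archiving comes from this KIP. The other values are assumed to match the existing KIP-932 codes. The `DeliveryState` enum and its `fromCode` helper are illustrative, not the broker's actual class.

```java
// Hypothetical sketch: delivery states and their byte codes.
// ARCHIVING = 3 is defined by this KIP; the other codes are assumed from KIP-932.
enum DeliveryState {
    AVAILABLE((byte) 0),
    ACQUIRED((byte) 1),      // held in memory only
    ACKNOWLEDGED((byte) 2),
    ARCHIVING((byte) 3),     // DLQ record is being written
    ARCHIVED((byte) 4);

    final byte code;

    DeliveryState(byte code) {
        this.code = code;
    }

    // Decodes a stored state code, rejecting unknown values.
    static DeliveryState fromCode(byte code) {
        for (DeliveryState s : values()) {
            if (s.code == code) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown delivery state code: " + code);
    }
}
```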
Metrics
Broker metrics
The following new broker metrics are added:
| Metric Name | Type | Group | Tags | Description | JMX Bean |
|---|---|---|---|---|---|
DeadLetterQueueRecordCount | Meter | ShareGroupMetrics | group: <group_id> | The number of records written to the dead-letter queue topic. | |
DeadLetterQueueTotalProduceRequestsPerSec | Meter | ShareGroupMetrics | group: <group_id> | The rate of produce requests writing records to the dead-letter queue topic. | |
DeadLetterQueueFailedProduceRequestsPerSec | Meter | ShareGroupMetrics | group: <group_id> | The rate of failed produce requests writing records to the dead-letter queue topic. | |
Compatibility, Deprecation, and Migration Plan
Kafka Broker Migration
DLQ topics for share groups are enabled using the share.version=2 feature version. This means that DLQ topics are only enabled when all brokers support the feature.
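Below is a minimal sketch of the gating condition. It assumes the broker checks the finalized `share.version` feature level and the group's configured DLQ topic name. `DlqFeatureGate` and its method are hypothetical names.

```java
// Hypothetical sketch: DLQ writing is active only at share.version >= 2
// and when the group names a DLQ topic.
final class DlqFeatureGate {
    static final short DLQ_MIN_SHARE_VERSION = 2;

    static boolean dlqActive(short finalizedShareVersion, String dlqTopicName) {
        return finalizedShareVersion >= DLQ_MIN_SHARE_VERSION && !dlqTopicName.isEmpty();
    }
}
```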
Test Plan
The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing to ensure that the performance impact of DLQ topics is minimal, and that the cluster is not overwhelmed when the number of undeliverable records is high.
Rejected Alternatives
None.