Status
Current state: Under Discussion
Discussion thread:
JIRA: KAFKA-10357
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams uses repartition topics to repartition data when a key-based operation (e.g., an aggregation or a join) follows a key-changing operation (e.g., map). Additionally, Kafka Streams uses changelog topics to replicate the data in its state stores for fault tolerance. If any required repartition topic or changelog topic does not exist, it is created during a rebalance. More precisely, these internal topics are created during the computation of the assignment. Consequently, if one of the internal topics is deleted between rebalances, for example by mistake, it will be silently recreated as an empty topic and data might be lost. Kafka Streams users would either not notice the loss at all or notice it too late to limit the damage by stopping processing.
This silent recreation of internal topics could be avoided by creating the internal topics only once, during the first-ever rebalance of the application (or after an application reset). However, determining the first-ever rebalance of an application is not always straightforward. For example, if all Kafka Streams clients of a Kafka Streams application are stopped, an internal topic is deleted, and then the application is restarted, the deleted topic would be silently recreated as an empty topic. Kafka Streams has no information it could use to recognize that the first rebalance after the restart is not the first-ever rebalance of the application.
We propose to move the creation of the internal topics to an initialization method that users can call before they start their application for the first time. That way, users have full control over when the internal topics are created, and Kafka Streams can reliably notify users about possible data loss by throwing an exception when internal topics unexpectedly do not exist. Additionally, we propose to add a configuration that allows users to turn automatic creation of internal topics on or off. The configuration is needed for backward compatibility, but also to keep the first steps with Kafka Streams simple for new users.
Public Interfaces
We propose to add an initialization method to the KafkaStreams class. Method init() sets up the broker-side state for a Kafka Streams application. It needs to be called from at least one Kafka Streams client.
public class KafkaStreams {

    public void init(final InitProperties initProperties);

    public class InitProperties {
        public void setUpRepartitionTopicsIfAbsentEnabled();
        public void setUpRepartitionTopicsIfAbsentDisabled();
        public void setUpChangelogTopicsIfAbsentEnabled();
        public void setUpChangelogTopicsIfAbsentDisabled();
        public void setUpInternalTopicsIfAbsentEnabled();
        public void setUpInternalTopicsIfAbsentDisabled();
    }
}
We propose to add a new configuration to Kafka Streams to determine whether the initialization should be done automatically during a rebalance or by users calling KafkaStreams#init().
public class StreamsConfig {

    // possible values
    public static final String AUTOMATIC_SETUP = "automatic";
    public static final String MANUAL_SETUP = "manual";

    // configuration
    public static final String INTERNAL_TOPIC_SETUP = "internal.topics.setup"; // default is AUTOMATIC_SETUP
}
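As a minimal sketch of how an application might opt into manual setup, the snippet below builds client properties with the proposed key. The key "internal.topics.setup" and the value "manual" come from the proposal above and are not part of any released Kafka version. The class name, application id, and bootstrap servers are placeholders chosen for illustration. String literals are used instead of the proposed constants so that the sketch compiles without them.

```java
import java.util.Properties;

public class ManualSetupConfigSketch {

    // Builds client properties that request manual internal-topic setup.
    public static Properties manualSetupProperties() {
        final Properties props = new Properties();
        // Existing Kafka Streams settings; the values are placeholders.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Proposed in this KIP (assumption: not yet available in Kafka):
        // StreamsConfig.INTERNAL_TOPIC_SETUP = "internal.topics.setup"
        // StreamsConfig.MANUAL_SETUP = "manual"
        props.put("internal.topics.setup", "manual");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(manualSetupProperties().getProperty("internal.topics.setup"));
    }
}
```

Leaving the key unset would correspond to the proposed default, AUTOMATIC_SETUP.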
We propose to add a new exception that helps to distinguish errors originating from missing internal topics from other errors in the uncaught exception handler. For example, reacting to a missing source topic (i.e., MissingSourceTopicException) might be different from reacting to missing internal topics, because the process for re-creating source topics might differ from the process for re-creating internal topics. Furthermore, source topics might be owned by a different team than the internal topics, so different people would need to be paged.
package org.apache.kafka.streams.errors;

public class MissingInternalTopicException extends StreamsException {
}
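The following sketch illustrates how a handler could use the new exception type to decide whom to page. It is not the Kafka Streams handler API: the exception classes are local stand-ins that mirror the hierarchy described above so the example compiles without Kafka on the classpath, and the team names and the teamToPage helper are hypothetical.

```java
public class MissingTopicRoutingSketch {

    // Local stand-ins for the Kafka Streams exception hierarchy (assumption:
    // both missing-topic exceptions extend StreamsException, as in the proposal).
    public static class StreamsException extends RuntimeException {
        public StreamsException(final String message) { super(message); }
    }

    public static class MissingSourceTopicException extends StreamsException {
        public MissingSourceTopicException(final String message) { super(message); }
    }

    public static class MissingInternalTopicException extends StreamsException {
        public MissingInternalTopicException(final String message) { super(message); }
    }

    // Hypothetical routing: internal topics belong to the application owners,
    // source topics to whoever produces the input data.
    public static String teamToPage(final Throwable error) {
        if (error instanceof MissingInternalTopicException) {
            return "streams-app-owners";
        }
        if (error instanceof MissingSourceTopicException) {
            return "source-topic-owners";
        }
        return "on-call";
    }

    public static void main(final String[] args) {
        System.out.println(teamToPage(new MissingInternalTopicException("changelog topic missing")));
        System.out.println(teamToPage(new MissingSourceTopicException("input topic missing")));
    }
}
```

Without a dedicated exception type, both cases would have to be told apart by inspecting exception messages, which is brittle.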
Proposed Changes
If configuration INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, the internal topics will be set up during a rebalance. If the internal topics do not exist, they will be created. That corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams.init() to set up the internal topics when INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, but the call is not necessary since the internal topics would be created anyway during the next rebalance.
If configuration INTERNAL_TOPIC_SETUP is set to MANUAL_SETUP, the internal topics will not be set up during a rebalance; instead, users need to call KafkaStreams.init() to set up the internal topics. If the internal topics do not exist during a rebalance because KafkaStreams.init() has not been called or because one or more internal topics have been deleted, a MissingInternalTopicException is thrown in each Kafka Streams client.
If KafkaStreams.init() is called and one or more internal topics already exist, then only the missing internal topics are created as empty topics. In addition to creating internal topics, KafkaStreams.init() will perform all checks that are currently done during a rebalance, including checks for source and sink topics.
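The behavior described above can be sketched with a small in-memory model. This is an illustration under stated assumptions, not the proposed implementation: a set of topic names stands in for the broker, the class and method names are hypothetical, and the checks for source and sink topics are left out.

```java
import java.util.Set;
import java.util.TreeSet;

public class InternalTopicSetupSketch {

    // Local stand-in for the proposed exception.
    public static class MissingInternalTopicException extends RuntimeException {
        public MissingInternalTopicException(final String message) { super(message); }
    }

    private final Set<String> brokerTopics = new TreeSet<>(); // topics that currently exist
    private final Set<String> requiredInternalTopics;
    private final boolean automaticSetup;

    public InternalTopicSetupSketch(final Set<String> requiredInternalTopics,
                                    final boolean automaticSetup) {
        this.requiredInternalTopics = new TreeSet<>(requiredInternalTopics);
        this.automaticSetup = automaticSetup;
    }

    // Models init(): creates only the missing internal topics and returns their names.
    public Set<String> init() {
        final Set<String> created = missingTopics();
        brokerTopics.addAll(created);
        return created;
    }

    // Models the rebalance: automatic mode creates missing topics,
    // manual mode throws if any internal topic is missing.
    public void onRebalance() {
        if (automaticSetup) {
            init();
            return;
        }
        final Set<String> missing = missingTopics();
        if (!missing.isEmpty()) {
            throw new MissingInternalTopicException("Missing internal topics: " + missing);
        }
    }

    // Models an accidental deletion of a topic on the broker.
    public void deleteTopic(final String topic) {
        brokerTopics.remove(topic);
    }

    private Set<String> missingTopics() {
        final Set<String> missing = new TreeSet<>(requiredInternalTopics);
        missing.removeAll(brokerTopics);
        return missing;
    }

    public static void main(final String[] args) {
        final InternalTopicSetupSketch app =
            new InternalTopicSetupSketch(Set.of("app-repartition", "app-changelog"), false);
        app.init();                        // first start: both topics are created
        app.deleteTopic("app-changelog");  // accidental deletion between rebalances
        try {
            app.onRebalance();
        } catch (final MissingInternalTopicException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the manual mode of this model, a deleted changelog topic surfaces as an exception at the next rebalance instead of being silently replaced by an empty topic.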
Compatibility, Deprecation, and Migration Plan
Since we introduce configuration INTERNAL_TOPIC_SETUP with default value AUTOMATIC_SETUP, which preserves the current Kafka Streams behavior, this KIP should not affect backward compatibility.
We do not need to deprecate any public interfaces since we propose to add a new method to the public API that does not replace any other method.
Migrating from the current behavior to INTERNAL_TOPIC_SETUP set to MANUAL_SETUP can be done without any specific migration plan. Users only need to set INTERNAL_TOPIC_SETUP to MANUAL_SETUP. They can additionally call KafkaStreams.init() if they think it is safe to re-create missing internal topics or if they know that no internal topics are missing.
Rejected Alternatives
- Persist a flag for the first-ever rebalance broker side: This approach was rejected because it would require changes on the brokers, which we think can be avoided while still getting a good solution with the approach proposed in this KIP.
- Use committed offsets for a repartition topic to verify if a repartition topic existed: This approach would not work since committed offsets are removed when a topic is deleted.