Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when a user deletes or recreates a Kafka source topic, the Flink job silently adapts.

In practice, users are not always aware of changes to their topics, and sometimes struggle to figure out why their job is not advancing or is producing incomplete results.

Moreover, topic changes impair the job's ability to recover, and thus should be surfaced accordingly: with a global failure.

The proposed change does not aim to change the responsibility model between the user and the job. Instead, it adds a “strict mode” that users can enable so that a topic mismatch results in a failure, improving system visibility and data correctness.

Topic deletion 

When a source topic is deleted:

  • No error is flagged if partition discovery is disabled
  • When partition discovery is enabled, UNKNOWN_TOPIC_OR_PARTITION (transient) is thrown during discovery.

Topic recreation 

The Kafka API traditionally uses topic names. The following changes address topicId:

  • KIP-516 Discussed topicId support for consumer and producer APIs, but left regular clients out of scope

  • KIP-848 Introduced topicId based communication internally for the rebalance protocol

  • KAFKA-14499 Implemented topic id support on public APIs (OffsetCommit/OffsetFetch).

So far, no topicId-based communication is supported on the client side. This effectively means we are unable to tell the old topic from a new topic with the same name, so a recreation either goes undetected or the job (incorrectly) recovers from what looks like a transient failure.

Source topic recreation implications: 

  • Checkpoints referring to offsets in the old topic become obsolete - the job is effectively unable to recover, but continues to run without disclosing this.
  • When the topic is recreated with fewer partitions, org.apache.kafka.common.errors.TimeoutException: Timeout of <> expired before the position for partition <>-<> could be determined is thrown, which still does not flag the actual issue.

Raising topicId awareness in the Kafka connector APIs, as proposed in this FLIP, lays the groundwork for future topicId-based communication with the Kafka server, once the Kafka client supports it.

Public Interfaces

The FLIP proposes an additional connector option that enables topic integrity checks. When enabled, jobs will fail unrecoverably with a TopicIntegrityException(topic <> was deleted/recreated) error if a referenced source topic was deleted or recreated.

For the check to be performed periodically, partition discovery must be enabled; otherwise the check is performed only once, on startup.
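
The interaction between the two options can be summarised in a small sketch. The class, enum, and method names below are hypothetical and exist only for illustration; the assumption (taken from the text above) is that a positive discovery interval means partition discovery is enabled.

```java
/** Hypothetical helper: not part of the connector, shown only to illustrate the rule above. */
final class IntegrityCheckSchedule {

    /** When the topic integrity check runs. */
    enum Mode { DISABLED, STARTUP_ONLY, PERIODIC }

    /**
     * Resolves when the check runs, assuming a positive interval means
     * partition discovery is enabled.
     */
    static Mode resolve(boolean integrityCheckEnabled, long partitionDiscoveryIntervalMs) {
        if (!integrityCheckEnabled) {
            return Mode.DISABLED;
        }
        // Without periodic discovery there is no recurring metadata fetch to piggyback on.
        return partitionDiscoveryIntervalMs > 0 ? Mode.PERIODIC : Mode.STARTUP_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, 300_000L));
        System.out.println(resolve(true, -1L));
        System.out.println(resolve(false, 300_000L));
    }
}
```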

DataStream

kafka source: Additional KafkaSourceBuilder option

 /**
  * Enables an integrity check of the source topics, failing the job if a
  * topic was deleted or recreated.
  * For the check to be performed periodically during source runtime,
  * partition discovery must be enabled (KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive number).
  *
  * @return {@link KafkaSourceBuilder}
  */
 public KafkaSourceBuilder<OUT> enableTopicIntegrityCheck();

KafkaSourceOptions.java

    public static final ConfigOption<Boolean> TOPIC_INTEGRITY_CHECK_ENABLED =
            ConfigOptions.key("scan.topic-integrity-check.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to verify topic id during runtime and fail if the topic is missing or recreated");


Usage example

KafkaSource<String> source = KafkaSource.<String>builder()
	.setBootstrapServers(<>) 
	.setTopics(new TopicIdentifier(SOURCE_TOPIC_NAME, SOURCE_TOPIC_ID)) 
	.setValueOnlyDeserializer(new SimpleStringSchema()) 
+	.enableTopicIntegrityCheck()
	.build(); 

SQL / CREATE TABLE

KafkaConnectorOptions.java:

    public static final ConfigOption<Boolean> SCAN_TOPIC_INTEGRITY_CHECK_ENABLED =
            ConfigOptions.key("scan.topic-integrity-check.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to verify topic id during runtime and fail if the topic is missing or recreated");

Usage example

CREATE TABLE KafkaSourceTable ( 
  `user_id` BIGINT, 
  `item_id` BIGINT, 
  `behavior` STRING, 
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' 
) WITH (
  'connector' = 'kafka', 
  'topic' = 'user_behavior',
+ 'scan.topic-integrity-check.enabled' = 'true',
  'scan.topic-partition-discovery.interval' = '300000 ms' 
);


Proposed Changes

Obtaining and preserving topicId

For user convenience, and to comply with Kafka's topic-name-based APIs, Flink will resolve the topicId on its own (i.e., the user will not need to provide it).

To ensure topic integrity across recoveries, the topic name → id mapping will be preserved in the KafkaSourceEnumeratorState.topicIntegrityMapping checkpointed state and propagated to the Kafka subscriber after recovery.

For older state versions, or when the feature is disabled, this field holds an empty mapping, so by default no additional state is stored.
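
As a rough illustration of the versioning behaviour described above, the sketch below serializes only the name → id mapping and returns an empty mapping for older state versions. The class name, the version constant, and the use of String for topic ids are assumptions made for this example; the actual enumerator state serializer contains additional state and may be laid out differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec for the topic name -> topic id section of the enumerator state. */
final class TopicIntegrityMappingCodec {

    /** Assumed first state version that carries the mapping (illustrative value). */
    static final int VERSION_WITH_MAPPING = 2;

    /** Writes the entry count followed by (name, id) pairs. */
    static byte[] serialize(Map<String, String> mapping) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(mapping.size());
            for (Map.Entry<String, String> entry : mapping.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the mapping; older state versions have no such section and yield an empty map. */
    static Map<String, String> deserialize(int version, byte[] data) {
        Map<String, String> mapping = new HashMap<>();
        if (version < VERSION_WITH_MAPPING) {
            return mapping;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String name = in.readUTF();
                String id = in.readUTF();
                mapping.put(name, id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping;
    }
}
```

With the feature disabled, the mapping is empty and the serialized section consists only of the entry count.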

Integrity check 

The implementation avoids additional calls to the Kafka server and normally relies on metadata that is already being retrieved (see disclaimer).

On every metadata fetch (on startup / upon partition discovery interval), the topic id from the Kafka server is compared against the stored topic id. A mismatch will trigger a global failure.
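
The comparison step could look roughly like the following. This is a minimal sketch under stated assumptions: topic ids are modelled as plain strings (Kafka itself uses a Uuid type), the class and method names are hypothetical, and the caller is assumed to trigger the global failure when the returned list is non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical comparison of stored topic ids against freshly fetched metadata. */
final class TopicIdComparison {

    /**
     * Returns the names of topics whose fetched id differs from the stored id.
     * Topics seen for the first time are recorded in {@code stored} and not reported.
     */
    static List<String> findRecreated(Map<String, String> stored, Map<String, String> fetched) {
        List<String> recreated = new ArrayList<>();
        for (Map.Entry<String, String> entry : fetched.entrySet()) {
            // putIfAbsent returns the previously stored id, or null on first sight.
            String previousId = stored.putIfAbsent(entry.getKey(), entry.getValue());
            if (previousId != null && !previousId.equals(entry.getValue())) {
                recreated.add(entry.getKey());
            }
        }
        return recreated;
    }
}
```

Recording first-seen ids in the same pass is a design choice of this sketch; it lets the startup fetch populate the mapping without a separate code path.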

If UNKNOWN_TOPIC_OR_PARTITION is thrown during topic discovery, the topic integrity provider will issue a follow-up call (list topics) to determine whether the exception was caused by a topic missing from the metadata. If so, it will throw a TopicIntegrityException("topic <> is missing"). If no topic is missing, the original UNKNOWN_TOPIC_OR_PARTITION will be rethrown for retry. (The reasoning for throwing TopicIntegrityException rather than UNKNOWN_TOPIC_OR_PARTITION is that TopicIntegrityException triggers a global failure instead of a transient, continuously retried RuntimeException.)
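
The decision described above can be sketched as follows. The exception class here is a local stand-in for the proposed TopicIntegrityException, and the class name, method name, and the idea of passing in the result of the list-topics call as a set are assumptions made to keep the example self-contained.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical triage of a discovery error after a follow-up list-topics call. */
final class MissingTopicTriage {

    /** Local stand-in for the proposed TopicIntegrityException. */
    static final class TopicIntegrityException extends RuntimeException {
        TopicIntegrityException(String message) {
            super(message);
        }
    }

    /**
     * Decides which exception to propagate: a TopicIntegrityException (global failure)
     * if a subscribed topic is absent from the listing, otherwise the original
     * transient error so that discovery is retried.
     */
    static RuntimeException classify(
            RuntimeException discoveryError,
            Set<String> subscribedTopics,
            Set<String> listedTopics) {
        Set<String> missing = new TreeSet<>(subscribedTopics);
        missing.removeAll(listedTopics);
        if (missing.isEmpty()) {
            return discoveryError;
        }
        return new TopicIntegrityException("topic " + missing + " is missing");
    }
}
```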


Disclaimer

While the job will eventually fail after a topic recreation, there is an inevitable short period (depending on the configured discovery interval) before the recreation is detected, during which the job will read from the new topic.

This cannot be avoided unless metadata is retrieved on every fetch/write cycle, which is resource-expensive, or communication with the broker is topicId-based (not supported yet). The length of this inconsistency period is configurable and can be reduced at the cost of more frequent metadata calls.

Out of scope: 

Sink topics integrity

While the integrity of sink topics is important for data completeness, it does not put job recovery at risk, and it comes with a much higher level of effort (LOE): as the Kafka sink doesn't operate on a coordinator yet, orchestrating periodic checks is much more complex, and it also comes at the cost of additional calls to the Kafka server and of introducing state to currently stateless operators such as the non-transactional sink.

Compatibility, Deprecation, and Migration Plan

No public deprecations intended for the current iteration. 

Existing methods will be adapted internally to handle a TopicIdentifier instead of a String topicName (with topicId being null).

Test Plan

  • Unit tests for the topic integrity check 
  • ITCases covering both successful and failed topic integrity checks per kafka subscription mode
  • State migration tests

Rejected Alternatives

Ideally, we would communicate with the Kafka server by topicId; however, the Kafka client currently lacks support for this, and there are no plans for it on the roadmap.

Future plans 

Though there is no KIP yet, once topicId-based communication is supported by the Kafka client, we could introduce a much smoother, real-time topic integrity validation.