Status

Current state: Under Discussion [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: https://lists.apache.org/thread/n03bc8o53yj5llnr5xhcnqdxr0goxm5v

JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX

Released: <Flink Version>

Motivation

This proposal aims to complement the Dynamic Kafka Source, introduced in FLIP-246, with a counterpart called the Dynamic Kafka Sink. The feature addresses a common problem in large infrastructure setups: the Kafka clusters and topics a job writes to can change during the Flink job's execution, for example during Kafka cluster upgrades or migrations. In contrast, a standard Kafka sink typically connects to a fixed set of clusters and topics defined at startup. The dynamic behavior is achieved through metadata discovery.

The Dynamic Kafka Sink enables producing records into data streams (sets of topics that can belong to multiple Kafka clusters) and supports seamless Kafka failover during cluster migrations, which often occur during version upgrades. The new sink allows:

  • Transparent Kafka cluster addition/removal without requiring a Flink job restart.
  • Transparent Kafka topic addition/removal without requiring a Flink job restart.

Basic Idea

A Kafka cluster migration is the process of transferring a Kafka cluster from one environment to another, eventually decommissioning the old cluster. During the migration, consumers and producers need to be re-routed to the new cluster and reconfigured dynamically. To achieve this, Flink connectors can interact with an external metadata service that orchestrates the migration of producers and consumers, ensuring there is no data loss. The Dynamic Kafka Source already implements a solution for orchestrating consumers automatically; this proposal addresses the same for producers.

The Dynamic Kafka Sink is capable of producing data into multiple Kafka topics located on multiple Kafka clusters, and of dynamically adjusting the output clusters and topics based on the subscribed stream IDs and the Kafka stream resolver. The sink orchestrates multiple Kafka SinkWriter instances, each responsible for writing to a single Kafka cluster and for its own checkpointing and metrics.

Metadata Coordination

The DynamicKafkaSink is an API that takes a DataStream<T> as input and writes its records into multiple Kafka clusters according to the metadata provided by a KafkaMetadataService. The metadata service can change the metadata at any time, so a coordinator is needed to obtain and reconcile the changes. The DynamicKafkaSink interacts with the metadata service through the internal DynamicKafkaWriter, which reconciles the changes when needed. The proposal builds on FLIP-453: Promote Unified Sink API V2 to Public and Deprecate SinkFunction, and on existing components of the Kafka connector, such as the KafkaMetadataService already introduced by the DynamicKafkaSource.

In essence, the Kafka Metadata Service (MDS) removes the burden of managing Kafka connection details and infrastructure changes from the Flink developer. It is an external component, often hosted by Kafka infrastructure teams and usually exposed as a REST API:

  • No Hardcoding: Instead of hardcoding Kafka broker addresses, topic names, and other details directly into your Flink code, you can fetch them dynamically from the MDS.
  • Dynamic Changes: If the Kafka cluster changes (e.g., a broker is added or removed, a topic is moved), the MDS reflects these changes. Your Flink application, using the MDS, can automatically adapt without manual intervention or redeployment.
  • Environment Agnostic: You can easily switch between development, testing, and production environments because the MDS provides the correct Kafka details for each environment. Your code doesn't need environment-specific configuration.
  • Easier Topic Management: The MDS helps you discover available topics and streams. You can find the right topic for your data without needing to know the exact Kafka cluster details. You use a logical "stream ID," and the MDS tells you the corresponding Kafka topic(s).

Stream Routing

The DynamicKafkaWriter does not send data directly to the SinkWriters. Instead, it introduces a new KafkaStreamRecord class, which is essentially a wrapper around the contents of a Kafka ProducerRecord that additionally identifies the stream the record is sent to. The sink offers a KafkaStreamRecordSerializer<T> interface that users implement to specify how a user-defined type is serialized to a KafkaStreamRecord. Each KafkaStreamRecord is routed to its final Kafka SinkWriter by an extensible router interface. The initial version provides a default implementation with a round-robin strategy, in which records are distributed across the cluster topics of the stream in a circular, even fashion.

Metadata State

The DynamicKafkaWriter caches the mapping between logical streams and physical Kafka clusters/topics and stores it in Flink's managed state. The writer uses a background thread and a reconciliation process to keep this state up to date based on information from the MDS. The state is checkpointed for fault tolerance. The MDS is the initial source of truth; once the metadata has been fetched and checkpointed, the Flink state becomes the source of truth when the writer recovers. This allows the Flink job to handle an unavailable KafkaMetadataService at startup in most cases: if every subscribed stream is present in the restored state, the sink already has all the metadata it needs to produce to the Kafka clusters. The DynamicKafkaWriter is stateful, with DynamicKafkaWriterState containing the metadata for the streams.
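The startup decision described above can be sketched as follows. This is a minimal, hypothetical helper (InitialMetadataResolver is not part of the proposal); the per-stream metadata type is left generic so the sketch does not depend on the proposed ClusterTopic class.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper choosing the metadata a DynamicKafkaWriter-like writer starts with. */
final class InitialMetadataResolver {
    private InitialMetadataResolver() {}

    /**
     * @param subscribedStreamIds streams the sink must be able to produce to
     * @param restoredState       stream metadata restored from a checkpoint, if any
     * @param fetchFromMds        queries the metadata service; may throw if it is unavailable
     * @param <V>                 per-stream metadata, e.g. a set of cluster topics
     */
    static <V> Map<String, V> resolve(
            Set<String> subscribedStreamIds,
            Optional<Map<String, V>> restoredState,
            Supplier<Map<String, V>> fetchFromMds) {
        // The restored state is the source of truth if it knows every subscribed stream,
        // so the MDS is not contacted and an MDS outage does not block the restart.
        if (restoredState.isPresent()
                && restoredState.get().keySet().containsAll(subscribedStreamIds)) {
            return restoredState.get();
        }
        // Fresh start, or a stream unknown to the state: the MDS is the source of truth.
        return fetchFromMds.get();
    }
}
```

When the restored state covers every subscribed stream, the supplier is never invoked, which is what lets the job restart while the MDS is down. A stream missing from the state still requires a successful MDS call, hence "in most cases".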

Metadata Reconciliation Mechanism

When a metadata change is detected, the reconciliation mechanism in the DynamicKafkaWriter must update its internal state to reflect the fresh metadata. This means flushing and removing the SinkWriters that are no longer needed, launching new SinkWriters, and updating the record router to point to the new writers. Reconciliation is necessary both along the record flow and on checkpointing. Checking whether the metadata has changed requires expensive IO, which would be especially heavy if done for every record, so a simple caching mechanism with periodic refresh is needed in the DynamicKafkaWriter.
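A minimal sketch of such a cache, assuming a synchronous poll from the write path rather than the background thread mentioned earlier. CachedStreamMetadata and pollChanges() are hypothetical names, and the clock is injected so the refresh logic can be exercised deterministically.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical cache for stream metadata: the metadata service is queried at most
 * once per refresh interval, so the per-record path usually does no IO.
 */
final class CachedStreamMetadata<V> {
    private final Supplier<Map<String, V>> fetchFromMds; // expensive IO call
    private final LongSupplier clockMillis;               // injected for deterministic tests
    private final long refreshIntervalMs;

    private Map<String, V> cached;
    private long lastRefreshMs;

    CachedStreamMetadata(Supplier<Map<String, V>> fetchFromMds, LongSupplier clockMillis,
                         long refreshIntervalMs, Map<String, V> initial) {
        this.fetchFromMds = fetchFromMds;
        this.clockMillis = clockMillis;
        this.refreshIntervalMs = refreshIntervalMs;
        this.cached = initial;
        this.lastRefreshMs = clockMillis.getAsLong();
    }

    /** Returns the new metadata if a refresh is due and it differs from the cache; else empty. */
    Optional<Map<String, V>> pollChanges() {
        long now = clockMillis.getAsLong();
        if (now - lastRefreshMs < refreshIntervalMs) {
            return Optional.empty(); // refresh not due yet: serve from cache, no IO
        }
        lastRefreshMs = now;
        Map<String, V> fresh = fetchFromMds.get();
        if (Objects.equals(fresh, cached)) {
            return Optional.empty(); // metadata unchanged: nothing to reconcile
        }
        cached = fresh;
        return Optional.of(fresh);
    }
}
```

In this sketch the writer would call pollChanges() from write() and flush() and run reconciliation only when it returns a value; the interval would presumably come from a setting such as stream-metadata-discovery-interval-ms.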

Data Guarantees and Consistency

The DynamicKafkaSink currently supports only at-least-once semantics. The sink flushes all buffered records during checkpoints and during stream metadata change reconciliation. Since each record is written to exactly one cluster topic out of the set that makes up a stream, downstream consumers can read all of the data by consuming from all clusters and topics of the stream, assuming the Kafka clusters are available.

Public Interfaces

This proposal does not include any changes to the existing public interfaces of the KafkaSink or DynamicKafkaSource connectors. A new DynamicKafkaSink and its builder API serve as the main entry point of this connector. Users have to implement a KafkaStreamRecordSerializer<T> to convert Flink data types to KafkaStreamRecord. These interfaces are marked with the @Experimental annotation; all other APIs in this proposal are marked with @Internal. The KafkaMetadataService is an existing component of the DynamicKafkaSource. The new sink goes into the Kafka connector module and follows any connector repository changes of the Kafka Sink.


DynamicKafkaSink<Long> sink = DynamicKafkaSink.<Long>builder()
    .setSubscribedStreamIds(Set.of("stream1"))
    // KafkaMetadataServiceImpl and LongKafkaStreamRecordSerializer are example user implementations
    .setKafkaMetadataService(new KafkaMetadataServiceImpl())
    .setKafkaStreamRecordSerializer(new LongKafkaStreamRecordSerializer("stream1"))
    .setProperty("stream-metadata-discovery-interval-ms", "1")
    .setProperty(ProducerConfig.LINGER_MS_CONFIG, "100000")
    .build();

Proposed Changes

DynamicKafkaSink

This sink is capable of producing data into multiple Kafka topics located on multiple Kafka clusters, as well as dynamically adjusting the output clusters and topics based on the specified stream ids.

@Experimental
public class DynamicKafkaSink<T> implements StatefulSink<T, DynamicKafkaWriterState> {
    private final KafkaStreamRecordSerializer<T> kafkaStreamRecordSerializer;
    private final Set<String> subscribedStreamIds;
    private final KafkaMetadataService kafkaMetadataService;
    private final Properties kafkaProducerConfig;
    
    public static <T> DynamicKafkaSinkBuilder<T> builder() {
        return new DynamicKafkaSinkBuilder<>();
    }
    ...
}

KafkaStreamRecord

The counterpart of the Kafka ProducerRecord. Like a ProducerRecord, a KafkaStreamRecord contains the key, value, headers, etc., but, most importantly, it also carries the ID of the stream to which the record belongs.

@Experimental
public class KafkaStreamRecord {
    private final String streamId;
    private final Headers headers;
    private final byte[] key;
    private final byte[] value;
    private final Long timestamp;
    ...
}

KafkaStreamRecordSerializer<T>

This serializer is responsible for converting the incoming Flink data elements (T) into KafkaStreamRecord objects.

@Experimental
public interface KafkaStreamRecordSerializer<T> extends Serializable {    
    default void open(SerializationSchema.InitializationContext context) throws Exception {}
    KafkaStreamRecord serialize(T element, Long timestamp);
}
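For illustration, here is one possible LongKafkaStreamRecordSerializer, the serializer used in the builder example, whose body the proposal does not define. To keep the sketch self-contained, KafkaStreamRecord and the serializer interface are simplified stand-ins: headers and the open(...) hook are dropped, and the four-argument constructor is an assumption.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;

// Simplified stand-in for the proposed KafkaStreamRecord (headers omitted; constructor assumed).
final class KafkaStreamRecord {
    final String streamId;
    final byte[] key;
    final byte[] value;
    final Long timestamp;

    KafkaStreamRecord(String streamId, byte[] key, byte[] value, Long timestamp) {
        this.streamId = streamId;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Simplified stand-in for the proposed interface (the default open(...) hook is omitted).
interface KafkaStreamRecordSerializer<T> extends Serializable {
    KafkaStreamRecord serialize(T element, Long timestamp);
}

/** Hypothetical serializer: every Long goes to one fixed stream as an 8-byte big-endian value, no key. */
final class LongKafkaStreamRecordSerializer implements KafkaStreamRecordSerializer<Long> {
    private final String streamId;

    LongKafkaStreamRecordSerializer(String streamId) {
        this.streamId = streamId;
    }

    @Override
    public KafkaStreamRecord serialize(Long element, Long timestamp) {
        // ByteBuffer defaults to big-endian byte order.
        byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(element).array();
        return new KafkaStreamRecord(streamId, null, value, timestamp);
    }
}
```

The serializer only decides the stream; which cluster and topic of that stream receive the record is left to the router.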

ReconcilableStreamRouter and RoundRobinReconcilableStreamRouter

ReconcilableStreamRouter is a crucial component for handling dynamic changes. It takes a KafkaStreamRecord and determines the correct ClusterTopic to which the record should be written. The reconcile method is used to update the router's internal state when the stream metadata changes. RoundRobinReconcilableStreamRouter is the default implementation of the ReconcilableStreamRouter with a round-robin distribution of records across available clusters/topics for a given stream.

@Internal
public interface ReconcilableStreamRouter extends Serializable {    
    ClusterTopic route(KafkaStreamRecord record);  
    void reconcile(Map<String, Set<ClusterTopic>> streamMetadataMap);
}
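A minimal sketch of the round-robin strategy, under simplifying assumptions: ClusterTopic is a stand-in Java 16+ record with the cluster reduced to its name, and the router takes a stream ID instead of a KafkaStreamRecord (route(KafkaStreamRecord) would pass the record's stream ID). RoundRobinStreamRouterSketch is a hypothetical name, not the proposed RoundRobinReconcilableStreamRouter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for ClusterTopic: the cluster is reduced to its name. */
record ClusterTopic(String cluster, String topic) {}

/** Round-robin routing sketch keyed by stream ID. */
final class RoundRobinStreamRouterSketch {
    private Map<String, List<ClusterTopic>> targets = Map.of();
    private final Map<String, Integer> counters = new HashMap<>();

    /** Rebuilds the routing table when the stream metadata changes. */
    void reconcile(Map<String, Set<ClusterTopic>> streamMetadataMap) {
        Map<String, List<ClusterTopic>> next = new HashMap<>();
        streamMetadataMap.forEach((streamId, clusterTopics) -> {
            List<ClusterTopic> ordered = new ArrayList<>(clusterTopics);
            // Sort so the rotation order does not depend on Set iteration order.
            ordered.sort(Comparator.comparing(ClusterTopic::cluster)
                    .thenComparing(ClusterTopic::topic));
            next.put(streamId, List.copyOf(ordered));
        });
        targets = next;
        counters.keySet().retainAll(next.keySet()); // forget counters of removed streams
    }

    /** Returns the next cluster topic of the stream, cycling through all of them. */
    ClusterTopic route(String streamId) {
        List<ClusterTopic> candidates = targets.getOrDefault(streamId, List.of());
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No cluster topic known for stream " + streamId);
        }
        int count = counters.merge(streamId, 1, Integer::sum);
        // floorMod keeps the index valid even after the counter overflows.
        return candidates.get(Math.floorMod(count - 1, candidates.size()));
    }
}
```

A per-stream counter keeps each stream's rotation independent, and reconcile() swaps the whole table so routing never sees a half-updated mapping.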

DynamicKafkaWriter

DynamicKafkaWriter is the class where all the components are wired together, providing the ability to write to multiple, dynamically changing Kafka clusters. It leverages a metadata service, a stream router, and a collection of individual Kafka producers to achieve this. The key to its dynamic nature is the reconcileState() method, which checks for and adapts to changes in the Kafka cluster and topic metadata.

@Internal
public class DynamicKafkaWriter<T> implements StatefulSinkWriter<T, DynamicKafkaWriterState> {    
    
    DynamicKafkaWriter(...) {
        ...
        // Obtain the initial stream metadata before any record is written.
        reconcileState();
    }

    @Override
    public void write(T element, Context context) throws IOException, InterruptedException {
        reconcileState();
        ...
    }
    
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        reconcileState();
        ...
    }
    
    @Override
    public List<DynamicKafkaWriterState> snapshotState(long checkpointId) {
        ...
    }    
    ...
}

write(T element, Context context):

  • This is the main method called for each incoming data element.
  • It first calls reconcileState() to check for and apply any metadata changes.
  • It uses the KafkaStreamRecordSerializer to convert the element to a KafkaStreamRecord.
  • It uses the ReconcilableStreamRouter to determine the target ClusterTopic.
  • It creates a ProducerRecord (the standard Kafka record type).
  • It retrieves the appropriate SinkWriter from its internal cache.
  • It calls write() on the selected SinkWriter to send the record to Kafka.
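The steps above can be sketched with plain functions standing in for the proposed types. This is a hypothetical illustration (WritePathSketch and its nested Java 16+ records are not part of the proposal): the serializer, router, and per-cluster writers are java.util.function types, a (topic, value) pair stands in for the ProducerRecord, and the reconcileState() call is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical sketch of the write() data path: serialize, route, look up writer, write. */
final class WritePathSketch<T> {
    /** Stands in for KafkaStreamRecord: stream ID plus serialized value. */
    record Routed(String streamId, byte[] value) {}

    /** Stands in for ClusterTopic, with the cluster reduced to its name. */
    record ClusterTopic(String cluster, String topic) {}

    private final Function<T, Routed> serializer;
    private final Function<String, ClusterTopic> router; // stream ID -> target cluster topic
    private final Map<String, BiConsumer<String, byte[]>> writersByCluster; // cluster -> writer(topic, value)

    WritePathSketch(Function<T, Routed> serializer,
                    Function<String, ClusterTopic> router,
                    Map<String, BiConsumer<String, byte[]>> writersByCluster) {
        this.serializer = serializer;
        this.router = router;
        this.writersByCluster = new HashMap<>(writersByCluster);
    }

    void write(T element) {
        Routed streamRecord = serializer.apply(element);             // 1. serialize
        ClusterTopic target = router.apply(streamRecord.streamId()); // 2. route
        BiConsumer<String, byte[]> writer = writersByCluster.get(target.cluster()); // 3. look up writer
        if (writer == null) {
            throw new IllegalStateException("No writer for cluster " + target.cluster());
        }
        writer.accept(target.topic(), streamRecord.value());         // 4. hand (topic, value) to the writer
    }
}
```

Keeping one writer per cluster means the topic is chosen per record, so several topics on the same cluster can share a writer.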

reconcileState():

  • This is the heart of the dynamic behavior. It's called in the constructor, before each write() and during flush().
  • It checks if there are any metadata changes.
  • If there are changes:
    • It calls router.reconcile() to update the router.
    • It removes the SinkWriter instances for clusters that are no longer part of any stream.
    • It adds new SinkWriter instances for any new clusters.
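The writer-management part of reconcileState() amounts to a set difference over the clusters referenced before and after the change. A minimal sketch, assuming a stand-in Java 16+ ClusterTopic record with the cluster reduced to its name (the proposal uses KafkaClusterIdentifier); WriterReconciliation is a hypothetical helper.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Simplified stand-in for ClusterTopic: the cluster is reduced to its name. */
record ClusterTopic(String cluster, String topic) {}

/** Hypothetical helper computing which per-cluster SinkWriters reconciliation closes and opens. */
final class WriterReconciliation {
    private WriterReconciliation() {}

    /** Every cluster referenced by at least one stream. */
    static Set<String> clusters(Map<String, Set<ClusterTopic>> streamMetadata) {
        return streamMetadata.values().stream()
                .flatMap(Set::stream)
                .map(ClusterTopic::cluster)
                .collect(Collectors.toSet());
    }

    /** Clusters no longer part of any stream: their writers are flushed and closed. */
    static Set<String> writersToClose(Map<String, Set<ClusterTopic>> oldMeta,
                                      Map<String, Set<ClusterTopic>> newMeta) {
        Set<String> result = new HashSet<>(clusters(oldMeta));
        result.removeAll(clusters(newMeta));
        return result;
    }

    /** Clusters that appear for the first time: a new writer is created for each. */
    static Set<String> writersToOpen(Map<String, Set<ClusterTopic>> oldMeta,
                                     Map<String, Set<ClusterTopic>> newMeta) {
        Set<String> result = new HashSet<>(clusters(newMeta));
        result.removeAll(clusters(oldMeta));
        return result;
    }
}
```

In this sketch a topic-only change on a cluster that stays in use keeps the existing writer; only the router needs reconciling in that case.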

flush(boolean endOfInput):

  • Called to flush any buffered records to Kafka.
  • Calls reconcileState() to ensure any pending metadata changes are applied.
  • Calls flush() on all SinkWriter instances.

snapshotState(long checkpointId):

  • Called by Flink during checkpointing to save the writer's state.
  • Returns a list containing a DynamicKafkaWriterState object, which holds the metadata cache. This allows the writer to be restored to the correct state after a failure.

close():

  • Called when the writer is closed (e.g., when the Flink job ends). Properly shuts down all resources (the SinkWriter instances and the KafkaMetadataService).

DynamicKafkaWriterState

This class represents the state of the DynamicKafkaWriter. In essence, DynamicKafkaWriterState is a snapshot of the writer's current understanding of the Kafka Stream to cluster/topic mapping. This is the critical piece of information that gets saved during checkpointing and restored during recovery.

@Internal
public class DynamicKafkaWriterState {
    private final Map<String, Set<ClusterTopic>> streamMetadataMap;

    public DynamicKafkaWriterState(Map<String, Set<ClusterTopic>> streamMetadataMap) {
        this.streamMetadataMap = streamMetadataMap;
    }

    public Map<String, Set<ClusterTopic>> getStreamMetadataMap() {
        return streamMetadataMap;
    }
}

ClusterTopic

It's a simple pairing of a cluster identifier and a topic name.

@Internal
public class ClusterTopic {
    private final KafkaClusterIdentifier kafkaClusterIdentifier;
    private final String topic;
}

KafkaClusterIdentifier

Uniquely identifies a Kafka cluster. This is important because the DynamicKafkaWriter can write to multiple clusters. The name field is a user-friendly, short, and unique name for the Kafka cluster, used for things like metrics; it should be unique at least within the scope of a single Flink job. The bootstrapServers field holds the bootstrap servers string (e.g., "kafka1:9092,kafka2:9092") used to connect to the cluster. Apart from the bootstrap servers, the remaining producer configuration is identical across all SinkWriter instances.

@Internal
public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
    private final String name;
    private final String bootstrapServers;
}

Compatibility, Deprecation, and Migration Plan

TBD

Test Plan

This will be tested by unit and integration tests. The work will extend existing KafkaSource and DynamicKafkaSource test utilities in Flink to exercise multiple clusters.

Rejected Alternatives

None
