Proposers
Approvers
Status
Current state:
| Current State | |
| --- | --- |
| UNDER DISCUSSION | |
| IN PROGRESS | |
| ABANDONED | |
| COMPLETED | |
| INACTIVE | |
Discussion thread: here
JIRA: here
Released: <Hudi Version>
Abstract
The goal is to build a Kafka Connect Sink that can ingest/stream records from Apache Kafka to Hudi tables. Since Hudi is a transaction-based data lake platform, we have to overcome a few challenges to coordinate the transactions across the tasks and workers in the Kafka Connect framework. In addition, the Hudi platform runs multiple data and file management services and optimizations that have to be coordinated with the write transactions.
Today, this goal can be achieved using the deltastreamer tool provided with Hudi, which runs within the Spark engine to pull records from Kafka and ingest data into Hudi tables. Giving users the ability to ingest data via the Kafka Connect framework has a few advantages: current Connect users can readily ingest their Kafka data into Hudi tables, leveraging the power of Hudi's platform without the overhead of deploying a Spark environment.
Background
To appreciate the design proposed in this RFC, it is important to understand Kafka Connect. It is a framework to stream data into and out of Apache Kafka. The core components of the Connect framework that are relevant to this RFC are connectors, tasks, and workers. A connector instance is a logical job that manages the copying of data from Kafka to another system, and it manages a set of tasks that actually copy the data. Using multiple tasks allows for parallelism and scalable data copying. Connectors and tasks are logical execution units that are scheduled within workers. In distributed mode, workers run across a cluster to provide scalability and fault tolerance. All workers can be configured with the same group.id, and the Connect framework automatically manages the execution of tasks across all available workers. As shown in the figure below, tasks are distributed across workers, and each task manages one or more distinct partitions of the Kafka topic.
On system initialization, the workers rebalance the set of tasks so that each worker has a similar amount of work. The system may also rebalance dynamically when the number of partitions or tasks changes. In addition, on the failure of a worker, its tasks are re-assigned to the remaining workers to ensure fault tolerance, as shown in the figure below.
Goals
We propose to build a Kafka Connect Sink for Hudi with the following assumptions:
- Initially, we will only support bulk insert, aka append-only: incoming records (immutable data) from Kafka will be appended to log files in Hudi format.
- We will guarantee exactly-once delivery: no missing records and no duplicates, so no de-duplication is required.
- The sink will be built in a way that the transactions are centrally coordinated, making it easier to provide other file management optimizations in Hudi, such as clustering and compaction. This ensures that the query performance of the target Hudi tables is optimal, despite streaming/incremental ingestion.
Proposed Solution
The high-level architecture is shown in the figure below. To coordinate the write transactions across the different Connect tasks, we require a Coordinator, Participants that manage each partition, and a communication channel between the coordinator and all the participants. Since Kafka Connect already assigns unique partitions across tasks, we simply pick the task assigned partition-0 to run the coordinator instance. We instantiate one or more participant instances within the Connect tasks to manage the data writes per partition. For instance, Task2 in the figure below has two participants, one for partition-3 and the other for partition-4.
To avoid additional dependencies, we use a dedicated control topic (with a single partition) for all communication traffic between the coordinator and the participants. To start a write transaction, the coordinator broadcasts a START-COMMIT message with the new commit time. On receiving the START-COMMIT message, each participant starts consuming the Kafka records for its partition from the Connect framework and appends the records to the Hudi file group that is specific to that partition. To ensure that file groups are unique to a partition, we can encode the file group id using the partition id. At the end of a transaction interval, the coordinator sends an END-COMMIT message. Once the participants receive an END-COMMIT, they flush all the existing records to the file and send back a WRITE-STATUS message with the details of the write required to commit the logs in Hudi, including the offset of the last written Kafka record. Once the coordinator receives all the WRITE-STATUS messages, one per partition, it writes the commit files, including the last written Kafka offsets per partition, and sends back an ACK-COMMIT message to acknowledge the commit. Once a participant receives the ACK-COMMIT message, it commits the last written Kafka offset to the Connect platform and waits until the next START-COMMIT message before writing records to Hudi files again.
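To make the protocol concrete, the sketch below shows one possible shape of the control events exchanged over the control topic. The message types mirror the protocol above; the class name and fields (commitTime, partition, kafkaOffset) are illustrative assumptions, not a finalized API.

```java
// Illustrative sketch of the control-topic messages; field names are assumptions.
public class ControlEvent {

  // Message types exchanged between the coordinator and the participants.
  public enum MsgType {
    START_COMMIT,   // coordinator -> participants: begin a new transaction
    END_COMMIT,     // coordinator -> participants: stop writing, report status
    WRITE_STATUS,   // participant -> coordinator: details of the writes
    ACK_COMMIT      // coordinator -> participants: commit is durable, commit Kafka offsets
  }

  private final MsgType type;
  private final String commitTime;   // Hudi commit time this event belongs to
  private final int partition;       // Kafka partition (used by WRITE_STATUS)
  private final long kafkaOffset;    // last written Kafka offset (used by WRITE_STATUS)

  public ControlEvent(MsgType type, String commitTime, int partition, long kafkaOffset) {
    this.type = type;
    this.commitTime = commitTime;
    this.partition = partition;
    this.kafkaOffset = kafkaOffset;
  }

  public MsgType getType() { return type; }
  public String getCommitTime() { return commitTime; }
  public int getPartition() { return partition; }
  public long getKafkaOffset() { return kafkaOffset; }
}
```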
Implementation
To explain the design and implementation in detail, we depict the components of the system in the figure below. We will implement a class called HudiSinkTask that implements the SinkTask interface defined in Kafka Connect. For simplicity, we only show the components of a single sink task (HudiSinkTask-0) that handles partitions 0 and 2. Since the task handles partition 0, it will also instantiate a coordinator daemon implemented by TransactionCoordinator.
KafkaCommunicationAgent is a singleton class with one instance per Connect worker, shared across all tasks belonging to the same worker process. KafkaCommunicationAgent implements a Kafka consumer subscribed to the Kafka control topic in a dedicated thread. Each instance uses a unique group-id based on the worker id, so each instance belongs to a different Kafka consumer group and all workers receive all the messages in the topic. On initialization, the KafkaCommunicationAgent subscribes to the topic at the latest offset, so it only receives messages posted after its instantiation. While processing past state messages could improve the efficiency of the system during failures, we made this design choice to favor simplicity over efficiency.
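A minimal sketch of such an agent is shown below, assuming a string-serialized control topic and the ControlEvent sketch above; class and method names beyond those in this RFC are illustrative.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Illustrative sketch: one agent per worker, each with a unique group.id, so
// every worker receives every message on the control topic.
public class KafkaCommunicationAgent implements Runnable {

  private final KafkaConsumer<String, String> consumer;
  private final String controlTopic;
  private volatile boolean running = true;

  public KafkaCommunicationAgent(String bootstrapServers, String controlTopic, String workerId) {
    this.controlTopic = controlTopic;
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // A unique group.id per worker gives broadcast semantics across workers.
    props.put("group.id", "hudi-control-" + workerId);
    // Start at the latest offset: ignore control messages from before startup.
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    consumer.subscribe(Collections.singletonList(controlTopic));
    while (running) {
      for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        // Deserialize into a control event and fan it out to the event queues
        // of all participants in this worker (elided).
        dispatch(record.value());
      }
    }
    consumer.close();
  }

  public void shutdown() { running = false; }

  private void dispatch(String payload) { /* fan out to participant event queues */ }
}
```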
The TransactionParticipant class implements the processing of records per partition. Since Task0 is responsible for partitions 0 and 2, the HudiSinkTask will instantiate two instances of TransactionParticipant, each handling a single partition. Each participant maintains a buffer for incoming Kafka records and an event queue that is used by the KafkaCommunicationAgent to asynchronously submit control messages received from the Kafka control topic. The state machine manager represents the state of each participant based on events received from the coordinator via the control topic. Finally, the HudiJavaWriter is the Java-based implementation of the Hudi writer that reads records from the incoming buffer and appends them to the Hudi file group. A sketch of this composition is shown below.
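This is a minimal composition sketch under the assumptions above; all field types and method names not mentioned in this RFC are illustrative, and its state machine behavior is sketched in the Participant State Machine section below.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Illustrative composition of a per-partition participant.
public class TransactionParticipant {

  private final TopicPartition partition;
  private final SinkTaskContext context;

  // Buffer of incoming Kafka records awaiting a write to Hudi.
  private final Deque<SinkRecord> buffer = new ArrayDeque<>();

  // Event queue: control messages submitted asynchronously by the KafkaCommunicationAgent.
  private final Deque<ControlEvent> events = new ArrayDeque<>();

  // Offset bookkeeping: last record written vs. last offset committed in Hudi.
  private long lastWrittenOffset = -1L;
  private long lastCommittedOffset = -1L;
  private boolean transactionOngoing = false;

  // The HudiJavaWriter described above would be held here; elided as a placeholder.

  public TransactionParticipant(TopicPartition partition, SinkTaskContext context) {
    this.partition = partition;
    this.context = context;
    // Stay paused until the coordinator broadcasts a START-COMMIT.
    context.pause(partition);
  }

  public void buffer(SinkRecord record) { buffer.add(record); }

  public long lastCommittedOffset() { return lastCommittedOffset; }

  public void submitEvent(ControlEvent event) { events.add(event); }
}
```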
Connect Sink Task
The HudiSinkTask pseudo code with key interface implementations is shown below. On initialization, or when a new partition is assigned to the task (the OPEN API), a new instance of TransactionParticipant is assigned to the partition. If the task manages partition 0, an instance of TransactionCoordinator is also created, which runs continuously in a separate thread.
Kafka Connect calls the PUT API periodically with records read from Kafka. First, the records are added to the buffer of the corresponding TransactionParticipant based on their partition. Then we execute the processRecords method of TransactionParticipant, which runs the state machine followed by the writing of records to Hudi files. The implementation of TransactionParticipant is explained in more detail in the next section.
Kafka Connect calls PRECOMMIT periodically (the interval can be configured by the user) to commit the latest Kafka offsets. The offsets are returned by the TransactionParticipant, which updates them based on the written records that were committed in Hudi. Since Kafka offsets are committed as Hudi files are committed, we suggest setting the PRECOMMIT interval similar to the transaction intervals.
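The following is a hedged sketch of this pseudo code against the Kafka Connect SinkTask API; the participant and coordinator types refer to the sketches in this RFC, and details such as configuration and error handling are elided.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Illustrative sketch of the sink task described above.
public class HudiSinkTask extends SinkTask {

  private final Map<TopicPartition, TransactionParticipant> participants = new HashMap<>();
  private TransactionCoordinator coordinator; // non-null only for the task owning partition 0

  @Override
  public void open(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
      if (tp.partition() == 0) {
        // The task assigned partition 0 also runs the coordinator in its own thread.
        coordinator = new TransactionCoordinator();
        new Thread(coordinator).start();
      }
      participants.put(tp, new TransactionParticipant(tp, context));
    }
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Buffer each record with the participant of its partition.
    for (SinkRecord record : records) {
      TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
      participants.get(tp).buffer(record);
    }
    // Run each participant's state machine, then write buffered records to Hudi.
    for (TransactionParticipant participant : participants.values()) {
      participant.processRecords();
    }
  }

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Report only offsets whose records were committed to Hudi, not merely buffered.
    Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
    participants.forEach((tp, participant) ->
        committed.put(tp, new OffsetAndMetadata(participant.lastCommittedOffset())));
    return committed;
  }

  @Override
  public void start(Map<String, String> props) { }

  @Override
  public void stop() { }

  @Override
  public String version() { return "0.1.0"; }
}
```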
Coordinator State Machine
The coordinator state machine, implemented in TransactionCoordinator, is shown in the figure below. On initialization of the system, after a pre-configured delay, the coordinator sends a START-COMMIT control message. In case the worker where the coordinator is running fails and the Connect framework re-assigns partition 0 to another Connect worker, the new coordinator does not have any state from the previous coordinator. The only state the new coordinator has is the Kafka offsets that were committed for each partition, which it reads from the latest Hudi commit file. The new coordinator sends a START-COMMIT message to start a new transaction. If a transaction initiated by the previous coordinator is ongoing, all participants will discontinue that transaction in favor of the new one.
After a pre-configured, fixed transaction interval, the coordinator broadcasts an END-COMMIT and waits to receive a WRITE-STATUS from every participant. The WRITE-STATUS messages contain information about the previous transaction, including the Hudi files where records were appended, the commit time, the last written Kafka offset, etc. At this stage, the coordinator also starts a timeout that can be configured by the user. If the WRITE-STATUS is not received from all participants before this timeout expires, the coordinator discards the current transaction and sends a START-COMMIT to trigger the start of a new transaction. This takes care of the failure of one or more tasks during the transaction: although the Connect framework will re-assign those partitions to other existing tasks, we prevent newly assigned participants from writing records for an ongoing transaction.
If the coordinator does receive the WRITE-STATUS from all participants, it writes the Hudi commit file for the transaction and sends an ACK-COMMIT to the participants so they can commit the Kafka offsets. The coordinator then triggers a START-COMMIT to begin a new transaction.
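The coordinator loop might then look like the following sketch, building on the ControlEvent sketch above; the transaction interval, timeout value, and helper methods are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the coordinator loop implementing the state machine above.
public class TransactionCoordinator implements Runnable {

  private final long transactionIntervalMs = 60_000;  // fixed transaction interval
  private final long writeStatusTimeoutMs = 30_000;   // coordinator.writestatus.timeout
  private final Map<Integer, Long> lastCommittedOffsets = new HashMap<>();

  @Override
  public void run() {
    // On startup (or failover), the only known state is the per-partition Kafka
    // offsets read from the latest Hudi commit file.
    readOffsetsFromLatestHudiCommit(lastCommittedOffsets);
    while (!Thread.currentThread().isInterrupted()) {
      String commitTime = newCommitTime();
      broadcast(new ControlEvent(ControlEvent.MsgType.START_COMMIT, commitTime, -1, -1L));
      sleep(transactionIntervalMs);
      broadcast(new ControlEvent(ControlEvent.MsgType.END_COMMIT, commitTime, -1, -1L));

      // Collect one WRITE_STATUS per partition, up to the configured timeout.
      if (awaitAllWriteStatus(commitTime, writeStatusTimeoutMs)) {
        writeHudiCommitFile(commitTime, lastCommittedOffsets);
        broadcast(new ControlEvent(ControlEvent.MsgType.ACK_COMMIT, commitTime, -1, -1L));
      }
      // On timeout the transaction is discarded; the next START_COMMIT resets
      // all participants, covering task failures during the transaction.
    }
  }

  // Helper stubs elided in this sketch:
  private String newCommitTime() { return String.valueOf(System.currentTimeMillis()); }
  private void broadcast(ControlEvent event) { /* produce to the control topic */ }
  private boolean awaitAllWriteStatus(String commitTime, long timeoutMs) { return true; }
  private void readOffsetsFromLatestHudiCommit(Map<Integer, Long> offsets) { }
  private void writeHudiCommitFile(String commitTime, Map<Integer, Long> offsets) { }
  private void sleep(long ms) {
    try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }
}
```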
Participant State Machine
When initialized, the TransactionParticipant pauses the Kafka messages using SinkTaskContext. This signals to the Connect platform to avoid consuming records until we receive a START-COMMIT from the coordinator. On every PUT call in the sink task, the processRecords method is called. First, it processes the latest events received from the coordinator over the Kafka control topic and adjusts its state accordingly. When a START-COMMIT is received, the current transaction, if one is ongoing, is discarded. This may happen when the worker running the coordinator thread fails, causing the Connect framework to re-assign partition 0 to another worker. The records written in the previous transaction will still exist, but since they are not committed by Hudi, they will be cleaned up later. In addition, the local Kafka commit offset is reset to the one sent in the START-COMMIT message from the coordinator. In most cases, the local offset should match the coordinator's, but after task failures the local offset may need to be updated; in that case, we treat the Kafka offset from the coordinator as the source of truth. This avoids duplication of records, since that offset was committed in Hudi and is in sync with the data records written and committed in Hudi. A new transaction is then started, and Kafka message consumption is resumed using the resume API provided by the Connect framework in SinkTaskContext.
If an END-COMMIT is received that is meant for the current ongoing transaction, Kafka message consumption is paused, and the write status of the transaction is sent to the coordinator. Following that, an ACK-COMMIT event is received confirming that the coordinator has committed the transaction, and the Kafka commit offset is reset to represent the latest record written and committed by Hudi. In the TransactionParticipant, we do not use timeouts to handle failures; instead, we rely on receiving a START-COMMIT from the coordinator to reset the state of all participants.
Once the state machine is updated, the writeRecords method flushes all the records from the buffer and writes them to Hudi using the HudiJavaWriter. A local variable storing the offset of the last written Kafka record is maintained as records are written. At the start of the transaction, this local variable is initialized to the committed Kafka offset to ensure we do not miss any records.
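Continuing the TransactionParticipant composition sketch from the Implementation section, the state machine and write path might look like this; the writer call remains a placeholder.

```java
// Continuation of the TransactionParticipant sketch; fields and constructor
// are as in the composition sketch above (elided here).
public class TransactionParticipant {

  public void processRecords() {
    // Step 1: apply pending control events from the coordinator.
    ControlEvent event;
    while ((event = events.poll()) != null) {
      switch (event.getType()) {
        case START_COMMIT:
          // Discard any ongoing transaction; resync the offset with the coordinator.
          buffer.clear();
          lastWrittenOffset = event.getKafkaOffset();
          transactionOngoing = true;
          context.resume(partition);
          break;
        case END_COMMIT:
          context.pause(partition);
          writeRecords();      // flush whatever is buffered
          sendWriteStatus();   // report files, commit time, last written offset
          transactionOngoing = false;
          break;
        case ACK_COMMIT:
          // Coordinator committed to Hudi; safe to expose the offset via PRECOMMIT.
          lastCommittedOffset = lastWrittenOffset;
          break;
        default:
          break;
      }
    }
    // Step 2: append buffered records to the partition's Hudi file group.
    if (transactionOngoing) {
      writeRecords();
    }
  }

  private void writeRecords() {
    SinkRecord record;
    while ((record = buffer.poll()) != null) {
      // writer.write(record);  // append via the Java-based Hudi writer (placeholder)
      lastWrittenOffset = record.kafkaOffset();
    }
  }

  private void sendWriteStatus() { /* produce a WRITE_STATUS event to the control topic */ }
}
```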
Configurations
To use this connector, specify the name of the connector class in the connector.class configuration property:
connector.class=org.apache.hudi.connect.HudiSinkConnector
Connector-specific configuration properties are listed below.
Hudi
- target.base.path
- schemaprovider.class
- write.ignore.failed
- hoodie.table.base.file.format
- hoodie.allow.empty.commit
- hoodie.writestatus.class
Connector
- kafka.topic.names
- kafka.bootstrap.servers
- kafka.tls.certificate
- tasks.max
- key.converter
- value.converter
- value.converter.schemas.enable
- format.class
- flush.size
- enhanced.avro.schema.support
- connect.meta.data
- avro.codec
- parquet.codec
Transaction Coordination Protocol
- coordinator.writestatus.timeout
- write.retry.timeout
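For illustration, a complete connector configuration might look like the following; all values are placeholders, and only the property names listed above come from this RFC.

```properties
name=hudi-sink
connector.class=org.apache.hudi.connect.HudiSinkConnector
tasks.max=4

# Kafka source settings (placeholder values)
kafka.topic.names=my-topic
kafka.bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Hudi target settings (placeholder values)
target.base.path=file:///tmp/hudi-sink
hoodie.table.base.file.format=PARQUET

# Transaction coordination (placeholder values, in milliseconds)
coordinator.writestatus.timeout=30000
write.retry.timeout=30000
```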
Rollout/Adoption Plan
Test Plan
We have validated the working of the protocol by building a PoC. In the current PoC, we have not integrated with the Hudi write client, but we have implemented the transaction protocol within the Connect platform. We implemented a simple file writer that mimics the Hudi writer, and validated that no records were duplicated or missing. We also tested cases of worker failures that caused either the Coordinator instance to fail and restart, or one or more Participant instances to be re-assigned to another worker.