Status
Current state: Ongoing Vote
Discussion thread: https://lists.apache.org/thread/8v7wlx23svvwzb9568fos8lkxobcty7y and https://lists.apache.org/thread/r8oo4y7djss9r06np6ddcx0ffwtjgdt5
Vote thread: https://lists.apache.org/thread/hqxnmorm417ozbyhnqnbd5o2b03zm91d
JIRA: https://issues.apache.org/jira/browse/FLINK-26821
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The existing Cassandra Sink implementation in Flink is based on the deprecated RichSinkFunction (to be removed in future versions), which has a few limitations:
- The current implementation defines a separate sink class for each data type (CassandraPojoSink, CassandraTupleSink, CassandraRowSink, etc.), each repeating very similar logic tailored to a specific input type. This increases the maintenance burden and limits extensibility for new data formats.
- For example, adding support for a new format like RowData would require writing an entirely new sink and duplicating logic for CQL preparation and session management, even though most of that logic is common across the existing sinks.
- There is no support for Flink's Table API or SQL API with the RowData input format.
- Retry support is limited and error handling offers minimal configurability: while CassandraFailureHandler exists, it only provides a post-failure hook, with no integrated retry or backoff mechanism.
- Lack of batching or control over Cassandra write semantics (e.g., partition-aware batching).
- Lastly, async writes in the original sink are managed using manual Semaphore control and raw FutureCallback handlers. These callbacks execute outside Flink’s threading and mailbox model.
Public Interfaces
Sink: https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
Async Sink: https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
Proposed Changes
This phase introduces a Cassandra Sink built on Flink's Sink V2 API. It replaces the legacy RichSinkFunction-based implementation with a config-based architecture. The new design separates configuration and execution logic to improve reusability and compatibility with Flink's checkpointing and threading model.
Key Goals:
- Use Flink Sink V2 interfaces (Sink, SinkWriter) for lifecycle integration
- Support multiple input types (POJO, Tuple, Row, RowData) through shared configuration
- Improve async handling by integrating with MailboxExecutor
- Add configurable retry handling and metrics
- Enable forward compatibility with future batching support (Phase 2)
Phase 1: Cassandra Sink V2 (No Batching)
Configurations
The sink uses a set of config classes to abstract different input types and serialization strategies:
- PojoSinkConfig<T>: For POJO-based Cassandra Mapper writes.
- CQLSinkConfig<T>: For Tuple, Row, or other structured types using a CQL statement
- RowDataSinkConfig: For Table/SQL API integration via RowData
All of these implement a common interface:
public interface CassandraSinkConfig<Input> extends Serializable {
    CassandraRecordWriter<Input> accept(CassandraWriterVisitor visitor);
}
This allows the sink to delegate the creation of a record writer based on input type without duplicating logic.
Visitor
To map the config to a concrete writer implementation, a CassandraWriterVisitor is used:
public interface CassandraWriterVisitor {
    <IN> CassandraRecordWriter<IN> visit(PojoSinkConfig<IN> config);
    // Tuple, Row, and other structured types written through a CQL statement
    <IN> CassandraRecordWriter<IN> visit(CQLSinkConfig<IN> config);
    CassandraRecordWriter<RowData> visit(RowDataSinkConfig config);
}
The default implementation of this visitor instantiates the appropriate writer:
public class DefaultWriterVisitor implements CassandraWriterVisitor {
    private final ClusterBuilder clusterBuilder;

    public DefaultWriterVisitor(ClusterBuilder clusterBuilder) {
        this.clusterBuilder = clusterBuilder;
    }

    @Override
    public <IN> CassandraRecordWriter<IN> visit(PojoSinkConfig<IN> config) {
        return new PojoRecordWriter<>(clusterBuilder, config);
    }

    // Other visit methods
}
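To show the double dispatch in isolation, here is a minimal, self-contained sketch with no Flink or Cassandra dependencies. RecordWriter, WriterVisitor, SinkConfig, PojoConfig, CqlConfig, and DescribingVisitor are hypothetical stand-ins for CassandraRecordWriter, CassandraWriterVisitor, CassandraSinkConfig, the config classes, and DefaultWriterVisitor. Each "writer" only returns a description string and does not build a statement.

```java
import java.util.List;

// Hypothetical stand-in for CassandraRecordWriter: turns a record into a description.
interface RecordWriter<IN> {
    String describe(IN input);
}

// Hypothetical stand-in for CassandraWriterVisitor: one overload per config type.
interface WriterVisitor {
    <IN> RecordWriter<IN> visit(PojoConfig<IN> config);

    RecordWriter<List<Object>> visit(CqlConfig config);
}

// Hypothetical stand-in for CassandraSinkConfig.
interface SinkConfig<IN> {
    RecordWriter<IN> accept(WriterVisitor visitor);
}

final class PojoConfig<IN> implements SinkConfig<IN> {
    final Class<IN> pojoClass;

    PojoConfig(Class<IN> pojoClass) {
        this.pojoClass = pojoClass;
    }

    @Override
    public RecordWriter<IN> accept(WriterVisitor visitor) {
        return visitor.visit(this); // resolves to visit(PojoConfig<IN>)
    }
}

final class CqlConfig implements SinkConfig<List<Object>> {
    final String query;

    CqlConfig(String query) {
        this.query = query;
    }

    @Override
    public RecordWriter<List<Object>> accept(WriterVisitor visitor) {
        return visitor.visit(this); // resolves to visit(CqlConfig)
    }
}

// Plays the role of DefaultWriterVisitor: builds one writer per config type.
final class DescribingVisitor implements WriterVisitor {
    @Override
    public <IN> RecordWriter<IN> visit(PojoConfig<IN> config) {
        return input -> "pojo:" + config.pojoClass.getSimpleName() + ":" + input;
    }

    @Override
    public RecordWriter<List<Object>> visit(CqlConfig config) {
        return input -> "cql:" + config.query + ":" + input;
    }
}
```

Because each config calls `visitor.visit(this)`, the compiler selects the overload from the config's static type. As a result, adding a format means adding one config class and one visit method rather than a new sink.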
Record Writer
The core abstraction that powers the SinkWriter implementation is the CassandraRecordWriter<Input> interface:
public interface CassandraRecordWriter<Input> {
    Session getSession();

    Statement prepareStatement(Input input);

    default ResultSetFuture write(Input input) {
        return executeStatement(prepareStatement(input));
    }

    ResultSetFuture executeStatement(Statement statement);

    void close();
}
This interface defines how input elements are translated into Cassandra write operations. It supports:
- Creating bound statements from records.
- Executing writes asynchronously via ResultSetFuture.
- Lifecycle management of the Cassandra session and cluster.
Each input format has its own CassandraRecordWriter<Input> implementation. That implementation handles session and cluster creation, statement preparation and binding, and async write submission for its format.
NOTE: The instantiation model is one CassandraRecordWriter per subtask. During CassandraSinkWriter construction, each Flink subtask creates and owns a writer instance.
Example for CQLRecordWriter:
@Override
public Statement prepareStatement(IN input) {
    Object[] values = CqlStatementHelper.extractFields(input, config.getRecordFormatType());
    return CqlStatementHelper.bind(preparedStatement, values, config.getIgnoreNullFields());
}
For Flink Table API inputs using RowData, RowDataRecordWriter uses field getters to extract typed fields from the logical schema:
public class RowDataRecordWriter extends AbstractRecordWriter<RowData> {
    private final RowDataSinkConfig config;
    private final PreparedStatement preparedStatement;
    // One getter per column, derived from the table's logical row type
    private final RowData.FieldGetter[] fieldGetters;

    public RowDataRecordWriter(ClusterBuilder builder, RowDataSinkConfig config) {
        this.config = config;
        // session is a field inherited from AbstractRecordWriter
        this.session = builder.getCluster().connect();
        this.preparedStatement = session.prepare(config.getInsertQuery());
        RowType rowType = (RowType) config.getRowDataType().getLogicalType();
        this.fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            this.fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
        }
    }

    @Override
    public Statement prepareStatement(RowData input) {
        // Extract column values in schema order
        Object[] values = new Object[fieldGetters.length];
        for (int i = 0; i < fieldGetters.length; i++) {
            values[i] = fieldGetters[i].getFieldOrNull(input);
        }
        return CqlStatementHelper.bind(preparedStatement, values, config.getIgnoreNullFields());
    }
}
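The proposal does not define what the ignoreNullFields flag passed to CqlStatementHelper.bind does. The minimal sketch below assumes it means "leave null columns unset instead of binding null". In Cassandra, binding an explicit null writes a tombstone, whereas an unset value (supported from native protocol v4) leaves the column untouched. SketchBoundStatement and BindSketch are hypothetical stand-ins for the driver's BoundStatement and the helper, and no driver classes are used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a driver BoundStatement: records which positional
// bind markers were set and to what value (null is a legitimate bound value).
final class SketchBoundStatement {
    private final Map<Integer, Object> bound = new LinkedHashMap<>();

    void set(int index, Object value) {
        bound.put(index, value);
    }

    boolean isSet(int index) {
        return bound.containsKey(index);
    }

    Object get(int index) {
        return bound.get(index);
    }
}

final class BindSketch {
    // Binds values by position. With ignoreNullFields, null values are left
    // unset so no tombstone is written; otherwise null is bound explicitly.
    static SketchBoundStatement bind(Object[] values, boolean ignoreNullFields) {
        SketchBoundStatement statement = new SketchBoundStatement();
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null && ignoreNullFields) {
                continue; // leave marker i unset
            }
            statement.set(i, values[i]);
        }
        return statement;
    }
}
```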
CqlStatementHelper provides reusable utilities to construct and bind CQL insert queries:
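public static Insert getInsertQuery(CQLWriteOptions options) {
    List<String> columns = options.getColumnNames();
    // One anonymous bind marker ("?") per column
    List<Object> placeholders = Collections.<Object>nCopies(columns.size(), QueryBuilder.bindMarker());
    return QueryBuilder.insertInto(options.getKeyspace(), options.getTableName())
            .values(columns, placeholders);
}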
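For reference, the statement that getInsertQuery is meant to produce can be sketched with plain string building. This is an illustration only. InsertCqlSketch is a hypothetical helper, whereas the real code uses the driver's QueryBuilder. The optional `USING TTL` clause assumes that CQLWriteOptions.ttlInSeconds maps to a per-insert TTL. Identifiers are not quoted.

```java
import java.util.List;
import java.util.StringJoiner;

final class InsertCqlSketch {
    // Builds "INSERT INTO ks.table (c1, c2) VALUES (?, ?)" with one anonymous
    // bind marker per column, plus an optional "USING TTL n" clause.
    static String insertCql(String keyspace, String table, List<String> columns, Integer ttlSeconds) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        StringJoiner names = new StringJoiner(", ", "(", ")");
        StringJoiner markers = new StringJoiner(", ", "(", ")");
        for (String column : columns) {
            names.add(column);
            markers.add("?");
        }
        StringBuilder cql = new StringBuilder()
                .append("INSERT INTO ").append(keyspace).append('.').append(table)
                .append(' ').append(names).append(" VALUES ").append(markers);
        if (ttlSeconds != null && ttlSeconds > 0) {
            cql.append(" USING TTL ").append(ttlSeconds);
        }
        return cql.toString();
    }
}
```

For example, `insertCql("my_keyspace", "metrics", List.of("id", "value", "ts"), 86400)` yields `INSERT INTO my_keyspace.metrics (id, value, ts) VALUES (?, ?, ?) USING TTL 86400`.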
It also extracts field values for binding:
public static Object[] extractFields(Object record, RecordFormatType formatType) {
    switch (formatType) {
        case TUPLE:
            return extractFromTuple(...);
        case ROW:
            return extractFromRow(...);
        default:
            throw new IllegalArgumentException("Unsupported format");
    }
}
Sink Writer Integration
The CassandraSinkWriter performs core sink logic with the following behavior:
- Concurrency management with Semaphore: The SinkWriter limits the number of in-flight requests using a Semaphore (same as the current design). While it waits for a permit, however, it yields to Flink's MailboxExecutor instead of blocking, so that pending completion callbacks can run:
while (!semaphore.tryAcquire(...)) {
    mailboxExecutor.yield();
}
- Mailbox Executor Integration: Flink provides a MailboxExecutor that runs actions on the task's main thread. The writer uses it to schedule all future completion handling, so every success and failure callback is executed through the mailbox. This ensures:
- Thread safety
- No blocking of task threads
- Integration with Flink’s cooperative threading model
mailboxExecutor.execute(() -> {
    semaphore.release();
    successfulRecordsCounter.inc();
}, "Handle success");
- Record Writing with Cassandra Record Writer: The CassandraRecordWriter is the core interface responsible for generating and executing Cassandra statements. It is selected at runtime via the visitor pattern based on the input config type. The writer receives input records, converts them to statements, and submits them asynchronously using:
ListenableFuture<ResultSet> future = cassandraRecordWriter.write(input);
The CassandraRecordWriter abstracts the Cassandra session, statement preparation, and execution.
Retryability: Failures are evaluated using CassandraFatalExceptionClassifier. If a failure is retryable and the retry limit has not been reached, the same record is resubmitted recursively:
if (!isFatal && attempt < maxRetries) {
    retryCounter.inc();
    submitWithRetry(input, attempt + 1);
}
This logic ensures that transient errors are retried up to the configured limit, while fatal errors are not retried.
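The recursive retry can be sketched in isolation with CompletableFuture. The sketch makes these assumptions: a Predicate stands in for CassandraFatalExceptionClassifier, an AtomicInteger stands in for the retry counter metric, and the write function stands in for CassandraRecordWriter.write. RetrySketch is a hypothetical name. In the sink, completions are handled through the mailbox. This sketch instead makes the retry decision on whichever thread completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

final class RetrySketch<IN> {
    private final Function<IN, CompletableFuture<Void>> write; // async write of one record
    private final Predicate<Throwable> isFatal;               // stand-in for the classifier
    private final int maxRetries;
    final AtomicInteger retries = new AtomicInteger();         // stand-in for retryCounter

    RetrySketch(Function<IN, CompletableFuture<Void>> write, Predicate<Throwable> isFatal, int maxRetries) {
        this.write = write;
        this.isFatal = isFatal;
        this.maxRetries = maxRetries;
    }

    // Completes when the record is written, or exceptionally with the error once
    // it is fatal or maxRetries resubmissions have been used up.
    CompletableFuture<Void> submitWithRetry(IN input, int attempt) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        write.apply(input).whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            // Dependent futures may wrap the original failure.
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause() : error;
            if (!isFatal.test(cause) && attempt < maxRetries) {
                retries.incrementAndGet();
                // Resubmit the same record with the next attempt number.
                submitWithRetry(input, attempt + 1).whenComplete((v, e) -> {
                    if (e == null) {
                        result.complete(null);
                    } else {
                        result.completeExceptionally(e);
                    }
                });
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }
}
```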
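Checkpointing and Flush Handling: On a checkpoint or explicit flush, the sink waits for all in-flight requests to complete. This ensures checkpoint consistency:
@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
    executePendingTasks();
}

private void executePendingTasks() throws InterruptedException {
    // All permits returned means no write is still in flight
    while (semaphore.availablePermits() < maxConnections) {
        mailboxExecutor.yield();
    }
}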
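The interplay of permits, mailbox, and flush can be exercised without Flink. In this minimal sketch, MailboxSketch is a hypothetical, simplified stand-in for Flink's MailboxExecutor: a queue drained only by the task thread, with yieldOnce() playing the role of yield(). An ExecutorService stands in for the Cassandra driver's I/O threads. The non-blocking tryAcquire() replaces the proposal's tryAcquire(...) arguments.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for MailboxExecutor: actions submitted from any thread
// are queued and only run when the task thread processes its mailbox.
final class MailboxSketch {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    void execute(Runnable action) {
        mails.add(action);
    }

    // Called on the task thread: waits for one mail and runs it there.
    void yieldOnce() throws InterruptedException {
        mails.take().run();
    }
}

final class BoundedWriterSketch {
    private final int maxConnections;
    private final Semaphore permits;
    private final MailboxSketch mailbox = new MailboxSketch();
    private final ExecutorService io; // stands in for the driver's I/O threads
    private int succeeded;            // only touched on the task thread

    BoundedWriterSketch(int maxConnections, ExecutorService io) {
        this.maxConnections = maxConnections;
        this.permits = new Semaphore(maxConnections);
        this.io = io;
    }

    void write(Runnable asyncWrite) throws InterruptedException {
        // No free permit: process completions from the mailbox until one is released.
        while (!permits.tryAcquire()) {
            mailbox.yieldOnce();
        }
        CompletableFuture.runAsync(asyncWrite, io)
                // The I/O thread only enqueues a mail; the permit and the counter
                // are updated later on the task thread.
                .whenComplete((ignored, error) -> mailbox.execute(() -> {
                    permits.release();
                    if (error == null) {
                        succeeded++;
                    }
                }));
    }

    // Returns once every permit is back, i.e. no write is still in flight.
    void flush() throws InterruptedException {
        while (permits.availablePermits() < maxConnections) {
            mailbox.yieldOnce();
        }
    }

    int succeeded() {
        return succeeded;
    }
}
```

Because completion callbacks only enqueue mail, the writer's state is mutated on a single thread and needs no locking. The number of concurrently running writes never exceeds maxConnections.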
High-Level View:
To summarize, here is the high-level flow:
- The SinkWriter uses the mailbox executor and futures to perform async writes.
- A semaphore enforces the concurrency limit (maxConnections), as in the current sink.
- Retry logic is recursive, with a maximum retry cap and failure classification.
- Writers are instantiated from the config via a DefaultWriterVisitor.
- CqlStatementHelper supports structured data binding.
- flush() blocks until all permits are released, ensuring pending writes are complete.
User-Facing Module
DataStream API Usage
Here is an example for the POJO sink:
CassandraSink<PojoRecord> sink =
        new CassandraSink<>(
                new PojoSinkConfig<>(PojoRecord.class, "my_keyspace", mapperOptions),
                clusterBuilder,
                failureHandler,
                requestConfig);
Table API / SQL Usage
CREATE TABLE cass_sink (
    id STRING,
    `value` INT,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'cassandra',
    'keyspace' = 'my_keyspace',
    'table-name' = 'metrics',
    'ttl' = '86400',
    'write.consistency' = 'QUORUM',
    'ignore-null-fields' = 'true'
);
On the connector side, the DynamicTableSink creates the sink in getSinkRuntimeProvider:
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    RequestConfiguration requestConfig =
            new RequestConfiguration.Builder()
                    .setMaxConnections(5)
                    .setMaxRetries(2)
                    .setMaxTimeout(Duration.ofSeconds(3))
                    .build();
    RowDataSinkConfig config =
            new RowDataSinkConfig(
                    StringUtils.EMPTY,
                    true,
                    consumedDataType,
                    CQLWriteOptions.builder()
                            .keyspace(connectorConfig.keySpace)
                            .tableName(connectorConfig.tableName)
                            .ttlInSeconds(connectorConfig.ttl)
                            .consistencyLevel(
                                    ConsistencyLevel.valueOf(connectorConfig.getWriteConsistencyLevel()))
                            .columnNames(List.of(fieldNames))
                            .build());
    CassandraSink<RowData> sink;
    try {
        ClusterBuilder clusterBuilder = CassandraUtils.buildCluster(connectorConfig);
        CassandraFailureHandler failureHandler =
                failure -> {
                    throw new RuntimeException("Fatal Cassandra sink failure", failure);
                };
        sink = new CassandraSink<>(config, clusterBuilder, failureHandler, requestConfig);
    } catch (Exception e) {
        throw new RuntimeException("Failed to create Cassandra sink", e);
    }
    return SinkV2Provider.of(sink, this.parallelism);
}
Phase 2: Cassandra Sink V2 (Async Writes, With Batching)
This phase introduces a new asynchronous Cassandra sink that leverages Flink’s AsyncSinkWriter API, in addition to the non-batching Sink V2 sink from Phase 1. It focuses on efficient, partition-aware batch flushing for higher throughput and more efficient Cassandra writes.
NOTE: Before implementing the batching-based async Cassandra sink, the connector must be upgraded to depend on Apache Flink 2.1. This is necessary because FLIP-509 (introduced in Flink 2.1) provides the required interfaces, such as BatchCreator and BufferWrapper, to support pluggable, partition-aware batching inside AsyncSinkWriter.
NOTE: Why can’t the current Async Sink be used?
According to DataStax documentation, using BATCH statements that span multiple partition keys is a known anti-pattern in Cassandra. This practice:
- Adds stress to the coordinator node
- Increases write latency and reduces throughput
- Can fail under load due to cross-partition write amplification
Before FLIP-509 (Flink 2.1), the built-in AsyncSinkWriter batches records from a flat FIFO queue based solely on arrival order. It has no visibility into partition keys and therefore cannot construct partition-local batches. FLIP-509 makes batch creation pluggable, which enables the Cassandra sink to:
- Group records by partition key before batching
- Construct BatchStatements that operate on a single partition
Key Goals:
- Migrate the Cassandra sink to Flink 2.1, enabling use of AsyncSinkWriter and FLIP-509 abstractions.
- Implement a new CassandraAsyncSink that extends Flink’s native async lifecycle (AsyncSinkWriter, BufferWrapper, BatchCreator).
- Support partition-aware batching, grouping records by partition key to maximize Cassandra write efficiency.
- Keep core components modular by reusing the existing configs, visitor, and record writers.
Currently, the main branch of the Cassandra connector is built against Flink 1.18. As a prerequisite for batching support in Phase 2, the connector must first be migrated to Flink 2.1 (once it is released).
This migration will involve:
- Replacing deprecated components like AppendStreamTableSink with the Flink 2.1 equivalents.
- Testing any legacy RichSinkFunction paths for compatibility (if still exposed).
Once this migration is complete and a new version of the Cassandra connector targeting Flink 2.1 is released, implementation of the new CassandraAsyncSink can begin, using the following core abstractions:
- AsyncSinkWriter for efficient, checkpoint-coordinated, non-blocking I/O
- ElementConverter to attach partition metadata to buffered records
- BufferWrapper to group buffered entries by partition
- BatchCreator to form batches based on size, count, and partition key
The rest of this section details the design of the batching-aware Cassandra Async Sink.
The following components will be introduced:
- CassandraAsyncSink<InputT>: A Sink<InputT> implementation that sets up the async writer
- CassandraAsyncSinkWriter: Extends Flink’s AsyncSinkWriter<InputT, RequestEntryT>
- ElementConverter<InputT, RequestEntryT>: Converts user records into buffered request entries with metadata like partition key and size
- BatchCreator<RequestEntryT>: Creates batches from buffered entries for submission
- PartitionKeyExtractor: A lambda or method reference provided by the user that extracts a Cassandra partition key from each record. This is used to:
- Group records by partition key inside the BufferWrapper
- Form efficient BatchStatements that avoid cross-partition batching
- CassandraAsyncSinkBuilder: Collects the sink config and the PartitionKeyExtractor required for batching.
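A Cassandra partition key can be composite, so the extractor has to return a value whose equals/hashCode identify the whole partition. The minimal sketch below assumes the extractor is a serializable Function that returns an Object key, since the proposal only says it is a lambda or method reference. The exact shape of PartitionKeyExtractor and the Reading class are hypothetical.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

// Hypothetical shape of the extractor: a serializable function from a record to
// its partition key, serializable so it can be shipped with the sink to subtasks.
@FunctionalInterface
interface PartitionKeyExtractor<T> extends Function<T, Object>, Serializable {}

// Example record for a table declared with PRIMARY KEY ((sensor_id, day), ts).
final class Reading {
    final String sensorId;
    final String day;
    final long ts;
    final double value;

    Reading(String sensorId, String day, long ts, double value) {
        this.sensorId = sensorId;
        this.day = day;
        this.ts = ts;
        this.value = value;
    }

    // Composite partition key as a List: List.equals/hashCode compare element
    // values, so the key can be used directly as a map key when grouping.
    static final PartitionKeyExtractor<Reading> BY_PARTITION = r -> List.of(r.sensorId, r.day);
}
```

The clustering column (ts) is deliberately left out of the key, so rows of the same sensor and day land in the same batch.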
BatchCreator Design and Implementation (for the Async Sink)
The BatchCreator is responsible for creating the next batch of records to send to Cassandra from the buffer. Batching is done during flush rather than during write. This keeps record ingestion lightweight and ensures that batching decisions are made only when needed. Batches are grouped by partition key so that each one can be sent as a single-partition BatchStatement, since BatchStatements spanning multiple partitions are an anti-pattern in Cassandra.
Each record is evaluated once, and the partitionKeyExtractor is applied only at the point where a record is considered for batching. There is no upfront grouping or repeated scanning. We stop scanning as soon as a full batch is found, minimizing overhead.
Here is a high-level sketch of the implementation:
@Override
public BatchCreationResult<T> createNextBatch(RequestInfo requestInfo, RequestBuffer<T> buffer) {
    Map<Object, List<T>> grouped = new LinkedHashMap<>();
    Map<Object, Long> sizeByKey = new HashMap<>();
    Object fullKey = null;
    for (RequestEntryWrapper<T> wrapper : buffer.getBufferedState()) {
        T record = wrapper.getRequestEntry();
        Object key = partitionKeyExtractor.apply(record);
        List<T> group = grouped.computeIfAbsent(key, k -> new ArrayList<>());
        long currentSize = sizeByKey.getOrDefault(key, 0L);
        if (group.size() < maxRecords && currentSize + wrapper.getSize() <= maxBytes) {
            group.add(record);
            sizeByKey.put(key, currentSize + wrapper.getSize());
        }
        // getOrDefault: a key whose first record did not fit has no size entry yet
        if (group.size() == maxRecords || sizeByKey.getOrDefault(key, 0L) >= maxBytes) {
            fullKey = key; // this partition's batch is full: stop scanning
            break;
        }
    }
    // Prefer the partition that filled up; otherwise take the first non-empty group.
    // NOTE: removing the selected entries from the buffer is not shown here.
    if (fullKey != null) {
        List<T> batch = grouped.get(fullKey);
        return new BatchCreationResult<>(batch, sizeByKey.get(fullKey), batch.size());
    }
    for (Map.Entry<Object, List<T>> entry : grouped.entrySet()) {
        List<T> batch = entry.getValue();
        if (!batch.isEmpty()) {
            return new BatchCreationResult<>(batch, sizeByKey.get(entry.getKey()), batch.size());
        }
    }
    return new BatchCreationResult<>(List.of(), 0L, 0);
}
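The standalone sketch below applies the same scan-and-stop strategy so that the selection logic can be tested without Flink. It does not use Flink's BatchCreator API. A plain Deque of hypothetical BufferedEntry objects stands in for the request buffer, and the hypothetical PartitionBatcher also removes the chosen entries from the buffer so that they are not sent twice. The first record of each group is always admitted regardless of size, so an oversized record becomes a single-record batch instead of staying in the buffer forever. This is a choice of the sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical buffered request entry: the record plus its estimated size.
final class BufferedEntry<T> {
    final T record;
    final long sizeInBytes;

    BufferedEntry(T record, long sizeInBytes) {
        this.record = record;
        this.sizeInBytes = sizeInBytes;
    }
}

final class PartitionBatcher<T> {
    private final Function<T, Object> partitionKeyExtractor;
    private final int maxRecords;
    private final long maxBytes;

    PartitionBatcher(Function<T, Object> partitionKeyExtractor, int maxRecords, long maxBytes) {
        this.partitionKeyExtractor = partitionKeyExtractor;
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
    }

    // Removes and returns the next single-partition batch (empty if the buffer is empty).
    List<BufferedEntry<T>> nextBatch(Deque<BufferedEntry<T>> buffer) {
        Map<Object, List<BufferedEntry<T>>> grouped = new LinkedHashMap<>();
        Map<Object, Long> bytesByKey = new HashMap<>();
        Object fullKey = null;
        for (BufferedEntry<T> entry : buffer) {
            Object key = partitionKeyExtractor.apply(entry.record);
            List<BufferedEntry<T>> group = grouped.computeIfAbsent(key, k -> new ArrayList<>());
            long bytes = bytesByKey.getOrDefault(key, 0L);
            // The first record of a group is always admitted (oversized records included).
            if (group.isEmpty() || (group.size() < maxRecords && bytes + entry.sizeInBytes <= maxBytes)) {
                group.add(entry);
                bytes += entry.sizeInBytes;
                bytesByKey.put(key, bytes);
            }
            if (group.size() >= maxRecords || bytes >= maxBytes) {
                fullKey = key; // this partition's batch is full: stop scanning
                break;
            }
        }
        if (grouped.isEmpty()) {
            return List.of();
        }
        // Prefer the full partition; otherwise take the first partition seen.
        List<BufferedEntry<T>> batch =
                fullKey != null ? grouped.get(fullKey) : grouped.values().iterator().next();
        // Remove exactly the selected entries; the remaining entries keep their order.
        Set<BufferedEntry<T>> selected = Collections.newSetFromMap(new IdentityHashMap<>());
        selected.addAll(batch);
        buffer.removeIf(selected::contains);
        return batch;
    }
}
```

With records `a:1, b:1, a:2, b:2, b:3` (the partition key being the part before the colon) and `maxRecords = 3`, the first call returns the full `b` batch and leaves `a:1, a:2` buffered in their original order.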
Compatibility, Deprecation, and Migration Plan
- Async mode without batching can be used with the current Cassandra connector.
- Async mode with batching (Phase 2) will require Flink 2.1+ because it depends on the AsyncSinkWriter improvements from FLIP-509.
- Legacy Cassandra sink implementations will be deprecated but not removed.
- Users can migrate to CassandraSink by providing appropriate config implementations.
Test Plan
- Unit Tests: Writers, statement binding, retry logic, and metrics.
- Integration Tests: End-to-end Flink job with Cassandra backend.
Rejected Alternatives
None
