Motivation
In the current implementation of Apache Flink's AsyncSinkWriter, batching is primarily controlled by parameters such as maxBatchSize and maxBatchSizeInBytes.
While these parameters are effective for general use cases, they lack the flexibility to accommodate more complex batching strategies, such as grouping events based on specific attributes like partition keys.
Cassandra is one example: it supports batch writes via BatchStatement, but not all batch writes are efficient.
Batch inserts perform best when they group data that belongs to the same partition key.
In the current implementation, requests are grouped without regard to their partition key, so a single batch may span multiple partitions. These cross-partition batches cause poor write performance in Cassandra.
Moreover, relying solely on a queue-based structure like Deque may not always be ideal, as some batching strategies require more efficient lookup or prioritization mechanisms, making alternative data structures more suitable.
Goal
This FLIP proposes introducing a pluggable batching mechanism that allows users to define custom batching strategies tailored to their specific requirements.
Additionally, this proposal ensures that sink implementers are not restricted to a single data structure (Deque) for buffering requests. With customized batch creators, different batching strategies may require alternative data structures that better align with their performance needs, lookup efficiency, or prioritization logic. Hence, this FLIP also provides the flexibility to choose the most suitable storage (data structure) mechanism for efficient batch formation.
Public Interfaces
- Introduction of the BufferWrapper Interface
A new interface for managing buffered request entries in an async sink, enabling sink implementations to define and optimize their own data structures for request buffering.
It provides methods for adding, polling, and peeking entries, computing the total byte size, and retrieving the buffered state for checkpointing the current elements in the buffer.
public interface BufferWrapper<RequestEntryT extends Serializable> { long totalSizeInBytes(); |
- Implementation of Batch Class
This class encapsulates the result of the batching process, including the batch entries, their total size in bytes, and the record count.
public class Batch<RequestEntryT extends Serializable> { |
- Introduction of the BatchCreator Interface
The BatchCreator interface introduces a flexible mechanism for defining custom batching strategies in an async sink. It leverages the BufferWrapper to retrieve buffered elements, allowing clients to implement optimized batch formation.
public interface BatchCreator<RequestEntryT extends Serializable> { Batch<RequestEntryT> createNextBatch(RequestInfo requestInfo, BufferWrapper<RequestEntryT> bufferedRequestEntries ); |
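As a rough illustration of the kind of strategy this interface is meant to enable, the sketch below forms batches whose entries all share one partition key. It is not the proposed API: KeyedEntry, PartitionedBuffer, and pollSinglePartitionBatch are hypothetical stand-ins written only for this example, and a real implementation would work against BufferWrapper and return a Batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical request entry carrying a partition key; not a Flink class. */
final class KeyedEntry {
    final String partitionKey;
    final String payload;

    KeyedEntry(String partitionKey, String payload) {
        this.partitionKey = partitionKey;
        this.payload = payload;
    }
}

/** Hypothetical buffer that groups entries by partition key, keeping keys in first-seen order. */
final class PartitionedBuffer {
    private final Map<String, Deque<KeyedEntry>> entriesByKey = new LinkedHashMap<>();
    private int size;

    void add(KeyedEntry entry) {
        entriesByKey.computeIfAbsent(entry.partitionKey, key -> new ArrayDeque<>()).addLast(entry);
        size++;
    }

    int size() {
        return size;
    }

    /** Removes and returns up to maxBatchSize entries that all share the oldest buffered key. */
    List<KeyedEntry> pollSinglePartitionBatch(int maxBatchSize) {
        List<KeyedEntry> batch = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<KeyedEntry>>> keys = entriesByKey.entrySet().iterator();
        if (!keys.hasNext()) {
            return batch; // nothing buffered
        }
        Deque<KeyedEntry> queue = keys.next().getValue();
        while (!queue.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(queue.pollFirst());
            size--;
        }
        if (queue.isEmpty()) {
            keys.remove(); // drop the drained key so the next key becomes the oldest
        }
        return batch;
    }
}
```

Keeping one queue per key avoids scanning a single Deque for matching entries, which is the sort of lookup cost that motivates a pluggable buffer alongside a pluggable batch creator.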
- Default Implementation - DequeBufferWrapper
A buffer wrapper implementation that uses the current ArrayDeque as the underlying data structure. The existing deque logic remains the same; it is only abstracted behind a BufferWrapper.
public class DequeBufferWrapper<RequestEntryT extends Serializable> implements BufferWrapper<RequestEntryT> { private long totalSizeInBytes; totalSizeInBytes = 0L; totalSizeInBytes += entry.getSize(); public int size() { return buffer.size(); } public Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState() { if (entry != null) { totalSizeInBytes = Math.max(0, totalSizeInBytes - entry.getSize()); } return entry; @Override public long totalSizeInBytes() { return totalSizeInBytes; } |
- Default Implementation - SimpleBatchCreator
A default implementation that mirrors the current batching behavior will be provided, keeping the change backward compatible.
public class SimpleBatchCreator<RequestEntryT extends Serializable> |
Proposed Changes
- Integration into AsyncSinkWriter
The AsyncSinkWriter will be modified to use the BatchCreator and BufferWrapper for batch formation and for customizing the storage of buffered entries. An additional constructor will therefore be added to AsyncSinkWriter, and the default implementations will be initialized.
private final BatchCreator<RequestEntryT> batchCreator; // The Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries is replaced with BufferWrapper. private final BufferWrapper<RequestEntryT> bufferedRequestEntries; // The totalByte size calculation is no longer required to be done in the AsyncSinkWriter, as it is done by the BufferWrapper // private double bufferedRequestEntriesTotalSizeInBytes; // Old Constructor will be deprecated @Deprecated public AsyncSinkWriter( // Default implementation. // Other Initialization } |
- Modifications to the flush() and nonBlockingFlush() Methods
private void flush() throws InterruptedException { numBytesOutCounter.inc(entries.getSizeInBytes()); numRecordsOutCounter.inc(entries.getRecordCount()); // No Change to the core logic, just instead of using, // bufferedRequestEntriesTotalSizeInBytes → bufferedRequestEntries.totalSizeInBytes() would be used. private void nonBlockingFlush() throws InterruptedException { while (!rateLimitingStrategy.shouldBlock(createRequestInfo()) && (bufferedRequestEntries.size() >= getNextBatchSizeLimit() || bufferedRequestEntries.totalSizeInBytes() >= maxBatchSizeInBytes)) { flush(); } } |
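The loop condition in nonBlockingFlush() can be read as a single predicate: flush only while the rate limiter allows it and the buffer has reached either the entry-count limit or the byte limit. The helper below is a minimal sketch of that reading; FlushConditionSketch and its parameter names are hypothetical and not part of AsyncSinkWriter.

```java
/** Hypothetical helper mirroring the loop condition; not part of AsyncSinkWriter. */
final class FlushConditionSketch {
    static boolean shouldFlush(
            boolean rateLimited,
            int bufferedEntries,
            long bufferedBytes,
            int batchSizeLimit,
            long maxBatchSizeInBytes) {
        boolean enoughEntries = bufferedEntries >= batchSizeLimit;
        boolean enoughBytes = bufferedBytes >= maxBatchSizeInBytes;
        // Either threshold is sufficient, but a blocking rate limiter always wins.
        return !rateLimited && (enoughEntries || enoughBytes);
    }
}
```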
- Updates to the addEntryToBuffer() Method
if (insertAtHead) { bufferedRequestEntriesTotalSizeInBytes += entry.getSize(); |
This will be replaced with a single line:
bufferedRequestEntries.add(entry, insertAtHead); |
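To show what moving the bookkeeping into the buffer could look like, here is a minimal sketch of a deque-backed buffer whose add(entry, insertAtHead) call handles both the head/tail placement and the running byte total. SizedEntry and DequeBufferSketch are hypothetical stand-ins for this example only; they are not the DequeBufferWrapper from this proposal.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical entry with a known size; stands in for a wrapped request entry. */
final class SizedEntry {
    final String value;
    final long sizeInBytes;

    SizedEntry(String value, long sizeInBytes) {
        this.value = value;
        this.sizeInBytes = sizeInBytes;
    }
}

/** Hypothetical deque-backed buffer that keeps its own byte total. */
final class DequeBufferSketch {
    private final Deque<SizedEntry> buffer = new ArrayDeque<>();
    private long totalSizeInBytes;

    /** Adds at the head (for example, a retried entry) or at the tail (a new entry). */
    void add(SizedEntry entry, boolean insertAtHead) {
        if (insertAtHead) {
            buffer.addFirst(entry);
        } else {
            buffer.addLast(entry);
        }
        totalSizeInBytes += entry.sizeInBytes;
    }

    /** Removes the head entry, or returns null when the buffer is empty. */
    SizedEntry poll() {
        SizedEntry entry = buffer.pollFirst();
        if (entry != null) {
            totalSizeInBytes = Math.max(0, totalSizeInBytes - entry.sizeInBytes);
        }
        return entry;
    }

    SizedEntry peek() {
        return buffer.peekFirst();
    }

    int size() {
        return buffer.size();
    }

    long totalSizeInBytes() {
        return totalSizeInBytes;
    }
}
```

With the total maintained inside the buffer, the caller no longer needs a separate size field that must be kept in sync on every add and poll.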
- A new constructor to be added to BufferedRequestState
A simple constructor is added so that a BufferedRequestState can be built from a BufferWrapper.
public BufferedRequestState(BufferWrapper<RequestEntryT> bufferWrapper) { this.bufferedRequestEntries = new ArrayList<>(bufferWrapper.getBufferedState()); this.stateSize = calculateStateSize(); } |
With a customizable BatchCreator, developers can define their own batching logic to efficiently process buffered requests when the buffer reaches capacity.
With a customizable BufferWrapper, sink implementers are no longer restricted to using Deque for buffering requests. They can choose the most suitable data structure for their batching logic and request prioritization.
NOTE: Issues with using SPI/Reflection for BatchCreator and BufferWrapper Initialization
No Support for Parameterized Constructors: ServiceLoader/Reflection requires implementations to have a no-arg constructor. Many batching strategies require configuration parameters (e.g., maxBatchSize, flushInterval), which cannot be passed through SPI. If a sink implementer needs different configurations for BatchCreator, they cannot provide parameters via SPI.
Different implementations may require different dependencies, making it difficult to dynamically instantiate with unknown arguments.
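The constructor limitation can be seen in a small sketch. Classpath-based ServiceLoader providers need a public zero-argument constructor, so a strategy that takes its configuration at construction time cannot be discovered that way, whereas passing a ready-made instance to the writer works without restriction. SizeLimitedBatcher, WriterSketch, and SpiLimitationSketch are hypothetical names used only for illustration.

```java
/** Hypothetical strategy that needs configuration at construction time. */
final class SizeLimitedBatcher {
    final int maxBatchSize;

    SizeLimitedBatcher(int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
    }
}

/** Hypothetical writer that receives a configured strategy instead of discovering it. */
final class WriterSketch {
    final SizeLimitedBatcher batcher;

    WriterSketch(SizeLimitedBatcher batcher) {
        this.batcher = batcher;
    }
}

final class SpiLimitationSketch {
    /** Returns true if the class declares a zero-argument constructor of any visibility. */
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

A sink would then build its writer with something like new WriterSketch(new SizeLimitedBatcher(500)), keeping the configuration explicit at the call site.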
Compatibility, Deprecation, and Migration Plan
- The existing batching logic will not be removed; it will be provided through the default SimpleBatchCreator.
- The existing buffering data structure will remain a Deque, with its functionality replicated in DequeBufferWrapper.
- Users who do not need custom batching can continue using the default implementation without any changes.
- Users who do not need a custom data structure can continue using the default queue without any changes.
- This change will be fully backward compatible, and no public APIs will be changed.
Test Plan
- Unit Tests: Validate the functionality of the BatchCreator interface and the default SimpleBatchCreator implementation.
- Integration Tests: Ensure that the AsyncSinkWriter correctly integrates with BatchCreator/BufferWrapper and produces expected batching behavior.
Future Work
This proposal can be extended in future versions of Flink to support additional pluggable mechanisms proposed within FLIP-284.