Status
Current state: accepted
Discussion thread: https://lists.apache.org/thread/ym9skcsfxbgr33y37wjcbdw06tdtzfgx https://lists.apache.org/thread/fygvbwrwmltgtdvsglhz147k6g0fdn1t
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink jobs frequently exchange data with external systems, which consumes their network bandwidth and CPU. When these resources are scarce, pulling data too aggressively can disrupt other workloads. Typical situations are:
A shared Kafka cluster with limited egress bandwidth. If one Flink job monopolises the link, other consumers fall behind.
A MySQL server used both for online queries and data synchronisation. If a Flink job scans too fast, it saturates the server’s IOPS and slows down user queries.
To avoid such problems, Flink needs a way to rate-limit its interaction with external systems.
Flink connects to external systems through three kinds of components: Scan Sources, Lookup Sources, and Sinks.
This FLIP focuses on rate limiting for Scan Sources first; Lookup Sources and Sinks will be addressed in follow-up proposals.
Current Status
FLIP-238 added the DataGeneratorSource, which uses a RateLimitedSourceReader to throttle data generation. Because this reader is just a thin wrapper around an arbitrary SourceReader, other connectors could in principle reuse it.
However, the RateLimitedSourceReader throttles at the granularity of RecordsWithSplitIds batches rather than individual records. A single RecordsWithSplitIds may contain an unpredictable number of records (for example, the number of records returned by one Kafka poll), so the resulting record rate is imprecise.
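The imprecision can be illustrated with a small self-contained sketch (the batch sizes are invented, standing in for what consecutive Kafka polls might return):

```java
/** Illustrates why batch-level limiting yields an unpredictable record rate. */
class BatchLimitImprecision {

    /** A batch-level limiter admits whole batches, so the records per window is simply the sum. */
    static long recordsEmitted(int[] batchSizes) {
        long total = 0;
        for (int size : batchSizes) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two windows in which the limiter admitted the same number of batches (4),
        // yet the effective record rates differ by more than two orders of magnitude.
        System.out.println(recordsEmitted(new int[] {500, 3, 120, 500})); // 1123
        System.out.println(recordsEmitted(new int[] {1, 2, 1, 4}));       // 8
    }
}
```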
This FLIP therefore proposes extensions that let connectors adopt fine-grained, record-level rate limiting more easily.
Public Interfaces
Existing interfaces and implementations
FLIP-238 introduced a RateLimiter interface to control request rates and a RateLimiterStrategy interface to create RateLimiter instances. This FLIP builds on these two interfaces.
RateLimiter
The CompletionStage<Void> acquire() method of RateLimiter is replaced with CompletionStage<Void> acquire(int requestSize).
A new method notifyAddingSplit(SplitT split) is added. This covers situations, such as a Hybrid Source, where splits of different types may be assigned to the same reader.
/** The interface to rate limit execution of methods. */
@NotThreadSafe
@Experimental
public interface RateLimiter<SplitT extends SourceSplit> {
/**
* Returns a future that is completed once another event would not exceed the rate limit. For
* correct functioning, the next invocation of this method should only happen after the
* previously returned future has been completed.
*
* @param requestSize The number of records that will be emitted.
*/
CompletionStage<Void> acquire(int requestSize);
/**
* Notifies this {@code RateLimiter} that the checkpoint with the given {@code checkpointId}
* completed and was committed. Makes it possible to implement rate limiters that control data
* emission per checkpoint cycle.
*
* @param checkpointId The ID of the checkpoint that has been completed.
*/
default void notifyCheckpointComplete(long checkpointId) {}
/**
* Notifies this {@code RateLimiter} that a new split has been added.
*
* <p>The future returned by the previous {@code acquire(int requestSize)} call is guaranteed
* to be completed before this method is invoked.
*/
default void notifyAddingSplit(SplitT split) {}
}
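To illustrate how an implementation of the extended interface might look, here is a minimal, self-contained sketch of a per-checkpoint-cycle limiter. The interface is simplified (the SplitT generic is omitted) and all names are illustrative, not part of the proposal:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Simplified stand-in for the proposed RateLimiter interface. */
interface SimpleRateLimiter {
    CompletionStage<Void> acquire(int requestSize);

    default void notifyCheckpointComplete(long checkpointId) {}
}

/**
 * Illustrative limiter that admits at most {@code permitsPerCycle} records per checkpoint
 * cycle: once the budget is spent, acquire returns an incomplete future that is only
 * completed in notifyCheckpointComplete, which also resets the budget.
 */
class PerCycleRateLimiter implements SimpleRateLimiter {
    private final int permitsPerCycle;
    private int permitsUsed;
    private CompletableFuture<Void> blockedFuture;

    PerCycleRateLimiter(int permitsPerCycle) {
        this.permitsPerCycle = permitsPerCycle;
    }

    @Override
    public CompletionStage<Void> acquire(int requestSize) {
        permitsUsed += requestSize;
        if (permitsUsed < permitsPerCycle) {
            // Still under budget: the caller may emit the next record immediately.
            return CompletableFuture.completedFuture(null);
        }
        // Budget exhausted: hand out a future that stays open until the cycle ends.
        blockedFuture = new CompletableFuture<>();
        return blockedFuture;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        permitsUsed = 0;
        if (blockedFuture != null) {
            blockedFuture.complete(null);
            blockedFuture = null;
        }
    }
}
```

A per-second limiter (such as FLIP-238's GuavaRateLimiter) would instead complete the future from a timer; the structure is the same.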
RateLimiterStrategy
The existing RateLimiterStrategy implementation will be retained and updated accordingly:
New constructor for SourceReader
Add new constructors to SourceReaderBase and SingleThreadMultiplexSourceReaderBase. Each new constructor accepts an additional parameter @Nullable RateLimiterStrategy rateLimiterStrategy, which is used to limit the request rate.
SourceReaderBase
New constructor of SourceReaderBase (all other constructors are retained):
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
......
@Nullable private RateLimiter rateLimiter;
/**
* The primary constructor for the source reader.
*
* <p>The reader will use a handover queue sized as configured via {@link
* SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
*/
public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
this(splitFetcherManager, recordEmitter, null, config, context);
}
public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context) {
this.elementsQueue = splitFetcherManager.getQueue();
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
this.splitStates = new HashMap<>();
this.options = new SourceReaderOptions(config);
this.config = config;
this.context = context;
this.noMoreSplitsAssignment = false;
this.eofRecordEvaluator = eofRecordEvaluator;
numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
}
/**
* New constructor.
*/
public SourceReaderBase(
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context,
@Nullable RateLimiterStrategy rateLimiterStrategy) {
this.elementsQueue = splitFetcherManager.getQueue();
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
this.splitStates = new HashMap<>();
this.options = new SourceReaderOptions(config);
this.config = config;
this.context = context;
this.noMoreSplitsAssignment = false;
this.eofRecordEvaluator = eofRecordEvaluator;
this.rateLimiter = rateLimiterStrategy == null ? null : rateLimiterStrategy.createRateLimiter(context.currentParallelism());
numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
}
......
}
SingleThreadMultiplexSourceReaderBase
New constructor of SingleThreadMultiplexSourceReaderBase (all other constructors are retained):
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
/**
* The primary constructor for the source reader.
*
* <p>The reader will use a handover queue sized as configured via {@link
* SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
*/
public SingleThreadMultiplexSourceReaderBase(
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(
new SingleThreadFetcherManager<>(splitReaderSupplier, config),
recordEmitter,
config,
context);
}
/**
* This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
* SingleThreadFetcherManager}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(splitFetcherManager, recordEmitter, config, context);
}
/**
* This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
* SingleThreadFetcherManager} and {@link RecordEvaluator}.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context) {
super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
}
/**
* New constructor.
*/
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context,
@Nullable RateLimiterStrategy rateLimiterStrategy) {
super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context, rateLimiterStrategy);
}
}
Proposed Changes in SourceReaderBase
New fields
RateLimiter rateLimiter – instantiated from the RateLimiterStrategy that the constructor receives.
CompletionStage<Void> rateLimitFuture – remembers the future returned by the most recent rateLimiter.acquire(...) call.
New inner class
If rateLimiter is present, the reader wraps the real SourceOutput in a RateLimitedSourceOutputWrapper, which simply counts how many records have been forwarded since the last call to acquire.
Class sketch of RateLimitedSourceOutputWrapper:
/** This output counts the number of records emitted. */
private static final class RateLimitedSourceOutputWrapper<T> implements SourceOutput<T> {
final SourceOutput<T> sourceOutput;
final RateLimiter rateLimiter;
// The number of records emitted since the last rateLimiter.acquire call.
int recordCount;
public RateLimitedSourceOutputWrapper(
SourceOutput<T> sourceOutput, RateLimiter rateLimiter) {
this.sourceOutput = sourceOutput;
this.rateLimiter = rateLimiter;
this.recordCount = 0;
}
@Override
public void emitWatermark(Watermark watermark) {
sourceOutput.emitWatermark(watermark);
}
@Override
public void markIdle() {
sourceOutput.markIdle();
}
@Override
public void markActive() {
sourceOutput.markActive();
}
@Override
public void collect(T record) {
sourceOutput.collect(record);
recordCount++;
}
@Override
public void collect(T record, long timestamp) {
sourceOutput.collect(record, timestamp);
recordCount++;
}
/**
* Gets the number of records emitted.
*
* @return the number of records emitted.
*/
public int getRecordCount() {
return recordCount;
}
/** Resets the record count to 0. */
public void resetRecordCount() {
recordCount = 0;
}
}
Method changes
In pollNext, check rateLimitFuture to determine whether the previous request exhausted the rate limit; if it did, return InputStatus.MORE_AVAILABLE directly and wait for the next poll.
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
......
// we need to loop here, because we may have to go across splits
while (true) {
// check if the previous record count reached the limit of ratelimiter.
if (rateLimitFuture != null && !rateLimitFuture.toCompletableFuture().isDone()) {
return trace(InputStatus.MORE_AVAILABLE);
}
// Process one record.
final E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
// emit the record.
numRecordsInCounter.inc(1);
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
// Acquire permit from rateLimiter.
if (rateLimiter != null) {
RateLimitedSourceOutputWrapper<T> rateLimitedSourceOutputWrapper =
(RateLimitedSourceOutputWrapper<T>) currentSplitOutput;
rateLimitFuture =
rateLimiter.acquire(rateLimitedSourceOutputWrapper.getRecordCount());
rateLimitedSourceOutputWrapper.resetRecordCount();
}
LOG.trace("Emitted record: {}", record);
// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
// more is available. If nothing more is available, the next invocation will find
// this out and return the correct status.
// That means we emit the occasional 'false positive' for availability, but this
// saves us doing checks for every record. Ultimately, this is cheaper.
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
// The fetch is done and we just discovered that and have not emitted anything, yet.
// We need to move to the next fetch. As a shortcut, we call pollNext() here again,
// rather than emitting nothing and waiting for the caller to call us again.
return pollNext(output);
}
}
}
Call RateLimiter#notifyCheckpointComplete in SourceReaderBase#snapshotState so that rate limiters that control emission per checkpoint cycle can update their state.
public List<SplitT> snapshotState(long checkpointId) {
List<SplitT> splits = new ArrayList<>();
splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
if (rateLimiter != null) {
rateLimiter.notifyCheckpointComplete(checkpointId);
}
return splits;
}
Call RateLimiter#notifyAddingSplit in SourceReaderBase#moveToNextSplit to notify the RateLimiter whenever the reader switches to a new split.
private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
String nextSplitId = recordsWithSplitIds.nextSplit();
if (nextSplitId == null) {
LOG.trace("Current fetch is finished.");
this.finishCurrentFetch(recordsWithSplitIds, output);
return false;
} else {
this.currentSplitContext = (SplitContext)this.splitStates.get(nextSplitId);
Preconditions.checkState(this.currentSplitContext != null, "Have records for a split that was not registered");
Function<T, Boolean> eofRecordHandler = null;
if (this.eofRecordEvaluator != null) {
eofRecordHandler = (record) -> {
if (!this.eofRecordEvaluator.isEndOfStream(record)) {
return false;
} else {
SplitT split = this.toSplitType(this.currentSplitContext.splitId, this.currentSplitContext.state);
this.splitFetcherManager.removeSplits(Collections.singletonList(split));
return true;
}
};
}
if (this.rateLimiter != null) {
rateLimiter.notifyAddingSplit(this.toSplitType(this.currentSplitContext.splitId, this.currentSplitContext.state));
}
this.currentSplitOutput = this.currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler);
LOG.trace("Emitting records from fetch for split {}", nextSplitId);
return true;
}
}
Compatibility, Deprecation, and Migration Plan
Compatibility for existing connectors
Adaptation plan for external connectors
Adapt DataGeneratorSource to the changes in this FLIP.
Adapt the Kafka Source connector to the new API.
For other external connectors, adoption will depend on whether there is demand for rate limiting and on their progress in bumping to Flink 2.0.
How to apply this FLIP in SQL connectors
1. Define the rate limit of a Source in Flink SQL.
Expose rate-limit properties in the table DDL. Example for a Kafka source:
CREATE TABLE my_source (
id INT,
score INT
) WITH (
'connector' = 'kafka',
'scan.rate.limit.record-per-second' = '100000'
......
);
Note: scan.rate.limit.record-per-second is only a placeholder key; the final option name may change.
The planner forwards this option to the connector via the DynamicTableFactory.
2. The connector's Source creates a RateLimiterStrategy from the configuration in SourceReaderContext.
For example, the createReader method in KafkaSource may add one line to create the RateLimiterStrategy:
SourceReader<OUT, KafkaPartitionSplit> createReader(
SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
throws Exception {
......
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
elementsQueue, splitReaderSupplier::get, splitFinishedHook),
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics,
RateLimiterStrategy.perSecond(readerContext.getConfiguration().get("scan.rate.limit.record-per-second")));
}
3. The connector's SourceReader should use the new constructor of SourceReaderBase (or SingleThreadMultiplexSourceReaderBase).
For example, the constructor of KafkaSourceReader in the Kafka connector would look like:
public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
elementsQueue,
KafkaSourceFetcherManager kafkaSourceFetcherManager,
RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState>
recordEmitter,
Configuration config,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
@Nullable RateLimiterStrategy rateLimiterStrategy) {
super(kafkaSourceFetcherManager, recordEmitter, null, config, context, rateLimiterStrategy);
this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
this.commitOffsetsOnCheckpoint =
config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
if (!commitOffsetsOnCheckpoint) {
LOG.warn(
"Offset commit on checkpoint is disabled. "
+ "Consuming offset will not be reported back to Kafka cluster.");
}
}
The impact of this feature on checkpoints
Test Plan
We will first adapt DataGeneratorSource (introduced in FLIP-238) to the proposed changes and add related tests to validate them.
Rejected Alternatives
Global rate limitation
A global rate limit can control the number of requests made to the server more accurately. However, it requires communication between the Operator and the Coordinator through OperatorEvents, which increases complexity and may run into instability caused by network communication.
Since the global limit is an upper bound on the total request rate, dividing it evenly across readers also meets the requirement (for example, a global budget of 100,000 records per second with parallelism 4 becomes 25,000 records per second per reader).
Acquire permission before fetching records
For unbounded splits (MySQL CDC, Kafka, and other message queues), we may not be able to determine the number of records before fetching, because pull requests are bounded by batch size, record count, and request timeout. Therefore, the RateLimiter cannot assess a request before it is issued.
Pass RateLimiter to SourceReader through SourceReaderContext
1. Add a new interface SupportsRateLimiting for Sources that provide a RateLimiter:
public interface SupportsRateLimiting {
RateLimiter rateLimiter(int currentParallelism);
}
2. Add a new method getRateLimiter to SourceReaderContext to provide the RateLimiter to the SourceReader:
public interface SourceReaderContext extends Serializable {
/** @return The metric group this source belongs to. */
SourceReaderMetricGroup metricGroup();
/** Gets the configuration with which Flink was started. */
Configuration getConfiguration();
......
/** @return The RateLimiter to control the reading rate of SourceReader. */
default RateLimiter getRateLimiter() {
throw new UnsupportedOperationException();
}
}
3. When creating the SourceOperator in SourceOperatorFactory, if the Source implements SupportsRateLimiting, SourceOperatorFactory extracts the RateLimiter from the Source and passes it to the SourceOperator:
public <T extends StreamOperator<OUT>> T createStreamOperator(
StreamOperatorParameters<OUT> parameters) {
......
FunctionWithException<Integer, RateLimiter, Exception>
rateLimiterBuildFunction = currentParallelism -> {
if (source instanceof SupportsRateLimiting) {
return ((SupportsRateLimiting) source).rateLimiter(currentParallelism);
} else {
return null;
}
};
final SourceOperator<OUT, ?> sourceOperator =
instantiateSourceOperator(
parameters,
source::createReader,
gateway,
source.getSplitSerializer(),
watermarkStrategy,
parameters.getProcessingTimeService(),
parameters
.getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration(),
parameters
.getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getTaskManagerExternalAddress(),
emitProgressiveWatermarks,
parameters.getContainingTask().getCanEmitBatchOfRecords(),
rateLimiterBuildFunction);
......
}
4. SourceOperator uses the RateLimiter to initialize the SourceReaderContext:
public void initReader() throws Exception {
if (sourceReader != null) {
return;
}
final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
final SourceReaderContext context = new SourceReaderContext() {.....};
sourceReader = readerFactory.apply(context);
}
5. SourceReaderBase gets the RateLimiter from SourceReaderContext.