Status

Current state: accepted

Discussion thread: https://lists.apache.org/thread/ym9skcsfxbgr33y37wjcbdw06tdtzfgx https://lists.apache.org/thread/fygvbwrwmltgtdvsglhz147k6g0fdn1t

JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink jobs frequently exchange data with external systems, which consumes their network bandwidth and CPU. When these resources are scarce, pulling data too aggressively can disrupt other workloads. Typical situations are:

  • A shared Kafka cluster with limited egress bandwidth. If one Flink job monopolises the link, other consumers fall behind.

  • A MySQL server used both for online queries and data synchronisation. If a Flink job scans too fast, it saturates the server’s IOPS and slows down user queries.

To avoid such problems, Flink needs a way to rate-limit its interaction with external systems.

Flink connects to external systems through three kinds of components:

  • Bounded / Unbounded Scan Sources

  • Lookup Sources

  • Sinks

This FLIP focuses on rate limiting for Scan Sources first; Lookup Sources and Sinks will be addressed in follow-up proposals.

Current Status

FLIP-238 added the DataGeneratorSource, which uses a RateLimitedSourceReader to throttle data generation. Because this reader is just a thin wrapper around an arbitrary SourceReader, other connectors could in principle reuse it.

However, the RateLimitedSourceReader throttles per RecordsWithSplitIds batch rather than per record. A single RecordsWithSplitIds may contain an unpredictable number of records (for example, the number of records returned by one Kafka poll), so batch-level throttling is imprecise.
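To illustrate the imprecision with hypothetical numbers (not taken from the FLIP): if a batch-level limiter admits a fixed number of fetches per second, the effective record rate varies directly with the batch size.

```java
/** Illustrative only: effective record rate under batch-level throttling. */
public class BatchVsRecordRate {

    /** With N batches admitted per second, the record rate scales with batch size. */
    public static long effectiveRecordsPerSecond(long batchesPerSecond, long recordsPerBatch) {
        return batchesPerSecond * recordsPerBatch;
    }
}
```

At 10 admitted polls per second, a poll returning 50 records yields 500 records/s, while a poll returning 5,000 records yields 50,000 records/s — a 100x swing under the same batch-level limit.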

This FLIP therefore proposes extensions that let connectors adopt fine-grained, record-level rate limiting more easily.

Public Interfaces

Existing interfaces and implementations

FLIP-238 introduced a RateLimiter interface to throttle requests and a RateLimiterStrategy interface to create RateLimiter instances. This FLIP builds on these two interfaces.

RateLimiter

The CompletionStage<Void> acquire() method in RateLimiter will be replaced with CompletionStage<Void> acquire(int requestSize).

A new method notifyAddingSplit(SplitT split) will be added. This is useful when a source, such as a Hybrid Source, can be assigned different types of splits.

RateLimiter
/** The interface to rate limit execution of methods. */
@NotThreadSafe
@Experimental
public interface RateLimiter<SplitT extends SourceSplit> {

    /**
     * Returns a future that is completed once another event would not exceed the rate limit. For
     * correct functioning, the next invocation of this method should only happen after the
     * previously returned future has been completed.
     *
     * @param requestSize The number of records that will be emitted.
     */
    CompletionStage<Void> acquire(int requestSize);

    /**
     * Notifies this {@code RateLimiter} that the checkpoint with the given {@code checkpointId}
     * completed and was committed. Makes it possible to implement rate limiters that control data
     * emission per checkpoint cycle.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     */
    default void notifyCheckpointComplete(long checkpointId) {}

    /**
     * Notifies this {@code RateLimiter} that a new split has been added.
     *
     * <p>The future returned by the preceding {@code acquire(int requestSize)} call is guaranteed
     * to be completed when this method is invoked.
     */
    default void notifyAddingSplit(SplitT split) {} 
}
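For intuition, the following self-contained sketch shows one way the acquire(int requestSize) contract could be satisfied with a token-bucket policy. It deliberately avoids Flink types (no SplitT generic, no Flink annotations) and is not part of the proposed API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

/** Sketch only: a token bucket honouring a records-per-second budget. */
public class TokenBucketRateLimiter {
    private final double recordsPerSecond;
    private double availableTokens;
    private long lastRefillNanos;

    public TokenBucketRateLimiter(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
        this.availableTokens = recordsPerSecond; // allow a burst of one second's budget
        this.lastRefillNanos = System.nanoTime();
    }

    /** Completed immediately while the budget holds; otherwise completed after a delay. */
    public CompletionStage<Void> acquire(int requestSize) {
        refill();
        availableTokens -= requestSize;
        if (availableTokens >= 0) {
            return CompletableFuture.completedFuture(null);
        }
        // Wait long enough for the deficit to be replenished.
        long delayMillis = (long) (-availableTokens * 1000 / recordsPerSecond);
        CompletableFuture<Void> future = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
                .execute(() -> future.complete(null));
        return future;
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        availableTokens =
                Math.min(recordsPerSecond, availableTokens + elapsedSeconds * recordsPerSecond);
        lastRefillNanos = now;
    }
}
```

As the interface javadoc requires, a caller should invoke acquire again only after the previously returned future has completed; the sketch relies on that single-caller discipline and is not thread-safe, matching the @NotThreadSafe contract above.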

RateLimiterStrategy

The existing RateLimiterStrategy implementation will be retained and updated accordingly:

New constructor for SourceReader

Add new constructors to SourceReaderBase and SingleThreadMultiplexSourceReaderBase. Each new constructor accepts an additional parameter, @Nullable RateLimiterStrategy rateLimiterStrategy, which is used to limit the read rate.

SourceReaderBase

New constructor of SourceReaderBase (all existing constructors will be retained):
SourceReaderBase
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {
    ......
    @Nullable private RateLimiterStrategy rateLimiterStrategy;
    @Nullable private RateLimiter rateLimiter;

    /**
     * The primary constructor for the source reader.
     *
     * <p>The reader will use a handover queue sized as configured via {@link
     * SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
     */
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.elementsQueue = splitFetcherManager.getQueue();
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    /**
     * New constructor that additionally accepts a {@link RateLimiterStrategy}.
     */
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context,
            @Nullable RateLimiterStrategy rateLimiterStrategy) {
        this.elementsQueue = splitFetcherManager.getQueue();
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        this.rateLimiterStrategy = rateLimiterStrategy;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
    ......
}

SingleThreadMultiplexSourceReaderBase

New constructor of SingleThreadMultiplexSourceReaderBase (all existing constructors will be retained):

SingleThreadMultiplexSourceReaderBase
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
                E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    /**
     * The primary constructor for the source reader.
     *
     * <p>The reader will use a handover queue sized as configured via {@link
     * SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
     */
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    /**
     * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
     * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
     */
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(splitFetcherManager, recordEmitter, config, context);
    }

    /**
     * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
     * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and {@link
     * RecordEvaluator}.
     */
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
    }

    /**
     * New constructor that additionally accepts a {@link RateLimiterStrategy}.
     */
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context,
            @Nullable RateLimiterStrategy rateLimiterStrategy) {
        super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context, rateLimiterStrategy);
    }
}

Proposed Changes In SourceReaderBase

New fields

  • RateLimiter rateLimiter – instantiated from the RateLimiterStrategy that the constructor receives.
  • CompletionStage<Void> rateLimitFuture – remembers the future returned by the most recent rateLimiter.acquire(...) call.

New inner class

If rateLimiter is present, the reader wraps the real SourceOutput in a RateLimitedSourceOutputWrapper, which simply counts how many records have been forwarded since the last call to acquire.

Class sketch of RateLimitedSourceOutputWrapper:

RateLimitedSourceOutputWrapper
/** A SourceOutput wrapper that counts the number of records emitted. */
private static final class RateLimitedSourceOutputWrapper<T> implements SourceOutput<T> {
    final SourceOutput<T> sourceOutput;

    final RateLimiter rateLimiter;

    // The number of records collected since the last rateLimiter.acquire call.
    int recordCount;

    public RateLimitedSourceOutputWrapper(
            SourceOutput<T> sourceOutput, RateLimiter rateLimiter) {
        this.sourceOutput = sourceOutput;
        this.rateLimiter = rateLimiter;
        this.recordCount = 0;
    }

    @Override
    public void emitWatermark(Watermark watermark) {
        sourceOutput.emitWatermark(watermark);
    }

    @Override
    public void markIdle() {
        sourceOutput.markIdle();
    }

    @Override
    public void markActive() {
        sourceOutput.markActive();
    }

    @Override
    public void collect(T record) {
        sourceOutput.collect(record);
        recordCount++;
    }

    @Override
    public void collect(T record, long timestamp) {
        sourceOutput.collect(record, timestamp);
        recordCount++;
    }

    /**
     * Gets the number of records emitted.
     *
     * @return the number of records emitted.
     */
    public int getRecordCount() {
        return recordCount;
    }

    /** Resets the record count to 0. */
    public void resetRecordCount() {
        recordCount = 0;
    }
}

Method of Change

In pollNext, check rateLimitFuture to determine whether the previous acquire call has completed. If it has not, return InputStatus.MORE_AVAILABLE immediately and defer emission to the next poll.

SourceReaderBase#pollNext
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
    ......
    // we need to loop here, because we may have to go across splits
    while (true) {
        // check if the previous record count reached the limit of ratelimiter.
        if (rateLimitFuture != null && !rateLimitFuture.toCompletableFuture().isDone()) {
            return trace(InputStatus.MORE_AVAILABLE);
        }
        // Process one record.
        final E record = recordsWithSplitId.nextRecordFromSplit();
        if (record != null) {
            // emit the record.
            numRecordsInCounter.inc(1);
            recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
            // Acquire permit from rateLimiter.
            if (rateLimiter != null) {
                RateLimitedSourceOutputWrapper<T> rateLimitedSourceOutputWrapper =
                        (RateLimitedSourceOutputWrapper<T>) currentSplitOutput;
                rateLimitFuture =
                        rateLimiter.acquire(rateLimitedSourceOutputWrapper.getRecordCount());
                rateLimitedSourceOutputWrapper.resetRecordCount();
            }
            LOG.trace("Emitted record: {}", record);

            // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
            // more is available. If nothing more is available, the next invocation will find
            // this out and return the correct status.
            // That means we emit the occasional 'false positive' for availability, but this
            // saves us doing checks for every record. Ultimately, this is cheaper.
            return trace(InputStatus.MORE_AVAILABLE);
        } else if (!moveToNextSplit(recordsWithSplitId, output)) {
            // The fetch is done and we just discovered that and have not emitted anything, yet.
            // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
            // rather than emitting nothing and waiting for the caller to call us again.
            return pollNext(output);
        }
    }
}

Call the RateLimiter#notifyCheckpointComplete method for each split in SourceReaderBase#snapshotState to update the state of the RateLimiter.

SourceReaderBase#snapshotState
public List<SplitT> snapshotState(long checkpointId) {
    List<SplitT> splits = new ArrayList<>();
    splitStates.forEach(
            (id, context) -> {
                splits.add(toSplitType(id, context.state));
                if (context.rateLimiter != null) {
                    context.rateLimiter.notifyCheckpointComplete(checkpointId);
                }
            });
    return splits;
}

Call the RateLimiter#notifyAddingSplit method in SourceReaderBase#moveToNextSplit to update the state of the RateLimiter when the reader moves to a new split.

SourceReaderBase#moveToNextSplit
    private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
        String nextSplitId = recordsWithSplitIds.nextSplit();
        if (nextSplitId == null) {
            LOG.trace("Current fetch is finished.");
            this.finishCurrentFetch(recordsWithSplitIds, output);
            return false;
        } else {
            this.currentSplitContext = (SplitContext)this.splitStates.get(nextSplitId);
            Preconditions.checkState(this.currentSplitContext != null, "Have records for a split that was not registered");
            Function<T, Boolean> eofRecordHandler = null;
            if (this.eofRecordEvaluator != null) {
                eofRecordHandler = (record) -> {
                    if (!this.eofRecordEvaluator.isEndOfStream(record)) {
                        return false;
                    } else {
                        SplitT split = this.toSplitType(this.currentSplitContext.splitId, this.currentSplitContext.state);
                        this.splitFetcherManager.removeSplits(Collections.singletonList(split));
                        return true;
                    }
                };
            }
            if (this.rateLimiter != null) {
                rateLimiter.notifyAddingSplit(this.toSplitType(this.currentSplitContext.splitId, this.currentSplitContext.state));
            }

            this.currentSplitOutput = this.currentSplitContext.getOrCreateSplitOutput(output, eofRecordHandler);
            LOG.trace("Emitting records from fetch for split {}", nextSplitId);
            return true;
        }
    }

Compatibility, Deprecation, and Migration Plan

Compatibility for existing connectors

  • The changes introduced by this FLIP are fully backward-compatible.

  • No existing API is removed or deprecated; current source implementations continue to compile and run unchanged.

Adaptation plan for external connectors

  1. Adapt DataGeneratorSource to the changes in this FLIP.

  2. Adapt the Kafka source connector to the new API.

  3. For other external connectors, adoption will depend on demand for rate limiting and on their progress in upgrading to Flink 2.0.

How to apply this FLIP in SQL connector

1. Define the rate limit of the Source in Flink SQL.

Expose rate-limit properties in table DDL. Example for a Kafka source:

FlinkSQL
CREATE TABLE my_source (
  id INT,
  score INT
) WITH (
  'connector' = 'kafka',
  'scan.rate.limit.record-per-second' = '100000',
  ......
);

Note: 'scan.rate.limit.record-per-second' is only a placeholder key; the final option name may change.

The planner forwards this option to the connector via the DynamicTableFactory.

2. The connector Source creates a RateLimiterStrategy using the configuration in the SourceReaderContext.

For example, the createReader method in KafkaSource may add one line to create the RateLimiterStrategy:

KafkaSource#createReader
SourceReader<OUT, KafkaPartitionSplit> createReader(
        SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
        throws Exception {
    ......
    return new KafkaSourceReader<>(
            elementsQueue,
            new KafkaSourceFetcherManager(
                    elementsQueue, splitReaderSupplier::get, splitFinishedHook),
            recordEmitter,
            toConfiguration(props),
            readerContext,
            kafkaSourceReaderMetrics,
            RateLimiterStrategy.perSecond(
                    readerContext.getConfiguration().get("scan.rate.limit.record-per-second")));
}

3. The SourceReader in the connector should use the new constructor of SourceReaderBase (or SingleThreadMultiplexSourceReaderBase).

For example, the constructor of KafkaSourceReader in the Kafka connector would look like:

KafkaSourceReader
public KafkaSourceReader(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
                elementsQueue,
        KafkaSourceFetcherManager kafkaSourceFetcherManager,
        RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState>
                recordEmitter,
        Configuration config,
        SourceReaderContext context,
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
        @Nullable RateLimiterStrategy rateLimiterStrategy) {
    super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context, rateLimiterStrategy);
    this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
    this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
    this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
    this.commitOffsetsOnCheckpoint =
            config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
    if (!commitOffsetsOnCheckpoint) {
        LOG.warn(
                "Offset commit on checkpoint is disabled. "
                        + "Consuming offset will not be reported back to Kafka cluster.");
    }
}

The impact of this feature on checkpoints

  • The rate-limited SourceOutput does not block checkpoints.

  • The state of the SourceReader is updated only after the records corresponding to a split have been fully collected.

Test Plan

We will first adapt DataGeneratorSource (proposed in FLIP-238) to the changes and add related tests to validate them.

Rejected Alternatives

Global rate limitation

A global rate limit can control the number of requests made to the server more accurately. However, it requires communication between the operator and the coordinator through OperatorEvents, which increases complexity and may suffer from instability caused by network communication.

Since the global limit is an upper bound on the total request rate, we can also meet the requirement by dividing the limit evenly across the readers.
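The even split described above can be sketched as follows (the helper name and figures are hypothetical, not part of the FLIP):

```java
/** Hypothetical helper: split a global records-per-second budget across parallel readers. */
public class PerReaderBudget {

    /** Integer division rounds down, so the aggregate rate never exceeds the global limit. */
    public static long perReader(long globalRecordsPerSecond, int parallelism) {
        return globalRecordsPerSecond / parallelism;
    }
}
```

For example, a global budget of 100,000 records/s with parallelism 8 gives each reader a budget of 12,500 records/s.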

Acquire permission before fetching records

For unbounded splits (MySQL CDC, Kafka, and other message queues), we may not be able to determine the number of records before fetching, because a pull request is bounded by batch size, record count, and request timeout. Therefore, the RateLimiter cannot be applied before the request is made.

Pass RateLimiter to SourceReader through SourceReaderContext

1. Add a new interface SupportsRateLimiting through which a Source provides a RateLimiter.

SupportsRateLimiting
public interface SupportsRateLimiting {
    RateLimiter rateLimiter(int currentParallelism);
}

2. Add a new method getRateLimiter in SourceReaderContext to provide the RateLimiter to the SourceReader.

SourceReaderContext
public interface SourceReaderContext extends Serializable {

    /** @return The metric group this source belongs to. */
    SourceReaderMetricGroup metricGroup();

    /** Gets the configuration with which Flink was started. */
    Configuration getConfiguration();

    ......
    /** @return The RateLimiter to control the reading rate of SourceReader. */
    default RateLimiter getRateLimiter() {
        throw new UnsupportedOperationException();
    }
}

3. When creating the SourceOperator in SourceOperatorFactory, if the Source implements SupportsRateLimiting, the SourceOperatorFactory extracts the RateLimiter from the Source and passes it to the SourceOperator.

SourceOperatorFactory
public <T extends StreamOperator<OUT>> T createStreamOperator(
        StreamOperatorParameters<OUT> parameters) {
    ......
    FunctionWithException<Integer, RateLimiter, Exception>
            rateLimiterBuildFunction = currentParallelism -> {
                if (source instanceof SupportsRateLimiting) {
                    return ((SupportsRateLimiting) source).rateLimiter(currentParallelism);
                } else {
                    return null;
                }
            };
    final SourceOperator<OUT, ?> sourceOperator =
            instantiateSourceOperator(
                    parameters,
                    source::createReader,
                    gateway,
                    source.getSplitSerializer(),
                    watermarkStrategy,
                    parameters.getProcessingTimeService(),
                    parameters
                            .getContainingTask()
                            .getEnvironment()
                            .getTaskManagerInfo()
                            .getConfiguration(),
                    parameters
                            .getContainingTask()
                            .getEnvironment()
                            .getTaskManagerInfo()
                            .getTaskManagerExternalAddress(),
                    emitProgressiveWatermarks,
                    parameters.getContainingTask().getCanEmitBatchOfRecords(),
                    rateLimiterBuildFunction);
  ......
}

4. The SourceOperator uses the RateLimiter to initialize the SourceReaderContext.

SourceOperator
public void initReader() throws Exception {
    if (sourceReader != null) {
        return;
    }

    final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();

    final SourceReaderContext context = new SourceReaderContext() { ..... };

    sourceReader = readerFactory.apply(context);
}

5. SourceReaderBase obtains the RateLimiter from the SourceReaderContext.
