Status


Discussion threadhttps://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
Vote thread
JIRA

Release1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In practice, many Flink jobs need to read data from multiple sources in sequential order. Change Data Capture (CDC) and machine learning feature backfill are two concrete scenarios of this consumption pattern.

In the past, users may have to either run two different Flink jobs or have some hacks in the SourceFunction to address such use cases. However, most users may find it is not as easy it sounds like:

To support such scenarios smoothly, the Flink jobs need to first read from HDFS for historical data then switch to Kafka for real-time records. Therefore, this FLIP proposes introducing a “Hybrid Source” API built on top of the new Source API (FLIP-27) to help users with such use cases.

The goal of this FLIP include:

Basic Idea

A hybrid source is a source that contains a list of concrete sources. The hybrid source reads from each contained source in the defined order. It switches from source A to the next source B when source A finishes. So that from the users’ perspective, all the sources act as a single source.

In most cases, the hybrid source only contains two sources, but it can contain more sources if needed.

This FLIP proposes to support switching sources with either predetermined start positions or with position conversion at switch time. The former mode is very simple - sources are configured upfront with their start/end positions and wrapped into HybridSource. No special support is required on existing sources.

With position conversion at switch time the end position of the current source is converted into the start position of the next source. This requires support in the split enumerator of the current source to provide the end position, support in the next source to set the start position (like the start timestamp in KafkaSource) and a user supplied function that converts the end position into the start position.

  1. The HybridSource enumerator manages the process of switching between two sources.
  2. A user provided implementation of SourceFactory is used to create the next Source when the previous Source finishes reading.
  3. The SourceFactory is expected to do the following:
    a. Get the end_position from the SplitEnumerator of the previous finished Source. (This may require modification of existing source like FileSource to expose the end position.)
    b. Translate that end_position to the start_position of the next source.
    c. Construct and setup the next Source.

The switch is done in the following way:

To make it work, we have to:

Example with FileSource and KafkaSource

When FileSource is used as the source to be switched, FileSplitEnumerator could use the max timestamp of File records as the return value of the getEndState method, which is regarded as END_POSITION for the file system source.

When KafkaSource is used as the switched source, KafkaSplitEnumerator could use the maximum timestamp of the upstream source as the parameter value of the setStartState method, which is considered to be START_POSITION for the Kafka consumer to seek.

For example, the HybridSource composed of HDFSSource and KafkaSource is as follows:

Prototype implementation

A PoC has been implemented and evolved into a pull requestThe PR contains a working HybridSource implementation that satisfies the goals listed above. It is subject to resolution of few outstanding questions in the mailing list discussion and differs in following aspects from the original proposal:

Source switch

The implementation of the switching mechanism in the PR deviates from what was originally outlined in this FLIP:

Start position

As discussed on the mailing list, there is a broad range of possible use cases and requirements for "hybrid sources" that could possibly result in different implementations. Within this proposal, we are considering a fixed sequence of sources with following options for defining their start positions:

  1. Fixed, pre-configured start positions. Example: file source reads 2 days worth of data till absolute end timestamp and subsequent Kafka source starts at the same, upfront known start timestamp. There is no special position transfer at switch time required for this. Existing FLIP-27 sources can be used as is. The sources are pre-configured at job submission time.
  2. Dynamically set start position based on transfer between enumerators at switch time. Example: File source reads a very large backlog, taking potentially longer than retention available for subsequent source. Switch needs to occur at "current time - X". This requires the start time for the next source to be set at switch time. Here we require state transfer from one enumerator to another. Sources will need to be modified to support this (current FLIP-27 sources don't support it). Hybrid source will support it by constructing underlying source from previous enumerator at switch time (SourceFactory).

Recovery

During task recovery, all splits that have been assigned to affected subtasks since the last checkpoint are handed back to the enumerator. Subtasks that are reset can be all or subset, depending on the topology. If a source switch occurred since the last checkpoint, the splits that are returned to the hybrid source enumerator may belong to a different source than the current active underlying enumerator.

To handle this case, the hybrid split enumerator needs to switch affected readers to the appropriate underlying reader (which could be different from the reader of current source/enumerator) and reassign the returned splits. After the splits belonging to the previous source have been completed, delegation to the current enumerator can resume. For an embarrassingly parallel topology with regional recovery that means readers of different sources may be active at the same time in different subtasks. Source and split processing order per subtask however remain the same and consistent with the semantics of fine grained recovery.

An alternative approach would be to only switch sources at the checkpoint boundary to avoid the complexity of dealing with splits from different enumerators. On the other hand this would require that checkpointing is enabled and the checkpoint intervals are reasonably small (as otherwise there could be a significant delay before reading for the next source begins).

HybridSource implementation

The baseline implementation of hybrid source switches underlying sources based on configured source chain built from HybridSourceBuilder.  HybridSourceBuilder adds the source with deferred instantiation based on previous enumerator to build the source chain through the SourceFactory for underlying sources of hybrid source.  SourceFactory permits building of a source at graph construction time or deferred at switch time and provides the ability to set a start position in the way allowed by a specific source. When the current enumerator has finished, SourceFactory creates the next source from the previous enumerator before the next enumerator is created and only required for dynamic position transfer at time of switching. Thus the end state of split enumerator is used to set the next source's start start position. 

/**
 * Hybrid source that switches underlying sources based on configured source chain.
 *
 * <p>A simple example with FileSource and KafkaSource with fixed Kafka start position:
 *
 * <pre>{@code
 * FileSource<String> fileSource =
 *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
 * KafkaSource<String> kafkaSource =
 *           KafkaSource.<String>builder()
 *                   .setBootstrapServers("localhost:9092")
 *                   .setGroupId("MyGroup")
 *                   .setTopics(Arrays.asList("quickstart-events"))
 *                   .setDeserializer(
 *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
 *                   .setStartingOffsets(OffsetsInitializer.earliest())
 *                   .build();
 * HybridSource<String> hybridSource =
 *           HybridSource.builder(fileSource)
 *                   .addSource(kafkaSource)
 *                   .build();
 * }</pre>
 *
 * <p>A more complex example with Kafka start position derived from previous source:
 *
 * <pre>{@code
 * HybridSource<String> hybridSource =
 *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
 *         .addSource(
 *             switchContext -> {
 *               StaticFileSplitEnumerator previousEnumerator =
 *                   switchContext.getPreviousEnumerator();
 *               // how to get timestamp depends on specific enumerator
 *               long timestamp = previousEnumerator.getEndTimestamp();
 *               OffsetsInitializer offsets =
 *                   OffsetsInitializer.timestamp(timestamp);
 *               KafkaSource<String> kafkaSource =
 *                   KafkaSource.<String>builder()
 *                       .setBootstrapServers("localhost:9092")
 *                       .setGroupId("MyGroup")
 *                       .setTopics(Arrays.asList("quickstart-events"))
 *                       .setDeserializer(
 *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
 *                       .setStartingOffsets(offsets)
 *                       .build();
 *               return kafkaSource;
 *             },
 *             Boundedness.CONTINUOUS_UNBOUNDED)
 *         .build();
 * }</pre>
 */
@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {...}


/**
 * Factory for underlying sources of {@link HybridSource}.
 *
 * <p>This factory permits building of a source at graph construction time or deferred at switch
 * time. Provides the ability to set a start position in any way a specific source allows.
 * Future convenience could be built on top of it, for example a default implementation that
 * recognizes optional interfaces to transfer position in a universal format.
 *
 * <p>Called when the current enumerator has finished. The previous source's final state can
 * thus be used to construct the next source, as required for dynamic position transfer at time
 * of switching.
 *
 * <p>If start position is known at job submission time, the source can be constructed in the
 * entry point and simply wrapped into the factory, providing the benefit of validation during
 * submission.
 */
@FunctionalInterface
public interface SourceFactory<
                T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
        extends Serializable {
    SourceT create(SourceSwitchContext<FromEnumT> context);
}


/**
 * Context provided to source factory.
 *
 * <p>To derive a start position at switch time, the source can be initialized from context of
 * the previous enumerator. A specific enumerator implementation may carry state such as an end
 * timestamp, that can be used to derive the start position of the next source.
 *
 * <p>Currently only the previous enumerator is exposed. The context interface allows for
 * backward compatible extension, i.e. additional information about the previous source can be
 * supplied in the future.
 */
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator();
}


/** Builder for HybridSource. */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
        implements Serializable {
    private final List<SourceListEntry> sources;

    public HybridSourceBuilder() {
        sources = new ArrayList<>();
    }

    /** Add pre-configured source (without switch time modification). */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
        return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
    }

    /** Add source with deferred instantiation based on previous enumerator. */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, EnumT> sourceFactory,
                    Boundedness boundedness) {
        if (!sources.isEmpty()) {
            Preconditions.checkArgument(
                    Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
                    "All sources except the final source need to be bounded.");
        }
        ClosureCleaner.clean(
                sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        sources.add(SourceListEntry.of(sourceFactory, boundedness));
        return (HybridSourceBuilder) this;
    }

    /** Build the source. */
    public HybridSource<T> build() {
        return new HybridSource(sources);
    }
}


HybridSource creates the HybridSourceSplitEnumerator when SourceCoordinator start the hybrid source. HybridSourceSplitEnumerator wraps the actual split enumerators and facilitates source switching. Split enumerators are created lazily when source switch occurs to support runtime position conversion. The enumerator delegates to the current underlying split enumerator and transitions to the next source once all readers have indicated via SourceReaderFinishedEvent that all input was consumed. SourceReaderFinishedEvent is a source event sent from the HybridSourceReader to the enumerator to indicate that the current reader has finished and splits for the next reader could be sent from corresponding enumerator. When creating the new enumerator via SourceHybridSourceSplitEnumerator switches between enumerators and the start position is fixed at pipeline construction time through the source or supplied at switch time through a converter function by using the end state of the previous enumerator. During subtask recovery, SourceCoordinator adds back the source splits which have already been assigned since the last checkpoint. For the case that these splits originates from a previous enumerator that is no longer active. HybridSourceSplitEnumerator will suspend forwarding to the current enumerator and replay the returned splits by activating the previous readers. After returned splits were processed, delegation to the current underlying enumerator resumes.


HybridSource creates the HybridSourceReader to to read data from the splits assigned when creating the stream operator. HybridSourceReader delegates to the actual source reader and processes source splits from a sequence of sources as determined by the enumerator. The current source is provided with SwitchSourceEvent which sent from HybridSourceSplitEnumerator to HybridSourceReader to switch to the indicated reader. HybridSourceReader doesn't require upfront knowledge of the number and order of sources. When the underlying reader has consumed all input for a source, HybridSourceReader sends SourceReaderFinishedEvent to the SourceCoordinator and doesn't make assumptions about the order in which sources are activated. HybridSourceReader may start processing splits for a previous source, which is indicated via SwitchSourceEvent when recovering from a checkpoint.

Feature Work

The feature plan of HybridSource could be planned as follows:

  1. Implement hybrid SQL connector for the HybridSource to create multiple source tables to specify which ones are added to the hybrid source to switch corresponding sources.

Compatibility, Deprecation, and Migration Plan

Hybrid source implementation is supported by the new source interface. Existing FLIP-27 sources can be used without modification, including FileSource, KafkaSource. For dynamic start position, existing sources may need to be modified to expose the end position of the enumerator to the source factory function. This work is outside the scope of this FLIP.

Test Plan

Unit test and integration test for HybridSource are part of the baseline PR and use MockSource. Manual verification with other sources as mentioned on the PR. End to end test with FileSource and KafkaSource will be added with .