
Discussion thread: https://lists.apache.org/thread/6420gmzhz14co1sktn5kp9011l1vfmx4
Vote thread: To be created
JIRA: FLINK-34442
Release: To be created

Terminology

In this FLIP we use the term pre-partitioned to refer to data that is already "pre-divided" w.r.t. one or more columns. This pre-division can span multiple buckets and, within each bucket, multiple partitions.

So, if there is any way to consume the input data with consistent partitioning [in parallel], we refer to it as pre-partitioned data. 

Motivation


There are some use-cases in which data sources are pre-partitioned:

  • A Kafka topic is already partitioned w.r.t. some key(s).
  • There are multiple [Flink] jobs that materialize their outputs and subsequently read them as input. For example, tables created with DISTRIBUTED BY or PARTITIONED BY clauses can be candidates for this optimization.
  • There is already an experimental feature in DataStream that supports this use-case [3], and all the statements mentioned there also hold for this FLIP.


It is also known that data shuffling, especially many-to-many, is one of the major computationally expensive operations in [distributed] query processing [1].

Data shuffling involves redistributing data across the nodes in the cluster. This typically occurs during operations like joins, group-bys, or aggregations where data from multiple partitions needs to be combined or reorganized.

Therefore, some optimizations [1, 2] have been proposed to avoid this bottleneck.

The motivation of this FLIP is not to modify the shuffle operation in Flink but to take advantage of pre-partitioned data and avoid data shuffling in a query where possible.


Example


SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
FROM
    lineitem
WHERE
    l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;


Consider the above TPC-H Q1. If the data is already pre-partitioned w.r.t. (l_returnflag, l_linestatus), we can have a query plan with one less shuffle operation: after reading the data, we do not need to repartition it w.r.t. (l_returnflag, l_linestatus) before the aggregation.


Public Interfaces


The following interface is introduced to be implemented by the data sources:

SupportsPartitioning


package org.apache.flink.table.connector.source.abilities;


/**
 * Enables {@link ScanTableSource} to discover source partitions and inform the optimizer
 * accordingly.
 *
 * <p>Partitions split the data stored in an external system into smaller portions that are
 * identified by one or more string-based partition keys.
 *
 * <p>For example, data can be partitioned by region and within a region partitioned by month. The
 * order of the partition keys (in the example: first by region then by month) is defined by the
 * catalog table. A list of partitions could be:
 *
 * <pre>
 *   List(
 *     ['region'='europe', 'month'='2020-01'],
 *     ['region'='europe', 'month'='2020-02'],
 *     ['region'='asia', 'month'='2020-01'],
 *     ['region'='asia', 'month'='2020-02']
 *   )
 * </pre>
 *
 * <p>In the above case (data is partitioned w.r.t. region and month) the optimizer might utilize
 * this pre-partitioned data source to eliminate a possible shuffle operation. For example, for the
 * query SELECT region, month, AVG(age) FROM MyTable GROUP BY region, month the optimizer takes
 * advantage of the pre-partitioned source and avoids partitioning the data w.r.t. [region, month].
 *
 * @param <T> The type of the data units in each partition.
 */
@PublicEvolving
public interface SupportsPartitioning<T> {

    /** Returns a list of data units in each partition. */
    Optional<List<T>> sourcePartitions();

    /** Applies partitioned reading to the source operator. If withPartitions is non-empty, the method will limit its partitions to withPartitions. */
    void applyPartitionedRead(Optional<List<T>> withPartitions);
}


Flink source connectors implement the above ability and provide the necessary partitioning information (sourcePartitions). For example, consider the query and file structure below for a Flink table with the filesystem connector:


CREATE TABLE MyTableP (
  a bigint,
  b int,
  c varchar
) PARTITIONED BY (a, b) with (
 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '/root_dir')
 

SELECT a, b, COUNT (c) from MyTableP GROUP BY a, b



And the pre-partitioned input file structure is as follows:

 root_dir
 |-a=1
 | |-b=1
 | | |-part1
 | | |-part2
 | | |-part3
 | |-b=2
 | | |-part4
 | | |-part5

In this case,

  • T (the partition data unit) in sourcePartitions() is List<String>.
  • The sourcePartitions method therefore returns Optional<List<List<String>>>. In the above example, the returned value is [["root_dir/a=1/b=1/part1", "root_dir/a=1/b=1/part2", "root_dir/a=1/b=1/part3"], ["root_dir/a=1/b=2/part4", "root_dir/a=1/b=2/part5"]].
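The grouping described above can be sketched as follows. This is only an illustration under stated assumptions: PartitionGrouping is a hypothetical helper that is not part of Flink, and it assumes that all files directly under the same directory belong to the same partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not part of Flink: groups file paths into partitions. */
final class PartitionGrouping {

    /** Groups file paths by their parent directory, preserving first-seen order. */
    static Optional<List<List<String>>> sourcePartitions(List<String> filePaths) {
        if (filePaths.isEmpty()) {
            return Optional.empty();
        }
        Map<String, List<String>> byPartitionDir = new LinkedHashMap<>();
        for (String path : filePaths) {
            int lastSlash = path.lastIndexOf('/');
            // Everything before the file name identifies the partition,
            // e.g. "root_dir/a=1/b=1" (assumption of this sketch).
            String partitionDir = lastSlash < 0 ? "" : path.substring(0, lastSlash);
            byPartitionDir.computeIfAbsent(partitionDir, k -> new ArrayList<>()).add(path);
        }
        List<List<String>> partitions = new ArrayList<>(byPartitionDir.values());
        return Optional.of(partitions);
    }
}
```

Called with the five part files of the example, the helper yields two inner lists, one per (a, b) directory.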



Changes to FileSplitAssigner


In addition to the SupportsPartitioning interface, we need to add one extra method to the FileSplitAssigner interface:


package org.apache.flink.connector.file.src.assigners;


/**
 * The {@code FileSplitAssigner} is responsible for deciding what split should be processed next by
 * which node. It determines split processing order and locality.
 */
@PublicEvolving
public interface FileSplitAssigner {
    
    ...

    /**
     * Gets the next split with the context information of the calling subtaskID and overall
     * registered source tasks. This extra information might be useful for the split assigners to
     * send splits to specific partitions.
     *
     * <p>When this method returns an empty {@code Optional}, then the set of splits is assumed to
     * be done and the source will finish once the readers finished their current splits.
     */
    Optional<FileSourceSplit> getNext(
            @Nullable String hostname, int subtaskID, int registeredTasks);

   ...
}


Note that we preserve all the existing methods in the FileSplitAssigner interface. The new getNext method is necessary for a split assigner (e.g., a possible PartitionAwareSplitAssigner implementation) to

1) track the origin of the requests, so that it can assign specific splits only to specific subtasks, and

2) get the total number of registered source subtasks.
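A minimal sketch of how such an assigner might use the two extra arguments is shown below. It is not the proposed implementation: PartitionAwareAssignerSketch is a hypothetical class, splits are represented as plain strings instead of FileSourceSplit, and the ownership rule (partition p belongs to subtask p % registeredTasks) is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-in for a partition-aware split assigner. */
final class PartitionAwareAssignerSketch {

    // pendingByPartition.get(p) holds the not-yet-assigned splits of partition p.
    private final List<Deque<String>> pendingByPartition = new ArrayList<>();

    PartitionAwareAssignerSketch(List<List<String>> partitions) {
        for (List<String> partition : partitions) {
            pendingByPartition.add(new ArrayDeque<>(partition));
        }
    }

    /** Mirrors the shape of the proposed getNext; hostname is ignored in this sketch. */
    Optional<String> getNext(String hostname, int subtaskID, int registeredTasks) {
        if (registeredTasks <= 0) {
            throw new IllegalArgumentException("registeredTasks must be positive");
        }
        // Assumed ownership rule: partition p is read only by subtask (p % registeredTasks),
        // so all splits of one partition end up on the same subtask.
        for (int p = subtaskID; p < pendingByPartition.size(); p += registeredTasks) {
            String split = pendingByPartition.get(p).poll();
            if (split != null) {
                return Optional.of(split);
            }
        }
        return Optional.empty();
    }
}
```

The sketch assumes that registeredTasks stays constant across calls; a real assigner would have to handle subtasks that register late.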


Additional options


Disable this optimization

Enabling this optimization might not always be beneficial. In particular, when there are many data units (e.g., files) and very few partitions, enabling this optimization by default might be suboptimal.

Consider 2 partitions, each with a thousand files. The maximum source parallelism would then be 2. Although we plan to enhance the optimizer to make this decision transparently to the user, it is better to let the user disable this optimization via an option (e.g., 'read-source-partitioned' = 'true/false', 'true' by default). This option is a source configuration.
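The parallelism bound in this example can be written down as a small calculation. The helper below is hypothetical and only restates the reasoning above: with partitioned reading, each partition is read by a single subtask, so at most one subtask per partition does useful work.

```java
/** Hypothetical helper, not part of Flink: restates the parallelism bound. */
final class PartitionedReadParallelism {

    /** Number of source subtasks that can actually receive splits. */
    static int busySourceSubtasks(
            int configuredParallelism, int numPartitions, boolean readSourcePartitioned) {
        if (!readSourcePartitioned) {
            // Without the optimization, splits can go to any subtask.
            return configuredParallelism;
        }
        // With the optimization, a partition is never spread over several subtasks.
        return Math.min(configuredParallelism, numPartitions);
    }
}
```

For 2 partitions and a configured parallelism of 16, only 2 subtasks would read data when the option is enabled, regardless of how many files each partition contains.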


Static Partitions

We utilize the table option is-static-partitioning to check whether partitions are static or not. This option is a hint from the user that indicates whether new partitions will be added during query execution. If this option is true, Flink assumes that no partitions will be added at runtime and performs the necessary optimizations accordingly. Otherwise, new partitions can be added at runtime.


Proposed Changes


Optimization rule


The proposed changes consist of three main parts:

  1. DynamicTableSources (we can start with FileSystemTableSource) implement the SupportsPartitioning interface
  2. Introduce a new split assigner, such as PartitionAwareSplitAssigner (that implements FileSplitAssigner)
  3. Introduce a new optimization rule (e.g., RemoveRedundantShuffleRule) to
    1. Detect the pattern upstream_any→Exchange→downstream_any
    2. Check if upstream_any is a pre-partitioned data source AND contains the same partition keys as the Exchange
    3. If yes, then
      1. apply SupportsPartitioning::applyPartitionedRead
      2. remove the Exchange operator and return upstream_any→downstream_any
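The rule's matching logic can be illustrated on a toy plan representation. The sketch below does not use the planner's real classes: Node is a hypothetical stand-in for plan nodes, the partitionedRead flag stands in for calling applyPartitionedRead, and comparing the keys as sets is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.List;

/** Hypothetical toy model of the rule; real planner nodes are far richer. */
final class RemoveRedundantShuffleSketch {

    static final class Node {
        final String kind;        // "Source", "Exchange", or any other operator name
        final List<String> keys;  // partition keys (Source) or distribution keys (Exchange)
        final Node input;         // null for a source
        boolean partitionedRead;  // stands in for SupportsPartitioning::applyPartitionedRead

        Node(String kind, List<String> keys, Node input) {
            this.kind = kind;
            this.keys = keys;
            this.input = input;
        }
    }

    /** Returns a rewritten downstream node if the Exchange below it is redundant. */
    static Node apply(Node downstream) {
        Node exchange = downstream.input;
        if (exchange == null || !exchange.kind.equals("Exchange")) {
            return downstream; // pattern upstream -> Exchange -> downstream not matched
        }
        Node upstream = exchange.input;
        boolean prePartitionedSource =
                upstream != null && upstream.kind.equals("Source") && !upstream.keys.isEmpty();
        if (!prePartitionedSource
                || !new HashSet<>(upstream.keys).equals(new HashSet<>(exchange.keys))) {
            return downstream; // keys differ, so the shuffle is still needed
        }
        upstream.partitionedRead = true;
        // Drop the Exchange: downstream now reads directly from the source.
        return new Node(downstream.kind, downstream.keys, upstream);
    }
}
```

When the keys do not match, the plan is returned unchanged, which keeps the rewrite safe for queries that group by other columns.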


Handling partition change at runtime


The partition information returned during the optimization phase may not be the same as the partition information during runtime execution.

For long-running jobs, partitions may be continuously created. We handle the above scenario as follows:

Once a new partition is added, or in general, new splits are discovered, PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits) method will be called.

Inside this method, we are able to detect if a split belongs to existing partitions or there is a new partition. Once a new partition is detected, we add it to our existing mapping.

Our mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment, where it maps each source subtaskID to zero or more partitions.
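The bookkeeping described above might look roughly like the following sketch. It makes several assumptions that are not part of this FLIP: PartitionAssignmentSketch is a hypothetical class, splits are identified by their path strings rather than FileSourceSplit, a partition is identified by the parent directory of a split, and new partitions are handed out round-robin.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the subtask-to-partition bookkeeping. */
final class PartitionAssignmentSketch {

    private final int registeredTasks;
    // Partition directory -> partition id, in discovery order.
    private final Map<String, Integer> partitionIds = new LinkedHashMap<>();
    // Source subtaskID -> partitions owned by that subtask.
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();

    PartitionAssignmentSketch(int registeredTasks) {
        if (registeredTasks <= 0) {
            throw new IllegalArgumentException("registeredTasks must be positive");
        }
        this.registeredTasks = registeredTasks;
    }

    /** Registers newly discovered splits and assigns any previously unseen partition. */
    void addSplits(Collection<String> newSplitPaths) {
        for (String path : newSplitPaths) {
            String partitionDir = path.substring(0, Math.max(0, path.lastIndexOf('/')));
            if (!partitionIds.containsKey(partitionDir)) {
                int partitionId = partitionIds.size();
                partitionIds.put(partitionDir, partitionId);
                // Assumed policy: round-robin over the registered subtasks.
                int owner = partitionId % registeredTasks;
                subtaskToPartitionAssignment
                        .computeIfAbsent(owner, k -> new TreeSet<>())
                        .add(partitionId);
            }
        }
    }

    Map<Integer, Set<Integer>> assignment() {
        return subtaskToPartitionAssignment;
    }
}
```

Splits that belong to an already known partition do not change the mapping; only a previously unseen partition directory creates a new entry.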


Physical plan and integration with CompiledPlan


For the compiled plan, PartitioningSpec will be used, with the JSON tag "Partitioning". As a result, the source operator in the compiled plan will contain
"abilities" : [ { "type" : "Partitioning" } ]. More about the implementation details below:


--------------------------------
PartitioningSpec class
--------------------------------

@JsonTypeName("Partitioning")
public final class PartitioningSpec extends SourceAbilitySpecBase {
 // some code here
    @Override
    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
        if (tableSource instanceof SupportsPartitioning) {
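            // Optional.empty() (no restriction on the partitions to read) is an assumption
            // made here so that the call matches the applyPartitionedRead signature above.
            ((SupportsPartitioning<?>) tableSource).applyPartitionedRead(Optional.empty());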
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsPartitioning.",
                            tableSource.getClass().getName()));
        }
    }
  // some code here
}



--------------------------------
SourceAbilitySpec class
--------------------------------
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = FilterPushDownSpec.class),
    @JsonSubTypes.Type(value = LimitPushDownSpec.class),
    @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
    @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
    @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
    @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
    @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
    @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
+   @JsonSubTypes.Type(value = PartitioningSpec.class)                   // newly added
})
public interface SourceAbilitySpec {
    ...
}


For the physical plan, the table source will have an additional info named "partitionedReading".

Example:


CREATE TABLE MyTableP (
  a bigint,
  b int,
  c varchar
) PARTITIONED BY (a, b) with (
 'connector' = 'filesystem', 'format' = 'testcsv', 'path' = '/root_dir')
 

SELECT a, b, COUNT (c) from MyTableP GROUP BY a, b


+- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0])
  +- TableSourceScan(table=[[default_catalog, default_database, MyTableP, partitionedReading]], fields=[a, b, c])




Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

As a result of this optimization, users will be able to take advantage of their partitioned sources transparently.

  • If we are changing behavior how will we phase out the older behavior?

N/A

  • If we need special migration tools, describe them here.

N/A

  • When will we remove the existing behavior?

N/A

Test Plan

The target is to make all existing tests pass with the current architecture. 

In addition, we will add new integration and unit tests.


Rejected Alternatives

No rejected alternatives yet. 


[1] https://www.alibabacloud.com/blog/performance-analysis-and-tuning-guides-for-hybrid-shuffle-mode_600154

[2] https://www.vldb.org/pvldb/vol13/p3382-shen.pdf

[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/

