| Discussion thread | https://lists.apache.org/thread/kds2zrcdmykrz5lmn0hf9m4phdl60nfb |
|---|---|
| Vote thread | https://lists.apache.org/thread/xoz07zcdnk1kgo4dr4wywmr1s54x8lfh |
| JIRA | FLINK-35652 |
| Release | 2.0.0 |
Motivation
Lookup Join is an important feature in Flink. It is typically used to enrich a table with data queried from an external system.
If we interact with the external system for each incoming record, we incur significant network I/O and RPC overhead. Therefore, most connectors, such as HBase and JDBC, introduce caching to reduce the per-record query overhead. However, because the data distribution of Lookup Join's input stream is arbitrary, the cache hit rate is sometimes unsatisfactory.
FLIP-204 introduces Hash Lookup Join, which distributes data according to the hash of the join keys. For a LookupFunction that uses a partial-caching strategy, this can improve the hit rate. However, in some external systems the data itself already follows a certain distribution pattern. For example, Apache Paimon organizes data in buckets, so partitioning by join key is not conducive to data locality.
As these examples show, external systems may have different requirements for the data distribution on the input side, and Flink does not have this knowledge. We want to introduce a mechanism for the connector to tell the Flink planner its desired input stream data distribution or partitioning strategy. This can significantly reduce the amount of cached data and improve the performance of Lookup Join.
Public Interfaces
We propose to introduce the following interface as a new ability for LookupTableSource connectors.
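import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.RowData;

import java.io.Serializable;
import java.util.Optional;

/**
* This interface is designed to allow connectors to provide a custom partitioning strategy for the
* data that is fed into the {@link LookupTableSource}. This enables the Flink planner to optimize
* the distribution of the input stream across the subtasks of the lookup-join node to match the
* distribution of data in the external data source.
*/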
@PublicEvolving
public interface SupportsLookupCustomShuffle {
/**
* This method is used to retrieve a custom partitioner that will be applied to the input stream
* of lookup-join node.
*
* @return An {@link InputDataPartitioner} that defines how records should be distributed across
* the different subtasks. If the connector expects the input data to remain in its original
* distribution, an {@link Optional#empty()} should be returned.
*/
Optional<InputDataPartitioner> getPartitioner();
/**
* This interface is responsible for providing custom partitioning logic for the RowData
* records. We don't use {@link Partitioner} directly because the input data is always of
* RowData type, and all join keys must be extracted from the input data before it is sent
* to the partitioner.
*/
@PublicEvolving
interface InputDataPartitioner extends Serializable {
/**
* Determines the partition id for each input record.
*
* <p>Each input record is projected to contain only the join keys before it is passed to
* this partitioner.
*
* @param joinKeyRow The extracted join keys of an input record.
* @param numPartitions The total number of partitions.
* @return An integer representing the partition id to which the record should be sent.
*/
int partition(RowData joinKeyRow, int numPartitions);
/**
* Returns information about the determinism of this partitioner.
*
* <p>It returns true if and only if a call to the {@link #partition(RowData, int)} method
* is guaranteed to always return the same result given the same joinKeyRow. If the
* partitioning logic depends on non-deterministic functions such as <code>
* random(), date(), now(), ...</code>, this method must return false.
*
* <p>If this method returns false, the planner may not apply this partitioner to an updating
* stream, to avoid out-of-order changelog events.
*/
*/
default boolean isDeterministic() {
return true;
}
}
}
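To make the contract concrete, a bucketed connector might implement InputDataPartitioner roughly as below. This is a minimal sketch, not Paimon's implementation: JoinKeyRow is a hypothetical stand-in for RowData so the example runs without Flink, and both the bucket function (Arrays.hashCode of the join keys modulo numBuckets) and the bucket-to-partition mapping (bucket % numPartitions) are assumptions chosen for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical stand-in for Flink's RowData: only the projected join-key values,
// so this sketch compiles and runs without Flink on the classpath.
final class JoinKeyRow {
    private final Object[] fields;

    JoinKeyRow(Object... fields) {
        this.fields = fields.clone();
    }

    Object getField(int pos) {
        return fields[pos];
    }

    int getArity() {
        return fields.length;
    }
}

// Mirrors the shape of the proposed SupportsLookupCustomShuffle.InputDataPartitioner.
interface InputDataPartitioner extends Serializable {
    int partition(JoinKeyRow joinKeyRow, int numPartitions);

    default boolean isDeterministic() {
        return true;
    }
}

// Hypothetical bucket-aware partitioner for a store that hashes keys into numBuckets buckets.
final class BucketInputDataPartitioner implements InputDataPartitioner {
    private static final long serialVersionUID = 1L;

    private final int numBuckets;

    BucketInputDataPartitioner(int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        this.numBuckets = numBuckets;
    }

    // Assumed bucket function. A real connector must reuse exactly the function its
    // storage applies when writing, otherwise routing would not match the data layout.
    int bucket(JoinKeyRow joinKeyRow) {
        Object[] keys = new Object[joinKeyRow.getArity()];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = joinKeyRow.getField(i);
        }
        return Math.floorMod(Arrays.hashCode(keys), numBuckets);
    }

    @Override
    public int partition(JoinKeyRow joinKeyRow, int numPartitions) {
        // All keys of one bucket go to the same partition, so partition p only receives
        // keys from the buckets b with b % numPartitions == p.
        return bucket(joinKeyRow) % numPartitions;
    }
}
```

Because the result depends only on the join-key values, the default isDeterministic() returning true is accurate for this sketch.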
The planner applies the custom partitioner to the input stream of the lookup join on a best-effort basis (the reasons are explained later), so the connector needs to know whether the actual data distribution meets its expectations. This affects the connector's strategy for loading data from the external system, for example, whether to load only specific partitions/buckets or all of them.
In view of this, we plan to add a new method to LookupContext that tells the connector whether the data distribution is what it expects.
@PublicEvolving
interface LookupContext extends DynamicTableSource.Context {
/**
* Whether the distribution of the input stream data matches the partitioner provided by the
* {@link LookupTableSource}. If the interface {@link SupportsLookupCustomShuffle} is not
* implemented, false is guaranteed to be returned.
*
* @return true if the custom partitioner provided by the source is applied, otherwise
* return false.
*/
boolean isCustomShuffleEnabled();
}
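A connector would typically read this flag once at planning time and ship the resulting decision to its LookupFunction. The sketch below shows that decision under stated assumptions: LookupContext is reduced to the proposed method only, and LoadScope, ExampleLookupSource, and chooseLoadScope are hypothetical names, not part of the proposal.

```java
// Hypothetical, reduced stand-in for LookupTableSource.LookupContext (proposed method only).
interface LookupContext {
    boolean isCustomShuffleEnabled();
}

// Hypothetical loading strategies a bucketed connector might choose between.
enum LoadScope {
    ASSIGNED_BUCKETS, // input is partitioned as requested: cache only this subtask's buckets
    ALL_BUCKETS       // input distribution is arbitrary: any subtask may see any key
}

final class ExampleLookupSource {
    private ExampleLookupSource() {}

    // Evaluated when the runtime provider is created; the result is passed to the lookup function.
    static LoadScope chooseLoadScope(LookupContext context) {
        return context.isCustomShuffleEnabled() ? LoadScope.ASSIGNED_BUCKETS : LoadScope.ALL_BUCKETS;
    }
}
```

Falling back to ALL_BUCKETS keeps lookups correct when the planner declines the custom shuffle, at the cost of a larger cache.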
Having the input data partitioned as expected is not enough. For example, if the data is shuffled by shard/bucket, each instance of LookupFunction needs some information about its task (e.g., the parallelism and the maximum parallelism) to calculate which shards are assigned to it. This enables further optimizations, such as loading and caching only the data of the assigned shards during the initialization phase.
Therefore, we plan to expose TaskInfo in FunctionContext:
@PublicEvolving
public class FunctionContext {
/**
* Get the task information for this parallel subtask.
*
* @return information for this parallel subtask.
*/
public TaskInfo getTaskInfo();
}
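With TaskInfo available, each lookup function instance can work out its own shard assignment. The sketch below assumes the connector's partitioner routes bucket b to partition b % parallelism; SimpleTaskInfo is a hypothetical stand-in exposing two values that Flink's TaskInfo provides (getIndexOfThisSubtask() and getNumberOfParallelSubtasks()), and BucketAssignment is an illustrative helper. It uses only the parallelism, not the maximum parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two TaskInfo values used below.
final class SimpleTaskInfo {
    private final int indexOfThisSubtask;
    private final int numberOfParallelSubtasks;

    SimpleTaskInfo(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        this.indexOfThisSubtask = indexOfThisSubtask;
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
    }

    int getIndexOfThisSubtask() {
        return indexOfThisSubtask;
    }

    int getNumberOfParallelSubtasks() {
        return numberOfParallelSubtasks;
    }
}

final class BucketAssignment {
    private BucketAssignment() {}

    // Buckets this subtask serves, assuming bucket b is routed to partition (b % parallelism).
    static List<Integer> assignedBuckets(SimpleTaskInfo taskInfo, int numBuckets) {
        List<Integer> buckets = new ArrayList<>();
        int parallelism = taskInfo.getNumberOfParallelSubtasks();
        for (int b = taskInfo.getIndexOfThisSubtask(); b < numBuckets; b += parallelism) {
            buckets.add(b);
        }
        return buckets;
    }
}
```

For example, with 10 buckets and parallelism 4, subtask 1 would preload buckets 1, 5, and 9. In a LookupFunction, such a computation would naturally run in open(FunctionContext), using the values returned by the proposed getTaskInfo().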
Proposed Changes
How to apply the custom partitioner to input stream
The planner applies the custom partitioner in three steps:
- Recognize that a LookupTableSource implements SupportsLookupCustomShuffle, and retrieve the custom partitioner.
- Construct a special StreamPartitioner that extracts the join keys from each input record and passes them to the custom partitioner to determine the output partition.
- Attach a PartitionTransformation to the input stream to force the data distribution to match the partitioner.
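The core of the second step is "project the join keys, then delegate". The sketch below shows only that logic as a plain class so it runs standalone; in Flink it would live in a StreamPartitioner subclass. KeyPartitioner, JoinKeyProjectingPartitioner, and the Object[] row representation are hypothetical simplifications of InputDataPartitioner and RowData.

```java
// Hypothetical stand-in for the connector's InputDataPartitioner, operating on the
// projected join-key values instead of RowData.
interface KeyPartitioner {
    int partition(Object[] joinKey, int numPartitions);
}

// Sketch of step 2: project the join keys out of the full input row, then delegate to
// the connector-provided partitioner.
final class JoinKeyProjectingPartitioner {
    private final int[] joinKeyIndices;
    private final KeyPartitioner customPartitioner;

    JoinKeyProjectingPartitioner(int[] joinKeyIndices, KeyPartitioner customPartitioner) {
        this.joinKeyIndices = joinKeyIndices.clone();
        this.customPartitioner = customPartitioner;
    }

    int selectChannel(Object[] inputRow, int numberOfChannels) {
        // Projection: keep only the join-key fields, in join-key order.
        Object[] joinKey = new Object[joinKeyIndices.length];
        for (int i = 0; i < joinKeyIndices.length; i++) {
            joinKey[i] = inputRow[joinKeyIndices[i]];
        }
        int channel = customPartitioner.partition(joinKey, numberOfChannels);
        // Guard against a connector returning an out-of-range partition id.
        if (channel < 0 || channel >= numberOfChannels) {
            throw new IllegalStateException("Invalid partition id " + channel);
        }
        return channel;
    }
}
```

Rows that differ only in non-key fields always map to the same channel, which is what lets the connector reason purely in terms of join keys.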
How to enable this optimization
We plan to introduce a new option called shuffle for the LOOKUP SQL hint. If shuffle = true, the table planner will try its best to apply the custom shuffle to the input stream of the lookup join.
SQL Syntax Example
To enable custom shuffle for a lookup join, the user only needs to specify a LOOKUP hint with shuffle = true in the SELECT clause of the query. If the target dimension table implements SupportsLookupCustomShuffle, the planner will try its best to apply custom partitioning to the input stream. Otherwise, the planner will try its best to apply hash partitioning.
Note: The hint only provides a suggestion to the optimizer; it is not enforced.
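The hint handling described above amounts to a small decision. The sketch below is illustrative only: the option key "shuffle" comes from this proposal, while SupportsLookupCustomShuffle is reduced to an empty marker interface and LookupShuffleStrategy and LookupShuffleHint are hypothetical names. It deliberately ignores the fallback conditions discussed under "When will the planner not use this optimization", so CUSTOM here means "attempt", not "guaranteed".

```java
import java.util.Map;

// Hypothetical marker standing in for the proposed SupportsLookupCustomShuffle ability.
interface SupportsLookupCustomShuffle {}

// Possible outcomes of the planner's decision (names are illustrative only).
enum LookupShuffleStrategy { NONE, CUSTOM, HASH }

final class LookupShuffleHint {
    private LookupShuffleHint() {}

    static LookupShuffleStrategy choose(Map<String, String> lookupHintOptions, Object lookupSource) {
        boolean shuffle = Boolean.parseBoolean(lookupHintOptions.getOrDefault("shuffle", "false"));
        if (!shuffle) {
            return LookupShuffleStrategy.NONE; // keep the input distribution unchanged
        }
        // Prefer the connector's partitioner; otherwise fall back to hashing the join keys.
        return lookupSource instanceof SupportsLookupCustomShuffle
                ? LookupShuffleStrategy.CUSTOM
                : LookupShuffleStrategy.HASH;
    }
}
```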
In theory, this approach already covers the hash shuffle introduced in FLIP-204. Since the development of FLIP-204 has stalled (no code has been merged), we prefer to discuss in a separate thread later whether it should be discarded or refactored.
When will the planner not use this optimization
The proposed SupportsLookupCustomShuffle interface is not a mandatory protocol. That is, the planner does not guarantee that the custom partitioner will be applied, but it will try its best to apply it.
Since 1.16, Flink SQL has provided a mechanism for handling NDU (Non-Deterministic Update) problems. If the TRY_RESOLVE mode is enabled, the planner checks whether the streaming query has an NDU problem and tries to eliminate the NDU problems generated by Lookup Join. This requires the input data to be distributed by the join keys, which often conflicts with the custom partitioner. In this case, correctness is exactly what the user wants, so the planner does not use the custom partitioner provided by the connector. In theory, if the connector's custom data distribution matches, or at least partially matches, the distribution by join keys, we could have a win-win situation. However, this would introduce more complexity, and we can support it in the future if necessary.
If the connector supplies a non-deterministic partitioner, e.g., a random partitioner for anti-skew purposes, it breaks an assumption that SQL operators rely on in streaming mode: an INSERT/UPDATE_AFTER event always occurs before its related UPDATE_BEFORE/DELETE events, and these events are always processed by the same task even if a data shuffle is involved. For this reason, the planner will not apply the custom partitioner if its isDeterministic method returns false.
Note also that whether the planner enables this optimization depends not only on the type of connector but also on other factors. Taking Paimon as an example, it expects the input data to be distributed by bucket id. The fields of a Paimon table used to calculate the bucket id are called bucket keys. If the join keys do not cover all bucket keys, the planner cannot apply a custom shuffle because the partitioning strategy is not well-defined.
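The three conditions in this section can be summarized as a single check. This is a hedged sketch, not the planner's actual code: nduResolutionRequired is a hypothetical flag meaning "TRY_RESOLVE mode has to fix an NDU problem at this lookup join", and the bucket-key coverage test models the Paimon example with field names only. In practice that last check may equally well be made by the connector before it returns a partitioner.

```java
import java.util.List;
import java.util.Set;

final class CustomShuffleApplicability {
    private CustomShuffleApplicability() {}

    /**
     * @param nduResolutionRequired    hypothetical: NDU resolution needs a join-key distribution here
     * @param partitionerDeterministic result of InputDataPartitioner#isDeterministic()
     * @param joinKeys                 names of the join-key fields
     * @param bucketKeys               names of the fields the store hashes into buckets
     */
    static boolean canApplyCustomShuffle(
            boolean nduResolutionRequired,
            boolean partitionerDeterministic,
            Set<String> joinKeys,
            List<String> bucketKeys) {
        if (nduResolutionRequired) {
            return false; // correctness first: keep the distribution by join keys
        }
        if (!partitionerDeterministic) {
            return false; // changelog events of one key could reach different subtasks
        }
        // The bucket id is only well-defined if every bucket key is among the join keys.
        return joinKeys.containsAll(bucketKeys);
    }
}
```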
Compatibility, Deprecation, and Migration Plan
It won't break any compatibility.
Test Plan
The changes will be covered by unit tests, e2e tests and manual tests.
Rejected Alternatives
Allows LookupTableSource to touch the input DataStream
public interface SupportsTransformLookupInput {
void transformInputStream(DataStream<RowData> inputStream);
}
This solution gives the connector the most flexibility: it can not only change the distribution of the input stream but also insert other operators. However, all of the dimension table source's work can be encapsulated in the LookupFunction, and we don't think the source should be aware of anything about the fact table other than its data distribution. Therefore, we reject this alternative.
Allows LookupTableSource to declare fields for partitioning
If the partition can be calculated directly from multiple fields of the input row, no additional information is required. We can simply ask the connector to provide a list of fields for partitioning:
public interface SupportsLookupCustomPartitioning {
public int[] getPartitionFields();
}
The planner can generate a partitioner from these partition fields.
However, it has the following limitations:
- Nested fields are not supported.
- The specific partitioning rules can only be defined by the planner, which makes it difficult to meet the requirements of all connectors.
- The partitioning logic is completely determined by the input fields. In the case of Apache Paimon, we want the data to be distributed according to the bucket it belongs to, but the total number of buckets is not available from the input data itself.
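The last limitation can be made concrete by comparing a partitioner the planner could derive from the fields alone with one that also knows the bucket count. This is an illustrative sketch under assumptions: integer keys, a store bucket function of floorMod(hash, numBuckets), and hypothetical method names. It measures how many subtasks end up receiving keys of the same bucket.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class LocalityComparison {
    private LocalityComparison() {}

    // Assumed bucket function of the external store.
    static int bucket(int key, int numBuckets) {
        return Math.floorMod(Integer.hashCode(key), numBuckets);
    }

    // What a planner could derive from getPartitionFields(): a hash of the fields only.
    static int fieldHashPartition(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // What a connector can do when it also knows numBuckets.
    static int bucketAwarePartition(int key, int numBuckets, int parallelism) {
        return bucket(key, numBuckets) % parallelism;
    }

    // Largest number of distinct subtasks that receive keys belonging to the same bucket.
    static int maxSubtasksPerBucket(boolean bucketAware, int numKeys, int numBuckets, int parallelism) {
        Map<Integer, Set<Integer>> subtasksByBucket = new HashMap<>();
        for (int key = 0; key < numKeys; key++) {
            int subtask = bucketAware
                    ? bucketAwarePartition(key, numBuckets, parallelism)
                    : fieldHashPartition(key, parallelism);
            subtasksByBucket.computeIfAbsent(bucket(key, numBuckets), b -> new HashSet<>()).add(subtask);
        }
        int max = 0;
        for (Set<Integer> subtasks : subtasksByBucket.values()) {
            max = Math.max(max, subtasks.size());
        }
        return max;
    }
}
```

With keys 0 to 99, 8 buckets, and 3 subtasks, the field-hash partitioner spreads every bucket over all 3 subtasks, so each subtask would have to cache every bucket, while the bucket-aware partitioner keeps each bucket on a single subtask.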
In view of all the above drawbacks, we decided to reject this alternative.