Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering.
Terminology
Apache Hive was among the first large scale data warehouses.
Flink’s concept of Partitioning is adopted from Hive.
- Hive Partitions Explained with Examples - Spark By {Examples}
- HIVE - ORDER BY, SORT BY, DISTRIBUTE BY, CLUSTER BY

Apache Paimon uses the concept of Partitions and Buckets.
- Basic Concepts
- File Layouts

Other vendors offer Partitions and Clustering:
- Introduction to partitioned tables | BigQuery | Google Cloud
- Introduction to clustered tables | BigQuery | Google Cloud
- Clustering Keys & Clustered Tables | Snowflake Documentation
Scope
FLIP-63: Rework table partition support introduced the concept of Partitioning to Flink.
This FLIP proposes to introduce the concept of Bucketing to Flink.
Clustering is out of scope for this FLIP.
Requirements
Apache Paimon (formerly Flink Table Store) currently needs to define the number of buckets and bucket keys in the WITH clause:
CREATE TABLE PaimonTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING,
  PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
  'bucket' = '2',
  'bucket-key' = 'user_id'
);
Flink's Kafka connector also needs to define the message key and the number of Kafka partitions (which are semantically closer to buckets) using options:
CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'properties.num.partitions' = '6'
);
→ Flink should offer a native way of declaring bucketing.
→ Whether this is supported at runtime should then be a connector characteristic - similar to partitioning.
→ The planner can leverage this property via ability interfaces in the future. However, this is out of scope for this FLIP. This FLIP focuses solely on the syntax and necessary API changes.
Public Interfaces
SQL syntax
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
      DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
      |
      DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]

ALTER TABLE [IF EXISTS] table_name
  {
    ADD { ... | <distribution_spec> }
    | MODIFY { ... | <distribution_spec> }
    | DROP { ... | <distribution> }
    | RENAME ...
    | RENAME TO ...
    | SET ...
    | RESET ...
  }

<distribution_spec>:
  {
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
    |
    DISTRIBUTION INTO n BUCKETS
  }

<distribution>:
  DISTRIBUTION
API Changes
- CatalogTable.getDistribution()
- CatalogTable.Builder and CatalogTable.newBuilder() instead of another overloaded CatalogTable.of() method
- org.apache.flink.table.connector.sink.abilities.SupportsBucketing
Proposed Changes
CREATE TABLE
We propose to add the DISTRIBUTED keyword to the CREATE TABLE DDL:
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 6 BUCKETS
Notes:
- By default, DISTRIBUTED BY assumes a list of columns for an implicit hash partitioning.
- The referenced columns must exist and must be physical columns.
- The BY (...) syntax matches PARTITION BY (...) for consistency.
CREATE TABLE MyTable (uid BIGINT, name STRING)
Notes:
- Omitting the DISTRIBUTED clause leaves the distribution up to the connector implementation.
- This is the behavior prior to this FLIP.
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
Notes:
- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE() is supported.
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS
Notes:
- Omitting the BY (...) clause leaves the distribution up to the connector implementation.
- The connector will most likely use a round-robin or random distribution.
- Again, specifying the number of buckets is mandatory; otherwise the DISTRIBUTED clause could be omitted entirely to achieve the same behavior.
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid)
Notes:
- Omitting INTO n BUCKETS leaves the number of buckets up to the connector implementation.
ALTER TABLE
We propose to add the DISTRIBUTION keyword to the ALTER TABLE DDL:
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 6 BUCKETS;
ALTER TABLE MyTable ADD DISTRIBUTION INTO 6 BUCKETS;
ALTER TABLE MyTable MODIFY DISTRIBUTION BY (uid) INTO 6 BUCKETS;
ALTER TABLE MyTable MODIFY DISTRIBUTION INTO 6 BUCKETS;
Notes:
- Similar syntax as in CREATE TABLE, but with a noun (DISTRIBUTION) instead of a verb (DISTRIBUTED), similar to how we do it with PARTITIONED BY and PARTITION.
- Consistency with existing ALTER TABLE components is ensured.
ALTER TABLE MyTable DROP DISTRIBUTION;
Notes:
- Similar to dropping PRIMARY KEY or WATERMARK, no spec is required.
CatalogTable
In order to store the new metadata, we add new methods to CatalogTable.
public interface CatalogTable extends CatalogBaseTable {

    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }

        /** Hash distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofHash(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        }

        /** Range distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofRange(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        }

        enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
}
Notes:
- Transitively, this also applies to ResolvedCatalogTable.
- The new attribute would also require another overloaded CatalogTable.of() method; this is a good chance to introduce CatalogTable.Builder and CatalogTable.newBuilder() instead. The builder design can be similar to Schema.
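For illustration, creating a table with a distribution through the new builder could then look roughly as follows. This is only a sketch under the assumption of builder methods such as schema(), comment(), distribution(), and options(); the final method names are not part of this FLIP:

// Hypothetical builder usage; method names are illustrative only.
CatalogTable table =
        CatalogTable.newBuilder()
                .schema(
                        Schema.newBuilder()
                                .column("uid", DataTypes.BIGINT())
                                .column("name", DataTypes.STRING())
                                .build())
                .comment("table with an explicit distribution")
                .distribution(
                        CatalogTable.TableDistribution.ofHash(
                                Collections.singletonList("uid"), 6))
                .options(Collections.singletonMap("connector", "kafka"))
                .build();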
SupportsBucketing
For checking whether bucketing is supported, we introduce a new marker interface:
package org.apache.flink.table.connector.sink.abilities;

public interface SupportsBucketing {

    Set<TableDistribution.Kind> listAlgorithms();

    boolean requiresBucketCount();
}
Notes:
- Currently, the interface mostly serves for helpful error messages.
- All information is provided by ResolvedCatalogTable.
- The interface is checked in DynamicSinkUtils, similar to SupportsPartitioning.
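To make the contract concrete, a sink supporting the clause could implement the interface along the following lines. MyDynamicSink is a hypothetical example connector, not an existing class:

/** Illustrative only: a sink declaring support for hash-based bucketing. */
public class MyDynamicSink implements DynamicTableSink, SupportsBucketing {

    @Override
    public Set<TableDistribution.Kind> listAlgorithms() {
        // This hypothetical sink only supports hash distribution.
        return EnumSet.of(TableDistribution.Kind.HASH);
    }

    @Override
    public boolean requiresBucketCount() {
        // The target system cannot derive a sensible bucket count on its own.
        return true;
    }

    // The remaining DynamicTableSink methods (getChangelogMode, getSinkRuntimeProvider,
    // copy, asSummaryString) are omitted here for brevity.
}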
For CompiledPlan, we add:
@JsonTypeName("Bucketing") BucketingSpec implements SinkAbilitySpec { // no properties; only check whether interface is implemented again during deserialization }
Kafka Connector
The DISTRIBUTED clause can replace two options in the Kafka connector:
- key.fields → not required anymore
- sink.partitioner → both 'default' and 'round-robin' are covered
For backward compatibility, we suggest keeping the options but making them optional and giving the DISTRIBUTED declaration precedence in KafkaTableFactory.
It is not allowed to specify both. Either a user goes with the option or with the clause. We won't implement merging logic.
The KafkaDynamicSink should implement SupportsBucketing.
The bucket number is translated to the Kafka property: properties.num.partitions
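The following sketch illustrates how this precedence could be enforced on the factory side. It is not the actual implementation; identifiers such as context, tableOptions, and KEY_FIELDS are only assumed for the example:

// Rough sketch of the precedence rule; not the actual factory code.
Optional<CatalogTable.TableDistribution> distribution =
        context.getCatalogTable().getDistribution();
boolean keyFieldsDeclared = tableOptions.getOptional(KEY_FIELDS).isPresent();

if (distribution.isPresent() && keyFieldsDeclared) {
    // Mixing the clause and the legacy option is rejected instead of merged.
    throw new ValidationException(
            "Declare the distribution either via DISTRIBUTED or via 'key.fields', not both.");
}

if (distribution.isPresent()) {
    // The DISTRIBUTED clause takes precedence: the bucket keys act as message key fields,
    // and the bucket count is forwarded as 'properties.num.partitions'.
    List<String> keyFields = distribution.get().getBucketKeys();
    Optional<Integer> numPartitions = distribution.get().getBucketCount();
    // ... pass keyFields / numPartitions to the Kafka sink setup
}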
Compatibility, Deprecation, and Migration Plan
No migration required.
Test Plan
We add tests to all layers: CompiledPlan tests, parser tests, and catalog tests.
Rejected Alternatives
None.