Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-376 [1] introduced the DISTRIBUTED BY/INTO clause to support the bucketing concept in Flink, and FLIP-435 later introduced materialized tables. However, materialized tables do not yet support DISTRIBUTED BY/INTO.
This FLIP also adds a SHOW MATERIALIZED TABLES statement that lists the materialized tables in a database.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
Public Interfaces
New Supported SQL Syntax
The supported syntax for materialized tables would look like this:
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[ <distribution> ]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<distribution>:
{
    DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  | DISTRIBUTED INTO n BUCKETS
}
example:
CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
DISTRIBUTED INTO 5 BUCKETS
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
ds
FROM
...
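To make the two grammar alternatives concrete, here is a minimal Java sketch that models a parsed <distribution> clause and renders it back to SQL. DistributionKind and DistributionClause are hypothetical names used only for illustration (they are not Flink's TableDistribution API), and the validation rules are assumptions read off the grammar: DISTRIBUTED INTO needs a bucket count, and HASH or RANGE only appear together with bucket keys.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical model of the <distribution> clause; NOT Flink's TableDistribution class.
enum DistributionKind { UNSPECIFIED, HASH, RANGE }

record DistributionClause(DistributionKind kind, List<String> bucketKeys, Integer bucketCount) {

    DistributionClause {
        Objects.requireNonNull(kind, "kind");
        bucketKeys = List.copyOf(bucketKeys);
        // "DISTRIBUTED INTO n BUCKETS": without bucket keys the bucket count is mandatory.
        if (bucketKeys.isEmpty() && bucketCount == null) {
            throw new IllegalArgumentException("DISTRIBUTED INTO requires a bucket count");
        }
        // HASH or RANGE can only be written after "DISTRIBUTED BY", i.e. with bucket keys.
        if (bucketKeys.isEmpty() && kind != DistributionKind.UNSPECIFIED) {
            throw new IllegalArgumentException(kind + " requires bucket keys");
        }
        if (bucketCount != null && bucketCount <= 0) {
            throw new IllegalArgumentException("bucket count must be positive");
        }
    }

    /** Renders the clause following the grammar alternatives above. */
    String toSql() {
        StringBuilder sql = new StringBuilder("DISTRIBUTED");
        if (!bucketKeys.isEmpty()) {
            sql.append(" BY");
            if (kind != DistributionKind.UNSPECIFIED) {
                sql.append(' ').append(kind.name());
            }
            sql.append(" (").append(String.join(", ", bucketKeys)).append(')');
        }
        if (bucketCount != null) {
            sql.append(" INTO ").append(bucketCount).append(" BUCKETS");
        }
        return sql.toString();
    }
}
```

For instance, new DistributionClause(DistributionKind.UNSPECIFIED, List.of(), 5).toSql() yields DISTRIBUTED INTO 5 BUCKETS, the clause used in the example statement.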
It should also be possible to add, modify, and drop the distribution of a materialized table, as is already supported for regular tables. To drop the distribution:
ALTER MATERIALIZED TABLE my_materialized_table DROP DISTRIBUTION
To add a distribution:
ALTER MATERIALIZED TABLE my_materialized_table ADD DISTRIBUTION INTO 3 BUCKETS
To modify an existing distribution:
ALTER MATERIALIZED TABLE my_materialized_table MODIFY DISTRIBUTION INTO 3 BUCKETS
or
ALTER MATERIALIZED TABLE my_materialized_table MODIFY DISTRIBUTION BY (uid) INTO 5 BUCKETS
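The three ALTER forms can be read as operations on the materialized table's current distribution. The Java sketch below models them with hypothetical types (AlterAdd, AlterModify, AlterDrop, DistributionAlterer) and stores the distribution as its SQL text for brevity. The preconditions are assumptions, chosen to mirror the equivalent ALTER TABLE statements: ADD only when no distribution exists, MODIFY and DROP only when one does. This FLIP does not spell out the error handling.

```java
import java.util.Optional;

// Hypothetical change types, one per ALTER ... DISTRIBUTION form; not Flink's TableChange classes.
interface DistributionChange {}
record AlterAdd(String clause) implements DistributionChange {}
record AlterModify(String clause) implements DistributionChange {}
record AlterDrop() implements DistributionChange {}

final class DistributionAlterer {
    private DistributionAlterer() {}

    /**
     * Returns the distribution after applying one change. Preconditions are assumed
     * (modelled on ALTER TABLE), not taken from the FLIP.
     */
    static Optional<String> apply(Optional<String> current, DistributionChange change) {
        if (change instanceof AlterAdd add) {
            if (current.isPresent()) {
                throw new IllegalStateException("Distribution already defined; use MODIFY DISTRIBUTION");
            }
            return Optional.of(add.clause());
        }
        if (change instanceof AlterModify modify) {
            if (current.isEmpty()) {
                throw new IllegalStateException("No distribution defined; use ADD DISTRIBUTION");
            }
            return Optional.of(modify.clause());
        }
        if (change instanceof AlterDrop) {
            if (current.isEmpty()) {
                throw new IllegalStateException("No distribution defined to drop");
            }
            return Optional.empty();
        }
        throw new IllegalArgumentException("Unknown change: " + change);
    }
}
```

Keeping ADD and MODIFY distinct (rather than a single upsert) makes an accidental overwrite of an existing bucketing scheme an explicit error.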
SHOW MATERIALIZED TABLES
SHOW MATERIALIZED TABLES
Pass distribution to existing interfaces
CatalogMaterializedTable
/** Interface for a materialized table in a catalog. */
@PublicEvolving
public interface CatalogMaterializedTable {
    /** Returns the distribution of the materialized table if the {@code DISTRIBUTED} clause is defined. */
    default Optional<TableDistribution> getDistribution() {
        return Optional.empty();
    }
    /** Builder for configuring and creating instances of {@link CatalogMaterializedTable}. */
    @PublicEvolving
    class Builder {
        ...
        private @Nullable TableDistribution distribution;
        ...
        public Builder distribution(@Nullable TableDistribution distribution) {
            this.distribution = distribution;
            return this;
        }
        ...
    }
}
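A minimal sketch of the intended contract, using simplified hypothetical classes (SimpleDistribution, SimpleMaterializedTable) rather than the real Flink types: a table built without calling distribution(...) reports an empty Optional, and one built with it reports the configured value. The actual builder has further fields (schema, options, freshness and so on), which are omitted here.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, heavily simplified stand-ins for TableDistribution and CatalogMaterializedTable.
record SimpleDistribution(List<String> bucketKeys, Integer bucketCount) {}

final class SimpleMaterializedTable {
    private final String definitionQuery;
    private final SimpleDistribution distribution; // null when no DISTRIBUTED clause was given

    private SimpleMaterializedTable(String definitionQuery, SimpleDistribution distribution) {
        this.definitionQuery = Objects.requireNonNull(definitionQuery, "definitionQuery");
        this.distribution = distribution;
    }

    String getDefinitionQuery() {
        return definitionQuery;
    }

    /** Mirrors the proposed contract: empty unless a distribution was configured. */
    Optional<SimpleDistribution> getDistribution() {
        return Optional.ofNullable(distribution);
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private String definitionQuery;
        private SimpleDistribution distribution; // nullable, like the proposed builder field

        Builder definitionQuery(String definitionQuery) {
            this.definitionQuery = definitionQuery;
            return this;
        }

        Builder distribution(SimpleDistribution distribution) {
            this.distribution = distribution;
            return this;
        }

        SimpleMaterializedTable build() {
            return new SimpleMaterializedTable(definitionQuery, distribution);
        }
    }
}
```

Storing the field as nullable and exposing it through Optional matches the @Nullable builder parameter and the Optional-returning getter in the proposal.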
ResolvedCatalogMaterializedTable
@PublicEvolving
public class ResolvedCatalogMaterializedTable
        implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable {
    ...
    @Override
    public Optional<TableDistribution> getDistribution() {
        ...
    }
    ...
}
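The body of getDistribution() is elided above. One plausible implementation, assumed here to mirror how a resolved table forwards to its unresolved origin, is plain delegation. The hypothetical Java sketch below (MaterializedTableView, UnresolvedMaterializedTable, ResolvedMaterializedTableView) shows that delegation, and also why the default method on the interface matters for compatibility.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical minimal interface; only the distribution accessor is modelled.
interface MaterializedTableView {
    /** Default mirrors the proposal: no DISTRIBUTED clause means an empty Optional. */
    default Optional<String> getDistribution() {
        return Optional.empty();
    }
}

/** Unresolved table as a catalog might return it; stores the clause text for simplicity. */
record UnresolvedMaterializedTable(String distribution) implements MaterializedTableView {
    @Override
    public Optional<String> getDistribution() {
        return Optional.ofNullable(distribution);
    }
}

/** Resolved wrapper that forwards the distribution to its origin (assumed behaviour). */
final class ResolvedMaterializedTableView implements MaterializedTableView {
    private final MaterializedTableView origin;

    ResolvedMaterializedTableView(MaterializedTableView origin) {
        this.origin = Objects.requireNonNull(origin, "origin");
    }

    @Override
    public Optional<String> getDistribution() {
        return origin.getDistribution();
    }
}
```

An implementation written before this change, such as new MaterializedTableView() {}, still compiles and, once wrapped, simply reports Optional.empty().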
MaterializedTableChange already extends TableChange, which provides the following factory methods for distribution changes, so no changes are expected there:
AddDistribution add(TableDistribution distribution)
ModifyDistribution modify(TableDistribution distribution)
To support SHOW MATERIALIZED TABLES, a new method is added to the Catalog interface:
@PublicEvolving
public interface Catalog {
    ...
    /**
     * Get names of all materialized tables under this database. An empty list is returned if none exists.
     *
     * @param databaseName the name of the given database
     * @return a list of the names of all materialized tables in the given database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listMaterializedTables(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "listMaterializedTables(String) is not implemented for %s.", this.getClass()));
    }
    ...
}
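To illustrate the contract of listMaterializedTables, here is a toy in-memory catalog in Java. InMemoryCatalog, ObjectKind and the local DatabaseNotExistException are hypothetical stand-ins, not Flink classes; a real catalog would read object kinds from its metastore. Sorting the names is an arbitrary choice made here for deterministic output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's DatabaseNotExistException.
class DatabaseNotExistException extends Exception {
    DatabaseNotExistException(String databaseName) {
        super("Database " + databaseName + " does not exist.");
    }
}

enum ObjectKind { TABLE, VIEW, MATERIALIZED_TABLE }

/** Toy catalog keeping, per database, a map from object name to object kind. */
final class InMemoryCatalog {
    private final Map<String, Map<String, ObjectKind>> databases = new HashMap<>();

    void createDatabase(String databaseName) {
        databases.putIfAbsent(databaseName, new HashMap<>());
    }

    /** Registers an object; assumes the database was created beforehand. */
    void register(String databaseName, String objectName, ObjectKind kind) {
        databases.get(databaseName).put(objectName, kind);
    }

    /** Same contract as the proposed Catalog#listMaterializedTables: empty list if none. */
    List<String> listMaterializedTables(String databaseName) throws DatabaseNotExistException {
        Map<String, ObjectKind> objects = databases.get(databaseName);
        if (objects == null) {
            throw new DatabaseNotExistException(databaseName);
        }
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, ObjectKind> entry : objects.entrySet()) {
            if (entry.getValue() == ObjectKind.MATERIALIZED_TABLE) {
                names.add(entry.getKey());
            }
        }
        names.sort(null); // natural (alphabetical) order, only for deterministic output
        return names;
    }
}
```

Regular tables and views stored in the same database are filtered out, which is what distinguishes SHOW MATERIALIZED TABLES from SHOW TABLES.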
Compatibility, Deprecation, and Migration Plan
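The change is fully backward compatible: the DISTRIBUTED BY/INTO clause is optional, so existing CREATE MATERIALIZED TABLE statements remain valid, and the new interface methods come with default implementations, so existing catalog implementations do not need to change.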