Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-376 [1] introduced the DISTRIBUTED BY/INTO clause to support bucketing in Flink, and FLIP-435 [2] introduced materialized tables. However, materialized tables do not yet support DISTRIBUTED BY/INTO.

This FLIP also adds a SHOW MATERIALIZED TABLES statement.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines

Public Interfaces

New Supported SQL Syntax


The supported syntax for materialized tables would look like this:

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[ <distribution> ]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<distribution>:
{
    DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  | DISTRIBUTED INTO n BUCKETS
}

Example:

CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    DISTRIBUTED INTO 5 BUCKETS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT 
        ds
    FROM
     ...
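The three shapes allowed by the <distribution> grammar (bucket keys with an optional algorithm, bucket keys with a bucket count, and a bucket count alone) can be illustrated with a small rendering helper. This is only a sketch: DistributionClauseSketch and its Algorithm enum are hypothetical names, not Flink classes, and the exact spacing of the rendered clause is an assumption.

```java
import java.util.List;

// Hypothetical model of the <distribution> grammar; not part of Flink.
final class DistributionClauseSketch {

    enum Algorithm { HASH, RANGE }

    /**
     * Renders a DISTRIBUTED clause.
     *
     * @param algorithm   HASH, RANGE, or null when the keyword is omitted
     * @param bucketKeys  bucket columns; empty for DISTRIBUTED INTO n BUCKETS
     * @param bucketCount number of buckets, or null when INTO n BUCKETS is omitted
     */
    static String render(Algorithm algorithm, List<String> bucketKeys, Integer bucketCount) {
        if (bucketKeys.isEmpty()) {
            // Without bucket keys the grammar only allows DISTRIBUTED INTO n BUCKETS.
            if (algorithm != null || bucketCount == null) {
                throw new IllegalArgumentException(
                        "Without bucket keys, only a bucket count may be given.");
            }
            return "DISTRIBUTED INTO " + bucketCount + " BUCKETS";
        }
        StringBuilder sb = new StringBuilder("DISTRIBUTED BY ");
        if (algorithm != null) {
            sb.append(algorithm).append(' ');
        }
        sb.append('(').append(String.join(", ", bucketKeys)).append(')');
        if (bucketCount != null) {
            sb.append(" INTO ").append(bucketCount).append(" BUCKETS");
        }
        return sb.toString();
    }
}
```

For instance, render(null, Arrays.asList("uid"), 5) yields the clause used in the ALTER example of this FLIP, DISTRIBUTED BY (uid) INTO 5 BUCKETS.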

As for regular tables, it should also be possible to add or drop a distribution:

ALTER MATERIALIZED TABLE my_materialized_table DROP DISTRIBUTION

and 

ALTER MATERIALIZED TABLE my_materialized_table ADD DISTRIBUTION INTO 3 BUCKETS

or to modify the distribution:

ALTER MATERIALIZED TABLE my_materialized_table MODIFY DISTRIBUTION INTO 3 BUCKETS

or

ALTER MATERIALIZED TABLE my_materialized_table MODIFY DISTRIBUTION BY (uid) INTO 5 BUCKETS

SHOW MATERIALIZED TABLES

SHOW MATERIALIZED TABLES


Pass distribution to existing interfaces

CatalogMaterializedTable
/** Interface for a materialized table in a catalog. */
@PublicEvolving
public interface CatalogMaterializedTable {
    /** Returns the distribution of the materialized table if the {@code DISTRIBUTED} clause is defined. */
    default Optional<TableDistribution> getDistribution() {
        return Optional.empty();
    }

    /** Builder for configuring and creating instances of {@link CatalogMaterializedTable}. */
    @PublicEvolving
    class Builder {
        ...
        private @Nullable TableDistribution distribution;
        ...

        public Builder distribution(@Nullable TableDistribution distribution) {
            this.distribution = distribution;
            return this;
        }
        ...
    }
}
ResolvedCatalogMaterializedTable
@PublicEvolving
public class ResolvedCatalogMaterializedTable
        implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable {

...

    @Override
    public Optional<TableDistribution> getDistribution() {
        ...
    }

...
}


MaterializedTableChange needs no changes: it already extends TableChange, which contains

AddDistribution add(TableDistribution distribution)

ModifyDistribution modify(TableDistribution distribution)
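How such changes could be applied to the current, optional distribution of a materialized table can be sketched as follows. The helper is hypothetical and generic in the distribution type so that it compiles without Flink. The validation rules (ADD fails if a distribution exists, MODIFY and DROP fail if none exists) are assumptions for illustration; this FLIP does not specify them.

```java
import java.util.Optional;

// Hypothetical helper; D stands in for Flink's TableDistribution.
final class DistributionChangeSketch {

    // ADD DISTRIBUTION: assumed to fail if a distribution is already defined.
    static <D> Optional<D> add(Optional<D> current, D added) {
        if (current.isPresent()) {
            throw new IllegalStateException(
                    "A distribution is already defined; modify or drop it first.");
        }
        return Optional.of(added);
    }

    // MODIFY DISTRIBUTION: assumed to fail if there is nothing to modify.
    static <D> Optional<D> modify(Optional<D> current, D modified) {
        if (!current.isPresent()) {
            throw new IllegalStateException("No distribution is defined to modify.");
        }
        return Optional.of(modified);
    }

    // DROP DISTRIBUTION: assumed to fail if there is nothing to drop.
    static <D> Optional<D> drop(Optional<D> current) {
        if (!current.isPresent()) {
            throw new IllegalStateException("No distribution is defined to drop.");
        }
        return Optional.empty();
    }
}
```

Modelling the result as an Optional matches the proposed getDistribution() contract, where an empty value means that no DISTRIBUTED clause is defined.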


For SHOW MATERIALIZED TABLES, a new method is added to Catalog:


@PublicEvolving
public interface Catalog {

...


    /**
     * Get names of all materialized tables under this database. An empty list is returned if none exists.
     *
     * @param databaseName the name of the given database
     * @return a list of the names of all materialized tables in the given database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listMaterializedTables(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "listMaterializedTables(String) is not implemented for %s.",
                        this.getClass()));
    }

...
}
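A catalog implementation might support the new method by filtering its tables by kind, while catalogs that are not updated inherit the default and report the operation as unsupported. The sketch below uses hypothetical stand-in types (SketchCatalog, SketchTableKind, SketchDatabaseNotExistException) so that it compiles without Flink on the classpath; they only approximate the real interfaces.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Flink types referenced in this FLIP.
enum SketchTableKind { TABLE, VIEW, MATERIALIZED_TABLE }

class SketchDatabaseNotExistException extends Exception {
    SketchDatabaseNotExistException(String databaseName) {
        super("Database " + databaseName + " does not exist.");
    }
}

interface SketchCatalog {
    // Mirrors the proposed default: catalogs that do not override it are unsupported.
    default List<String> listMaterializedTables(String databaseName)
            throws SketchDatabaseNotExistException {
        throw new UnsupportedOperationException(
                String.format(
                        "listMaterializedTables(String) is not implemented for %s.",
                        this.getClass()));
    }
}

// A catalog that was not updated still compiles and inherits the default.
class LegacySketchCatalog implements SketchCatalog {}

class InMemorySketchCatalog implements SketchCatalog {
    // database name -> (table name -> kind); insertion-ordered for stable output
    private final Map<String, Map<String, SketchTableKind>> databases = new LinkedHashMap<>();

    void createTable(String databaseName, String tableName, SketchTableKind kind) {
        databases.computeIfAbsent(databaseName, k -> new LinkedHashMap<>()).put(tableName, kind);
    }

    @Override
    public List<String> listMaterializedTables(String databaseName)
            throws SketchDatabaseNotExistException {
        Map<String, SketchTableKind> tables = databases.get(databaseName);
        if (tables == null) {
            throw new SketchDatabaseNotExistException(databaseName);
        }
        // Keep only the names of tables whose kind is MATERIALIZED_TABLE.
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, SketchTableKind> entry : tables.entrySet()) {
            if (entry.getValue() == SketchTableKind.MATERIALIZED_TABLE) {
                names.add(entry.getKey());
            }
        }
        return Collections.unmodifiableList(names);
    }
}
```

Because the new method has a default body, existing Catalog implementations keep compiling; they fail only at runtime, and only when SHOW MATERIALIZED TABLES is actually executed against them.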


Compatibility, Deprecation, and Migration Plan

  • The change is fully backward compatible as the clause is optional; existing statements remain valid.