Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As FLIP-231 [1] mentioned, statistics are one of the most important inputs to the optimizer. Accurate and complete statistics allow the optimizer to be more powerful. The "ANALYZE TABLE" syntax is a common and effective approach to gathering statistics, and has already been introduced by many compute engines and databases [2][3][4][5][6].
The main purpose of this FLIP is to discuss introducing "ANALYZE TABLE" syntax for Flink SQL.
Public Interfaces
Sql Syntax
ANALYZE TABLE table_name [ PARTITION (partcol1[=val1] [, partcol2[=val2], ...]) ] COMPUTE STATISTICS [ FOR COLUMNS col1 [, col2, ...] | FOR ALL COLUMNS ]
Only existing tables are supported, and an exception will be thrown if the table is a view.
PARTITION (partcol1=val1 [, partcol2=val2, ...])
is optional for partitioned tables. If no partition is specified, statistics will be gathered for all partitions. If a certain partition is specified, statistics will be gathered only for that partition. If the table is not a partitioned table but a partition is specified, an exception will be thrown.
FOR COLUMNS col1 [, col2, ...]
or FOR ALL COLUMNS
are also optional. If no column is specified, only table-level statistics will be gathered. If any columns are specified, column-level statistics will also be gathered. The column-level statistics include: the number of distinct values (ndv), the number of nulls (nullCount), the average length of column values (avgLen), the max length of column values (maxLen), the min value of column values (minValue), the max value of column values (maxValue), and the value count (for BOOLEAN type only). The following table lists the supported types and their corresponding column-level statistics:
Types | ndv | nullCount | avgLen | maxLen | maxValue | minValue | valueCount |
BOOLEAN | × | √ | × | × | × | × | √ |
TINYINT | √ | √ | × | × | √ | √ | × |
SMALLINT | √ | √ | × | × | √ | √ | × |
INTEGER | √ | √ | × | × | √ | √ | × |
FLOAT | √ | √ | × | × | √ | √ | × |
DATE | √ | √ | × | × | √ | √ | × |
TIME_WITHOUT_TIME_ZONE | √ | √ | × | × | √ | √ | × |
BIGINT | √ | √ | × | × | √ | √ | × |
DOUBLE | √ | √ | × | × | √ | √ | × |
DECIMAL | √ | √ | × | × | √ | √ | × |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | √ | √ | × | × | √ | √ | × |
TIMESTAMP_WITHOUT_TIME_ZONE | √ | √ | × | × | √ | √ | × |
CHAR | √ | √ | √ | √ | × | × | × |
VARCHAR | √ | √ | √ | √ | × | × | × |
other types | × | √ | × | × | × | × | × |
NOTES: For fixed-length types (e.g. BOOLEAN, INTEGER, DOUBLE), we do not need to gather `avgLen` and `maxLen` from the original records, because they can be derived from the type itself.
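To make the column-level statistics above concrete, here is a small self-contained Java sketch (not Flink code; the class and method names are purely illustrative) that computes nullCount, ndv, minValue and maxValue for a nullable INTEGER column:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: computes the column-level statistics described above
// for a nullable INTEGER column represented as an Integer[] array.
public class ColumnStatsSketch {

    // number of nulls (nullCount)
    public static long nullCount(Integer[] col) {
        return Arrays.stream(col).filter(v -> v == null).count();
    }

    // number of distinct non-null values (ndv), computed exactly here
    public static long ndv(Integer[] col) {
        Set<Integer> distinct = new HashSet<>();
        for (Integer v : col) {
            if (v != null) {
                distinct.add(v);
            }
        }
        return distinct.size();
    }

    // min value over non-null values (minValue)
    public static Integer min(Integer[] col) {
        Integer min = null;
        for (Integer v : col) {
            if (v != null && (min == null || v < min)) {
                min = v;
            }
        }
        return min;
    }

    // max value over non-null values (maxValue)
    public static Integer max(Integer[] col) {
        Integer max = null;
        for (Integer v : col) {
            if (v != null && (max == null || v > max)) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Integer[] col = {3, null, 7, 3, null, 1};
        System.out.println("nullCount=" + nullCount(col)); // 2
        System.out.println("ndv=" + ndv(col));             // 3
        System.out.println("min=" + min(col));             // 1
        System.out.println("max=" + max(col));             // 7
    }
}
```

In the actual proposal these values are not computed by hand-written loops but by the aggregate functions of the generated SELECT statement, as described in the "How to collect the statistics" section below.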
Examples
Partition table
Suppose table MyTable has the schema: a INT, b BIGINT, c VARCHAR, and has 4 partitions with the following specs:
Partition1: (ds='2022-06-01', hr=1)
Partition2: (ds='2022-06-01', hr=2)
Partition3: (ds='2022-06-02', hr=1)
Partition4: (ds='2022-06-02', hr=2)
-- collect row count for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS;

-- collect row count for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS;

-- collect row count for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS;

-- collect row count and all columns statistics for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and all columns statistics for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and all columns statistics for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and column: a statistics for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS FOR COLUMNS a;

-- collect row count and column: a, b statistics for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS FOR COLUMNS a, b;

-- collect row count and column: a, b, c statistics for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a, b, c;
Non-partition table
Suppose table MyTable has the schema: a INT, b BIGINT, c VARCHAR
-- collect row count
ANALYZE TABLE MyTable COMPUTE STATISTICS;

-- collect row count and all columns statistics
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and column: a statistics
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a;
Proposed Changes
Currently, ANALYZE TABLE does not support streaming mode, because the sources may be unbounded and accurate results cannot be computed. It could be supported in the future for the case where all sources are bounded. The following discussion is therefore based on batch mode.
When to execute the `ANALYZE TABLE` command
The `ANALYZE TABLE` command is not triggered automatically; it is triggered manually as needed.
How to collect the statistics
The ANALYZE TABLE statement will be converted to a SELECT statement, where each statistics item is converted to an aggregate function, e.g. MAX(x) for the max value of column x. To collect the number of distinct values for a specified column x, we could use the COUNT(DISTINCT x) function, but it is a very heavy operation, and there may be hundreds of columns. Statistics do not require exact values; approximate values are also acceptable. We will introduce an APPROX_COUNT_DISTINCT function, which is a lighter approach to computing the number of distinct values. Calcite has already introduced the definition of APPROX_COUNT_DISTINCT; we only need to introduce the implementation.
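To illustrate why an approximate distinct count can be much cheaper than an exact COUNT(DISTINCT x), here is a minimal linear-counting sketch in Java. This is not the actual Flink/Calcite implementation (which would typically use a HyperLogLog-style algorithm); all names here are illustrative, and it only shows the memory trade-off: a fixed-size bitmap instead of storing every distinct value.

```java
import java.util.BitSet;

// Illustrative linear-counting sketch: hash each value into a fixed-size
// bitmap and estimate the distinct count from the fraction of zero bits.
// Memory is O(bitmap size) regardless of how many distinct values exist,
// unlike an exact COUNT(DISTINCT x), which must retain every distinct value.
public class LinearCounting {
    private final BitSet bits;
    private final int m; // bitmap size in bits

    public LinearCounting(int m) {
        this.m = m;
        this.bits = new BitSet(m);
    }

    public void add(Object value) {
        // spread the hash code, then map it to a bitmap position
        int h = value.hashCode();
        h ^= (h >>> 16);
        bits.set(Math.floorMod(h, m));
    }

    public long estimate() {
        int zeros = m - bits.cardinality();
        if (zeros == 0) {
            return m; // bitmap saturated; the estimate is unreliable here
        }
        // linear-counting estimator: n ~= -m * ln(zeros / m)
        return Math.round(-m * Math.log((double) zeros / m));
    }

    public static void main(String[] args) {
        LinearCounting lc = new LinearCounting(1 << 16);
        for (int i = 0; i < 10_000; i++) {
            lc.add("key-" + (i % 1_000)); // 1,000 distinct values, each added 10 times
        }
        System.out.println("estimate=" + lc.estimate()); // close to 1000
    }
}
```

The same principle is what makes APPROX_COUNT_DISTINCT acceptable here: statistics tolerate a small relative error in exchange for bounded memory per column.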
The following examples show the SELECT statements converted from ANALYZE TABLE statements:
CREATE TABLE Table1 (
    a BOOLEAN,
    b INT,
    c DOUBLE,
    i VARCHAR
) WITH (...);

ANALYZE TABLE Table1 COMPUTE STATISTICS FOR ALL COLUMNS;

-- the above ANALYZE TABLE statement will be converted to:
SELECT
    COUNT(1) AS rowCount,
    (COUNT(1) - COUNT(`a`)) AS a_nullCount,
    COUNT(`a`) FILTER (WHERE `a` IS TRUE) AS a_trueCount,
    COUNT(`a`) FILTER (WHERE `a` IS FALSE) AS a_falseCount,
    (COUNT(1) - COUNT(`b`)) AS b_nullCount,
    APPROX_COUNT_DISTINCT(`b`) AS b_ndv,
    MAX(`b`) AS b_max,
    MIN(`b`) AS b_min,
    (COUNT(1) - COUNT(`c`)) AS c_nullCount,
    APPROX_COUNT_DISTINCT(`c`) AS c_ndv,
    MAX(`c`) AS c_max,
    MIN(`c`) AS c_min,
    (COUNT(1) - COUNT(`i`)) AS i_nullCount,
    APPROX_COUNT_DISTINCT(`i`) AS i_ndv,
    AVG(CAST(CHAR_LENGTH(`i`) AS DOUBLE)) AS i_avgLen,
    MAX(CAST(CHAR_LENGTH(`i`) AS BIGINT)) AS i_maxLen
FROM Table1;
CREATE TABLE Table2 (
    a BOOLEAN,
    b INT,
    c VARCHAR,
    p VARCHAR
) PARTITIONED BY (p) WITH (...);

ANALYZE TABLE Table2 PARTITION (p=1) COMPUTE STATISTICS;

-- the above ANALYZE TABLE statement will be converted to:
SELECT COUNT(1) AS rowCount FROM Table2 WHERE p = 1;
The SELECT job will be submitted via the TableEnvironment#executeSql method, and the execution result will be collected via the TableResult.collect method. Normally, the select job does not take long to execute, so currently the result collection and statistics update both happen on the client.
How to update the statistics
As we define a unique field name for each statistics item (see the above SELECT statement), we can get each value from the result row by name and convert the values to CatalogTableStatistics and CatalogColumnStatistics. Finally, we update the statistics through the catalog API. The pseudocode looks like:
CatalogTable table = catalogManager.getTable(...);
Catalog catalog = catalogManager.getCatalog(...);
ObjectPath objectPath = ...;
if (table.isPartitioned()) {
    // partitioned table
    List<CatalogPartitionSpec> targetPartitions = ...;
    for (CatalogPartitionSpec p : targetPartitions) {
        // generate the select statement based on AnalyzeTableOperation
        String statSql = ...;
        // submit the select job and wait for the execution result
        TableResult tableResult = executeSql(statSql);
        List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
        Row row = result.get(0);
        // convert the table statistics from the row
        CatalogTableStatistics tableStat = convertToTableStatistics(row);
        catalog.alterPartitionStatistics(objectPath, p, tableStat, false);
        // convert the column statistics from the row
        CatalogColumnStatistics columnStat = convertToColumnStatistics(row);
        catalog.alterPartitionColumnStatistics(objectPath, p, columnStat, false);
    }
} else {
    // non-partitioned table
    // generate the select statement based on AnalyzeTableOperation
    String statSql = ...;
    // submit the select job and wait for the execution result
    TableResult tableResult = executeSql(statSql);
    List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
    Row row = result.get(0);
    // convert the table statistics from the row
    CatalogTableStatistics tableStat = convertToTableStatistics(row);
    catalog.alterTableStatistics(objectPath, tableStat, false);
    // convert the column statistics from the row
    CatalogColumnStatistics columnStat = convertToColumnStatistics(row);
    catalog.alterTableColumnStatistics(objectPath, columnStat, false);
}
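The convertToTableStatistics / convertToColumnStatistics steps above can be sketched as follows. This is a hypothetical, self-contained illustration: it uses a plain Map<String, Object> in place of Flink's Row, and simplified stand-in classes in place of CatalogTableStatistics and CatalogColumnStatistics, to show the alias-based lookup against the unique field names of the generated SELECT statement.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the conversion helpers in the pseudocode above:
// pick statistics out of the result row by the unique aliases used in the
// generated SELECT statement (e.g. "rowCount", "b_nullCount", "b_ndv").
public class StatsConversionSketch {

    // Simplified stand-in for CatalogTableStatistics
    public static class TableStats {
        public final long rowCount;
        public TableStats(long rowCount) {
            this.rowCount = rowCount;
        }
    }

    // Simplified stand-in for one column's entry in CatalogColumnStatistics
    public static class ColumnStats {
        public final long nullCount;
        public final long ndv;
        public ColumnStats(long nullCount, long ndv) {
            this.nullCount = nullCount;
            this.ndv = ndv;
        }
    }

    public static TableStats convertToTableStatistics(Map<String, Object> row) {
        return new TableStats((Long) row.get("rowCount"));
    }

    public static ColumnStats convertToColumnStatistics(Map<String, Object> row, String column) {
        // field names follow the "<column>_<statistic>" aliases of the SELECT
        return new ColumnStats(
                (Long) row.get(column + "_nullCount"),
                (Long) row.get(column + "_ndv"));
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("rowCount", 100L);
        row.put("b_nullCount", 5L);
        row.put("b_ndv", 42L);

        TableStats tableStats = convertToTableStatistics(row);
        ColumnStats bStats = convertToColumnStatistics(row, "b");
        System.out.println(tableStats.rowCount); // 100
        System.out.println(bStats.ndv);          // 42
    }
}
```

Looking fields up by their generated aliases keeps the conversion independent of column order in the SELECT, which is why unique field names matter in the generated statement.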
Compatibility, Deprecation, and Migration Plan
This is a new feature, so there is no compatibility, deprecation, or migration plan.
Test Plan
- Unit tests will be added to verify the SQL parse result
- Integration tests will be added to verify the statistics result
Rejected Alternatives
None
POC: https://github.com/godfreyhe/flink/tree/FLIP-240
[2] https://cwiki.apache.org/confluence/display/hive/statsdev
[3] https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-aux-analyze-table.html
[4] https://drill.apache.org/docs/analyze-table-compute-statistics/
[5] https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_4005.htm
[6] https://dev.mysql.com/doc/refman/5.6/en/analyze-table.html