Status
Motivation
Flink creates a DynamicTableSource for each source table independently when it generates the plan for a SQL job, so sources are not aware of each other. In some situations, we need to generate options for each source based on all the source tables in the same job. For example, we use Flink as the OLAP cluster of our HTAP system [1], which synchronizes all data from an OLTP system to column storage at the database level and then performs OLAP queries in the Flink cluster on a unified version to ensure consistent results. Similarly, FLIP-276 [2] also points out that consistent queries across multiple tables need to be supported.
Besides, many connectors have confidential parameters, such as the username and password of the JDBC connector. Currently, they can only be configured in the catalog configuration file or as table options in the SQL statement, neither of which is an appropriate place for secrets.
We would like to introduce a customizable provider that generates dynamic options for all source and sink tables of a job, so that the job can create its sources and sinks based on these options.
Proposal
We submit SQL statements to Sql-Gateway through the JDBC driver; Sql-Gateway parses the SQL into an Operation and executes it in TableEnvironment. Therefore, Sql-Gateway can load the options provider from its configuration and register it to TableEnvironment before submitting the Operation. Users who run jobs with TableEnvironment directly can create and register the provider themselves. TableEnvironment then traverses all Operations, gets the options for each source and sink from the provider, and adds them to the source and sink operations.
Public Interfaces
TableDynamicOptionProvider
We add TableDynamicOptionProvider to provide dynamic options for each source and sink in a job.
```java
/** Dynamic option provider for source and sink tables. */
@PublicEvolving
public interface TableDynamicOptionProvider {
    /**
     * Provide dynamic options for source and sink tables.
     *
     * @param context The context for the job to get table dynamic options.
     * @return The dynamic options for the source and sink tables.
     */
    TableDynamicOptions getDynamicOptions(TableOptionsContext context);

    @PublicEvolving
    static interface TableOptionsContext {
        /**
         * Configuration for the job which provides job-customized options, for example,
         * a different strategy to generate table dynamic options.
         */
        ReadableConfig jobConfiguration();

        /**
         * Get the catalog context for the given catalog, which is used to identify the physical
         * catalog. `CatalogContext` contains information about options, such as the location
         * where the option values for tables are stored.
         */
        CatalogContext catalogContext(String catalog);

        /** All source tables for the job. */
        Set<ObjectIdentifier> sources();

        /**
         * Get the current options for the source table, which are used to determine its dynamic
         * options; for example, JDBC table names in the options will be used to generate the
         * username and password.
         */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** All sink tables for the job. */
        Set<ObjectIdentifier> sinks();

        /**
         * Get the current options for the sink table, which are used to determine its dynamic
         * options; for example, JDBC table names in the options will be used to generate the
         * username and password.
         */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }

    /** Table dynamic option result from the provider. */
    @PublicEvolving
    static interface TableDynamicOptions {
        /** Get source options for the given table identifier. */
        Map<String, String> sourceOptions(ObjectIdentifier table);

        /** Get sink options for the given table identifier. */
        Map<String, String> sinkOptions(ObjectIdentifier table);
    }
}

/** Factory to create a table dynamic option provider. */
@PublicEvolving
public interface TableDynamicOptionProviderFactory extends Factory {
    TableDynamicOptionProvider createProvider(Context context);

    /** Context for the dynamic option provider factory. */
    static interface Context {
        ReadableConfig getConfiguration();

        ClassLoader getClassLoader();
    }
}
```
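To make the result side of the contract concrete, here is a minimal, Flink-free sketch of the nested result interface returned by getDynamicOptions, backed by two maps. It assumes plain `catalog.database.table` strings in place of ObjectIdentifier so it compiles without Flink, and it returns an empty map (never `null`) for tables the provider has nothing to say about; that convention is our assumption, not part of the proposal. SimpleTableDynamicOptions and MapBackedDynamicOptions are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified mirror of the proposed result interface; String stands in for ObjectIdentifier. */
interface SimpleTableDynamicOptions {
    Map<String, String> sourceOptions(String table);

    Map<String, String> sinkOptions(String table);
}

/** Map-backed result: tables without generated options get an empty, read-only map. */
final class MapBackedDynamicOptions implements SimpleTableDynamicOptions {
    private final Map<String, Map<String, String>> sources = new HashMap<>();
    private final Map<String, Map<String, String>> sinks = new HashMap<>();

    MapBackedDynamicOptions putSource(String table, Map<String, String> options) {
        // Defensive, read-only copy so callers cannot mutate the result afterwards.
        sources.put(table, Collections.unmodifiableMap(new HashMap<>(options)));
        return this;
    }

    MapBackedDynamicOptions putSink(String table, Map<String, String> options) {
        sinks.put(table, Collections.unmodifiableMap(new HashMap<>(options)));
        return this;
    }

    @Override
    public Map<String, String> sourceOptions(String table) {
        return sources.getOrDefault(table, Collections.emptyMap());
    }

    @Override
    public Map<String, String> sinkOptions(String table) {
        return sinks.getOrDefault(table, Collections.emptyMap());
    }
}
```

Returning an empty map lets TableEnvironment add options for every table unconditionally, without null checks.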
TableEnvironment
Add registerTableOptionProvider to TableEnvironment.
```java
/** Add register provider method in table environment. */
public interface TableEnvironment {
    void registerTableOptionProvider(TableDynamicOptionProvider provider);
}
```
Options
Add a new option for the provider factory in SqlGatewayOptions.
sql-gateway.table-dynamic-option-provider-factory: {provider factory identifier}
Proposed Changes
Load Provider In Gateway
Sql-Gateway can load the provider factory from its configuration and create the options provider for DefaultContext; a session created by SessionManager can then get the provider as follows.
```java
/** Load the provider factory and create the provider for DefaultContext. */
public class DefaultContext {
    private final TableDynamicOptionProvider tableOptionProvider;
    ...;

    public static DefaultContext load(
            Configuration dynamicConfig,
            List<URL> dependencies,
            boolean discoverExecutionConfig,
            boolean discoverPythonJar) {
        ...;
        TableDynamicOptionProviderFactory providerFactory =
                FactoryUtil.discoverFactory(
                        DefaultContext.class.getClassLoader(),
                        TableDynamicOptionProviderFactory.class,
                        dynamicConfig.get(SOURCE_DYNAMIC_OPTIONS_FACTORY));
        TableDynamicOptionProvider optionsProvider =
                providerFactory.createProvider(new Context() { ...; });
        ...;
        return new DefaultContext(..., optionsProvider);
    }
}
```
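The gateway resolves the factory by the identifier set in sql-gateway.table-dynamic-option-provider-factory. As we understand it, FactoryUtil.discoverFactory finds candidates through Java's ServiceLoader and fails when no factory or more than one factory matches the identifier. The Flink-free sketch below models only that matching step over an explicit candidate list (ServiceLoader is left out) and adds one behavior the snippet above does not show: when the option is unset, no provider is created, so jobs without a provider keep working. ProviderFactory and FactoryLookup are hypothetical names.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a provider factory: only the identifier matters here. */
interface ProviderFactory {
    String factoryIdentifier();
}

final class FactoryLookup {
    /** Picks the single factory whose identifier matches the configured value, if any is set. */
    static Optional<ProviderFactory> discover(List<ProviderFactory> candidates, String identifier) {
        if (identifier == null) {
            return Optional.empty(); // option not configured: run without a provider
        }
        List<ProviderFactory> matches = candidates.stream()
                .filter(f -> identifier.equals(f.factoryIdentifier()))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No provider factory for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous provider factory identifier '" + identifier + "'");
        }
        return Optional.of(matches.get(0));
    }
}
```

Failing fast on an unknown identifier surfaces a typo in the gateway configuration at startup rather than silently running jobs without the expected options.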
Option Provider In TableEnvironment
There will be a registerTableOptionProvider method in TableEnvironment. Sql-Gateway can register the loaded provider to TableEnvironment before submitting the Operation, and users who don't submit jobs through Sql-Gateway can create and register a TableDynamicOptionProvider for the table environment themselves. TableEnvironment will traverse the received Operation, collect all QueryOperation and ModifyOperation instances for sources and sinks, and get the options for all source and sink tables from the provider at once. After that, the options for each table will be added to the corresponding QueryOperation and ModifyOperation.
```java
public class TableEnvironmentImpl implements TableEnvironmentInternal {
    @Nullable private TableDynamicOptionProvider tableDynamicOptionProvider;

    /** Register the table dynamic option provider. */
    @Override
    public void registerTableOptionProvider(TableDynamicOptionProvider provider) {
        this.tableDynamicOptionProvider = provider;
    }

    /** Get options for source/sink tables and add them to the corresponding operations. */
    private void provideTableDynamicOptions(List<Operation> operations) {
        if (tableDynamicOptionProvider == null) {
            return; // No provider registered: leave the operations unchanged.
        }
        Map<ObjectIdentifier, List<SourceQueryOperation>> identifierSourceOperations =
                traverseSourceOperations(operations);
        TableDynamicOptions tableDynamicOptions =
                tableDynamicOptionProvider.getDynamicOptions(
                        new TableOptionsContext() {
                            ...
                        });
        // Add options in QueryOperation and ModifyOperation.
        ...;
    }
}
```
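The key property of provideTableDynamicOptions is that the provider is called once with every table of the job, not once per table. The Flink-free sketch below models that step for sources only, assuming each operation is reduced to a table name plus a mutable option map; SourceOp and OptionInjector are hypothetical names, and sinks would be handled the same way. A table referenced twice receives the same options in both operations.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for SourceQueryOperation. */
final class SourceOp {
    final String table;
    final Map<String, String> dynamicOptions = new HashMap<>();

    SourceOp(String table) {
        this.table = table;
    }
}

final class OptionInjector {
    /**
     * Collects every source table across all operations, asks the provider once for the whole
     * set, then adds the result to each operation without overriding keys already present.
     */
    static void provideTableDynamicOptions(
            List<SourceOp> operations,
            Function<Set<String>, Map<String, Map<String, String>>> provider) {
        Set<String> tables = new LinkedHashSet<>();
        for (SourceOp op : operations) {
            tables.add(op.table);
        }
        Map<String, Map<String, String>> options = provider.apply(tables); // single call per job
        for (SourceOp op : operations) {
            options.getOrDefault(op.table, Map.of()).forEach(op.dynamicOptions::putIfAbsent);
        }
    }
}
```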
Dynamic Options In Operation
Currently, SourceQueryOperation and SinkModifyOperation have a field named dynamicOptions, which is used when creating the source and sink tables from catalogs. We can add the dynamic options generated by the provider to dynamicOptions, and Flink will then create the tables according to these options. For source and sink Operations without a dynamicOptions field, we can add the field to them and create the RelNode with it in the converter.
```java
public class SourceQueryOperation implements QueryOperation {
    private final Map<String, String> dynamicOptions ....;

    /** Add options to dynamicOptions for keys that are not already present. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}

public class SinkModifyOperation implements ModifyOperation {
    private final Map<String, String> dynamicOptions;

    /** Add options to dynamicOptions for keys that are not already present. */
    public void addDynamicOptions(Map<String, String> options) {
        options.forEach((k, v) -> dynamicOptions.putIfAbsent(k, v));
    }
}
```
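addDynamicOptions only works if dynamicOptions is mutable. If an operation were constructed with an immutable map such as Collections.emptyMap() or Map.of(...) (for example, when a query has no OPTIONS hint), putIfAbsent would throw UnsupportedOperationException. Whether the existing operations hold mutable maps is not stated here, so the Flink-free sketch below assumes the constructor can take a defensive mutable copy; SketchSourceQueryOperation is a hypothetical name.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical simplified operation keeping a private, mutable copy of its dynamic options. */
final class SketchSourceQueryOperation {
    private final Map<String, String> dynamicOptions;

    SketchSourceQueryOperation(Map<String, String> hintOptions) {
        // Copy so that callers may safely pass Collections.emptyMap() or Map.of(...).
        this.dynamicOptions = new LinkedHashMap<>(hintOptions);
    }

    /** Adds provider options; keys already present (e.g. from OPTIONS hints) are kept. */
    void addDynamicOptions(Map<String, String> options) {
        options.forEach(dynamicOptions::putIfAbsent);
    }

    Map<String, String> getDynamicOptions() {
        return Collections.unmodifiableMap(dynamicOptions);
    }
}
```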
There will be three kinds of options for a table: options in CREATE ... WITH (options), options from the provider, and options in /*+ OPTIONS(...) */ hints in SQL. WITH options have the lowest priority, followed by the provider options, and OPTIONS hints have the highest.
Use Case
Generate Snapshot IDs For Paimon
All tables of a database, such as tableA, tableB and tableC, are synchronized to Paimon, and the data in these tables is managed by a unified version that maps to a different snapshot id in each table. We can create a PaimonSnapshotTableOptionProvider that implements TableDynamicOptionProvider to get the snapshot ids of these tables for the unified version in Sql-Gateway as follows.
```sql
-- Set the Paimon provider for Sql-Gateway
SET 'sql-gateway.table-dynamic-option-provider-factory' = 'paimon-provider';

SELECT * FROM tableA JOIN tableB ON ... JOIN tableC ON ...;
```
In the above SELECT query, Flink uses the snapshot ids of tableA, tableB and tableC from the provider to read specific snapshot data from these tables. For example, if snapshot1 of tableA, snapshot3 of tableB and snapshot10 of tableC belong to the same version, the Paimon provider supplies them for the query even though they are not configured in the SQL statement.
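The reason the provider needs all source tables at once is that it must choose one version that covers every table in the query. The Flink-free sketch below assumes a hypothetical in-memory catalog mapping each unified version to per-table snapshot ids (how the sync pipeline maintains that mapping is outside the sketch), picks the newest version covering all queried tables, and emits Paimon's scan.snapshot-id option per table. UnifiedVersionSnapshotProvider is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical version catalog: unified version -> (table -> Paimon snapshot id). */
final class UnifiedVersionSnapshotProvider {
    private final NavigableMap<Long, Map<String, Long>> versions = new TreeMap<>();

    void recordVersion(long version, Map<String, Long> snapshotIds) {
        versions.put(version, snapshotIds);
    }

    /** Picks the newest version covering every queried table and returns per-table options. */
    Map<String, Map<String, String>> sourceOptions(Set<String> tables) {
        for (Map<String, Long> snapshots : versions.descendingMap().values()) {
            if (snapshots.keySet().containsAll(tables)) {
                Map<String, Map<String, String>> result = new HashMap<>();
                for (String table : tables) {
                    result.put(table, Map.of("scan.snapshot-id", String.valueOf(snapshots.get(table))));
                }
                return result;
            }
        }
        throw new IllegalStateException("No unified version covers tables " + tables);
    }
}
```

Deciding per table, without seeing the other tables, could mix snapshots from different versions; choosing the version over the whole set is what keeps the join consistent.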
Generate Username And Password For Jdbc
Suppose a security service stores the username and password for RDS, and we develop a JdbcTableSecurityServiceProvider to get them for the JDBC tables in a job. We can then write the job in SQL as follows and submit it to Sql-Gateway.
```sql
-- Set the JDBC security provider for Sql-Gateway
SET 'sql-gateway.table-dynamic-option-provider-factory' = 'jdbc-security-provider';

CREATE TABLE my_jdbc_table (
  val1 INT,
  val2 INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'mysql_table'
  -- 'username' = '****'  We don't need to provide the username and password here
  -- 'password' = '****'
);

INSERT INTO my_jdbc_table SELECT ... FROM ... GROUP BY ...;
```
In the above case, we don't need to add the username and password options to the SQL job (or we can give only a placeholder value), because the security provider generates the real username and password for the JDBC tables. After the DBA updates the credentials for the tables, we can simply restart the jobs without changing them.
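A provider like JdbcTableSecurityServiceProvider would derive the credentials from each table's current options, i.e. what TableOptionsContext returns from sourceOptions or sinkOptions. The Flink-free sketch below assumes a hypothetical SecurityService client keyed by the JDBC url and table-name (the real service API is not specified by this proposal) and only emits the JDBC connector's username and password options for tables whose connector is jdbc. Credentials, SecurityService and JdbcCredentialOptions are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical credential pair returned by the security service. */
final class Credentials {
    final String username;
    final String password;

    Credentials(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

/** Hypothetical security-service client. */
interface SecurityService {
    Optional<Credentials> lookup(String url, String tableName);
}

final class JdbcCredentialOptions {
    /** Derives username/password options for one table from its current (WITH) options. */
    static Map<String, String> forTable(Map<String, String> currentOptions, SecurityService service) {
        if (!"jdbc".equals(currentOptions.get("connector"))) {
            return Map.of(); // not a JDBC table: nothing to add
        }
        return service
                .lookup(currentOptions.get("url"), currentOptions.get("table-name"))
                .map(c -> Map.of("username", c.username, "password", c.password))
                .orElse(Map.of());
    }
}
```

Because provider options rank above WITH options, a placeholder username or password left in the DDL is overridden by the looked-up values.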
Compatibility, Deprecation, and Migration Plan
This is a newly added feature that only takes effect when a provider is configured or registered, so there will be no compatibility issues.
Test Plan
UT & IT
[1] https://www.vldb.org/pvldb/vol15/p3411-chen.pdf