| Discussion thread | |
|---|---|
| Vote thread | |
| JIRA | FLINK-38259 |
| Release | <Flink Version> |
| Status | Accepted |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink SQL handles external connectivity by defining endpoints and credentials directly in table configuration. This approach prevents connections from being reused, and it makes table definitions less secure because sensitive information is exposed directly in catalog metadata.
We propose the introduction of a new Connection resource in Flink, backed by a pluggable SecretStore architecture. This will provide a secure, reusable way to manage external connectivity where sensitive credentials are stored separately from connection metadata, enabling seamless and secure integration with external systems.
The connection resource will provide a new, optional way to manage external connectivity in Flink SQL and Table API. Existing methods for table definitions will remain unchanged.
Goals
Security: Ensure sensitive credentials are never exposed in catalog metadata, logs, or configuration files
Reusability: Enable the same connection to be shared across multiple tables, models, and functions
Manageability: Provide centralized connection management for large-scale deployments
Integration: Seamlessly work with existing enterprise secret management solutions
Consistency: Align with Flink's existing catalog and resource management patterns
Public Interfaces
SecretStore Interface
```java
/**
 * Base interface for secret stores in Flink.
 * Delegates to external secret management systems.
 */
@PublicEvolving
public interface SecretStore {
}

/**
 * A {@link SecretStore} from which secrets can be read.
 */
@PublicEvolving
public interface ReadableSecretStore extends SecretStore {
    /**
     * Retrieves secret data by identifier.
     *
     * @param secretId The unique identifier of the secret
     * @return The secret data
     * @throws SecretNotFoundException if the secret doesn't exist
     */
    Map<String, String> getSecret(String secretId) throws SecretNotFoundException;
}

/**
 * A {@link SecretStore} to which secrets can be written.
 */
@PublicEvolving
public interface WritableSecretStore extends SecretStore {
    /**
     * Stores secret data and returns a unique identifier.
     *
     * @param secretData The secret key-value pairs to store
     * @return A unique identifier for retrieving the secret
     */
    String storeSecret(Map<String, String> secretData);

    /**
     * Removes a secret from the store.
     *
     * @param secretId The unique identifier of the secret
     */
    void removeSecret(String secretId);

    /**
     * Atomically updates an existing secret with new data.
     *
     * @param secretId The existing secret identifier
     * @param newSecretData The new secret data
     * @throws SecretNotFoundException if the secret doesn't exist
     */
    void updateSecret(String secretId, Map<String, String> newSecretData)
            throws SecretNotFoundException;
}
```
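The example below is a minimal in-memory store that implements both the readable and the writable interface. It is roughly the kind of store that the `InMemorySecretStoreFactory` listed under Registration might create. This is a sketch and not the proposed implementation. The interfaces and `SecretNotFoundException` are re-declared in simplified form so the snippet compiles on its own. Using random UUIDs as secret identifiers is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the proposed interfaces (the real ones carry @PublicEvolving).
interface SecretStore {}

class SecretNotFoundException extends Exception {
    SecretNotFoundException(String secretId) {
        super("Secret not found: " + secretId);
    }
}

interface ReadableSecretStore extends SecretStore {
    Map<String, String> getSecret(String secretId) throws SecretNotFoundException;
}

interface WritableSecretStore extends SecretStore {
    String storeSecret(Map<String, String> secretData);

    void removeSecret(String secretId);

    void updateSecret(String secretId, Map<String, String> newSecretData)
            throws SecretNotFoundException;
}

/** Keeps secrets in process memory only; suitable for tests, not production. */
class InMemorySecretStore implements ReadableSecretStore, WritableSecretStore {
    private final Map<String, Map<String, String>> secrets = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> getSecret(String secretId) throws SecretNotFoundException {
        Map<String, String> data = secrets.get(secretId);
        if (data == null) {
            throw new SecretNotFoundException(secretId);
        }
        return new HashMap<>(data); // defensive copy so callers cannot mutate the store
    }

    @Override
    public String storeSecret(Map<String, String> secretData) {
        String secretId = UUID.randomUUID().toString(); // assumption: opaque random ids
        secrets.put(secretId, Map.copyOf(secretData));
        return secretId;
    }

    @Override
    public void removeSecret(String secretId) {
        secrets.remove(secretId);
    }

    @Override
    public void updateSecret(String secretId, Map<String, String> newSecretData)
            throws SecretNotFoundException {
        // computeIfPresent replaces the value atomically for this key in a ConcurrentHashMap.
        if (secrets.computeIfPresent(secretId, (id, old) -> Map.copyOf(newSecretData)) == null) {
            throw new SecretNotFoundException(secretId);
        }
    }
}
```

`getSecret` returns a copy. Callers therefore cannot change stored secrets by modifying the returned map.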
SecretStore Factory
```java
/**
 * Factory for creating SecretStore instances.
 */
@PublicEvolving
public interface SecretStoreFactory extends Factory {
    /**
     * Creates a SecretStore instance.
     */
    SecretStore createSecretStore();

    /**
     * Initializes the secret store with the given context.
     */
    void open(Context context) throws Exception;

    /**
     * Closes the secret store.
     */
    void close() throws CatalogException;

    interface Context {
        /**
         * Returns the options with which the secret store is created.
         *
         * <p>An implementation should perform validation of these options.
         */
        Map<String, String> getOptions();

        /** Gives read-only access to the configuration of the current session. */
        ReadableConfig getConfiguration();

        /**
         * Returns the class loader of the current session.
         *
         * <p>The class loader is in particular useful for discovering further (nested) factories.
         */
        ClassLoader getClassLoader();
    }
}
```
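`createSecretStore()` takes no arguments, so the options must arrive earlier through `open(Context)`. The sketch below shows one way an in-memory factory might follow that open/create/close order, which resembles Flink's existing `CatalogStoreFactory`. In this sketch `Context` is trimmed to `getOptions()`. The following are all assumptions made only for illustration:

- the `in-memory` identifier,
- the rule that this store accepts no options,
- the state checks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the proposed interfaces.
interface SecretStore {}

interface SecretStoreFactory {
    String factoryIdentifier();

    void open(Context context) throws Exception;

    SecretStore createSecretStore();

    void close();

    interface Context {
        Map<String, String> getOptions();
    }
}

/** Hypothetical store that simply keeps secrets in a map. */
class MapSecretStore implements SecretStore {
    final Map<String, Map<String, String>> secrets = new ConcurrentHashMap<>();
}

class InMemorySecretStoreFactory implements SecretStoreFactory {
    private MapSecretStore store; // created in open(), shared by every createSecretStore() call

    @Override
    public String factoryIdentifier() {
        return "in-memory"; // assumed identifier
    }

    @Override
    public void open(Context context) {
        // Validate the options up front, as the Context Javadoc asks; this store accepts none.
        if (!context.getOptions().isEmpty()) {
            throw new IllegalArgumentException(
                    "Unsupported options for in-memory secret store: " + context.getOptions().keySet());
        }
        store = new MapSecretStore();
    }

    @Override
    public SecretStore createSecretStore() {
        if (store == null) {
            throw new IllegalStateException("open() must be called before createSecretStore()");
        }
        return store;
    }

    @Override
    public void close() {
        if (store != null) {
            store.secrets.clear(); // drop all secrets from memory
            store = null;
        }
    }
}
```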
CatalogConnection Interface
```java
/**
 * Interface for a connection in a catalog.
 * Contains non-sensitive configuration and references to secrets.
 */
@PublicEvolving
public interface CatalogConnection {
    /**
     * Returns non-sensitive connection options.
     */
    Map<String, String> getOptions();

    /**
     * Gets the comment of the connection.
     */
    String getComment();

    /**
     * Gets a deep copy of the CatalogConnection instance.
     */
    CatalogConnection copy();
}

/**
 * Interface for a connection that may contain sensitive data.
 */
@PublicEvolving
public interface SensitiveConnection {
    /**
     * Returns connection options, including plain-text secrets.
     */
    Map<String, String> getOptions();

    /**
     * Gets the comment of the connection.
     */
    String getComment();
}
```
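The factory example under Examples creates `DefaultCatalogConnection` and `DefaultSensitiveConnection` objects, but this FLIP does not define either class. A minimal sketch of both is shown below and assumes immutable option maps. The key-only `toString()` on the sensitive variant is an assumption added here to support the goal of keeping credentials out of logs.

```java
import java.util.Map;
import java.util.TreeMap;

interface CatalogConnection {
    Map<String, String> getOptions();

    String getComment();

    CatalogConnection copy();
}

interface SensitiveConnection {
    Map<String, String> getOptions();

    String getComment();
}

class DefaultCatalogConnection implements CatalogConnection {
    private final Map<String, String> options;
    private final String comment; // may be null

    DefaultCatalogConnection(Map<String, String> options, String comment) {
        this.options = Map.copyOf(options); // immutable snapshot
        this.comment = comment;
    }

    @Override
    public Map<String, String> getOptions() {
        return options;
    }

    @Override
    public String getComment() {
        return comment;
    }

    @Override
    public CatalogConnection copy() {
        // Keys and values are immutable strings, so copying the map yields a deep copy.
        return new DefaultCatalogConnection(options, comment);
    }

    @Override
    public String toString() {
        return "CatalogConnection" + new TreeMap<>(options);
    }
}

class DefaultSensitiveConnection implements SensitiveConnection {
    private final Map<String, String> options;
    private final String comment; // may be null

    DefaultSensitiveConnection(Map<String, String> options, String comment) {
        this.options = Map.copyOf(options);
        this.comment = comment;
    }

    @Override
    public Map<String, String> getOptions() {
        return options;
    }

    @Override
    public String getComment() {
        return comment;
    }

    /** Prints option keys only, so logging this object never leaks a plain-text secret. */
    @Override
    public String toString() {
        return "SensitiveConnection" + new TreeMap<>(options).keySet();
    }
}
```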
ObjectIdentifier
```java
public final class ObjectIdentifier implements Serializable {
    // ... existing methods ...
    public static ObjectIdentifier of(String objectName);
    public boolean hasOnlyObjectName();
}
```
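The new one-argument `of(String objectName)` lets an identifier hold only a name, which is what a temporary system connection needs. The simplified stand-in below illustrates the idea. It is named after the real class but is not Flink's implementation. Storing the missing catalog and database as `null` is an assumption, and so is the exact `asSummaryString()` output.

```java
import java.util.Objects;

/** Simplified stand-in for ObjectIdentifier that also allows a single-part name. */
final class ObjectIdentifier {
    private final String catalogName;  // null for a one-part (system) identifier
    private final String databaseName; // null for a one-part (system) identifier
    private final String objectName;

    private ObjectIdentifier(String catalogName, String databaseName, String objectName) {
        this.catalogName = catalogName;
        this.databaseName = databaseName;
        this.objectName = Objects.requireNonNull(objectName, "objectName");
    }

    static ObjectIdentifier of(String catalogName, String databaseName, String objectName) {
        return new ObjectIdentifier(
                Objects.requireNonNull(catalogName, "catalogName"),
                Objects.requireNonNull(databaseName, "databaseName"),
                objectName);
    }

    /** One-part identifier, e.g. for a temporary system connection. */
    static ObjectIdentifier of(String objectName) {
        return new ObjectIdentifier(null, null, objectName);
    }

    boolean hasOnlyObjectName() {
        return catalogName == null && databaseName == null;
    }

    String asSummaryString() {
        return hasOnlyObjectName()
                ? objectName
                : String.join(".", catalogName, databaseName, objectName);
    }
}
```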
CatalogTable
```java
public interface CatalogTable {
    // ... existing methods ...
    Optional<ObjectIdentifier> getConnection();
}
```
Connection Factory
```java
/**
 * Factory for processing connection configurations.
 * Extracts secrets during creation, resolves them during usage.
 */
@PublicEvolving
public interface ConnectionFactory extends Factory {
    /**
     * Processes raw connection options and returns a catalog connection.
     * Extracts sensitive data to SecretStore during processing.
     *
     * @param connectionToEncrypt Raw connection (may contain sensitive data)
     * @param secretStore The SecretStore to use for storing secrets
     * @return CatalogConnection with secrets extracted
     */
    CatalogConnection createConnection(
            SensitiveConnection connectionToEncrypt,
            WritableSecretStore secretStore);

    /**
     * Resolves a connection for usage by merging catalog data with secrets.
     *
     * @param connectionToDecrypt The catalog connection with secret references
     * @param secretStore The SecretStore to retrieve secrets from
     * @return Complete connection configuration with resolved secrets
     */
    SensitiveConnection resolveConnection(
            CatalogConnection connectionToDecrypt,
            ReadableSecretStore secretStore);
}
```
Enhanced Catalog Interface
```java
public interface Catalog {
    // ... existing methods ...

    /**
     * Gets the names of all connections in the given database.
     */
    default List<String> listConnections(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Returns the CatalogConnection identified by the given ObjectPath.
     * The ObjectPath holds the database and connection name within this catalog,
     * so the fully qualified identifier is catalog.database.connection.
     */
    default CatalogConnection getConnection(ObjectPath connectionPath)
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates a new connection.
     */
    default void createConnection(
            ObjectPath connectionPath, CatalogConnection connection, boolean ignoreIfExists)
            throws ConnectionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Modifies an existing connection.
     */
    default void alterConnection(
            ObjectPath connectionPath, CatalogConnection newConnection, boolean ignoreIfNotExists)
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Drops a connection.
     */
    default void dropConnection(ObjectPath connectionPath, boolean ignoreIfNotExists)
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // Additional methods: connectionExists, renameConnection...
}
```
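The `ignoreIfExists` and `ignoreIfNotExists` flags probably behave like their counterparts on the existing table methods. The in-memory sketch below makes that behaviour concrete. It simplifies the real API in three ways, so all names here are hypothetical:

- connections are keyed by database and name instead of `ObjectPath`,
- plain option maps are stored in place of `CatalogConnection`,
- the exception classes are simplified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified stand-ins for the proposed exception types.
class ConnectionAlreadyExistException extends Exception {
    ConnectionAlreadyExistException(String path) {
        super("Connection already exists: " + path);
    }
}

class ConnectionNotExistException extends Exception {
    ConnectionNotExistException(String path) {
        super("Connection does not exist: " + path);
    }
}

class DatabaseNotExistException extends Exception {
    DatabaseNotExistException(String database) {
        super("Database does not exist: " + database);
    }
}

/** Hypothetical in-memory catalog; option maps stand in for CatalogConnection. */
class InMemoryConnectionCatalog {
    private final Set<String> databases;
    // Keyed by "database.connection"; TreeMap keeps listings sorted.
    private final Map<String, Map<String, String>> connections = new TreeMap<>();

    InMemoryConnectionCatalog(Set<String> databases) {
        this.databases = databases;
    }

    List<String> listConnections(String database) throws DatabaseNotExistException {
        checkDatabase(database);
        List<String> names = new ArrayList<>();
        String prefix = database + ".";
        for (String path : connections.keySet()) {
            if (path.startsWith(prefix)) {
                names.add(path.substring(prefix.length()));
            }
        }
        return names;
    }

    Map<String, String> getConnection(String database, String name) throws ConnectionNotExistException {
        Map<String, String> options = connections.get(database + "." + name);
        if (options == null) {
            throw new ConnectionNotExistException(database + "." + name);
        }
        return options;
    }

    void createConnection(String database, String name, Map<String, String> options, boolean ignoreIfExists)
            throws ConnectionAlreadyExistException, DatabaseNotExistException {
        checkDatabase(database);
        String path = database + "." + name;
        if (connections.containsKey(path)) {
            if (ignoreIfExists) {
                return; // the existing definition is left untouched
            }
            throw new ConnectionAlreadyExistException(path);
        }
        connections.put(path, Map.copyOf(options));
    }

    void dropConnection(String database, String name, boolean ignoreIfNotExists)
            throws ConnectionNotExistException {
        String path = database + "." + name;
        if (connections.remove(path) == null && !ignoreIfNotExists) {
            throw new ConnectionNotExistException(path);
        }
    }

    private void checkDatabase(String database) throws DatabaseNotExistException {
        if (!databases.contains(database)) {
            throw new DatabaseNotExistException(database);
        }
    }
}
```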
EnvironmentSettings Enhancement
```java
public class EnvironmentSettings {
    public static class Builder {
        /**
         * Configures the SecretStore implementation used by the TableEnvironment.
         */
        public Builder withSecretStore(SecretStore secretStore);
    }
}
```
Table API Changes
```java
public class ConnectionDescriptor {
    public static Builder forType(String type);

    public Map<String, String> getOptions();
    public String getComment();
    public Builder toBuilder();

    public static class Builder {
        private Map<String, String> connectionOptions;
        private @Nullable String comment;

        public Builder();
        public Builder(ConnectionDescriptor descriptor);
        public Builder option(String key, String value);
        public <T> Builder option(ConfigOption<T> configOption, T value);
        public Builder comment(@Nullable String comment);
        public ConnectionDescriptor build();
    }
}
```
```java
public interface TableEnvironment {
    public void createConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    public void createTemporaryConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createTemporaryConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    public void createTemporarySystemConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createTemporarySystemConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    public boolean dropConnection(String path);
    public boolean dropConnection(String path, boolean ignoreIfNotExists);
    public boolean dropTemporaryConnection(String path);
    public boolean dropTemporarySystemConnection(String path);
}
```
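The Table API example chains `option(...)` calls, so the builder methods have to return the builder. The self-contained sketch below shows that shape. Storing the type under the `type` key mirrors the SQL `WITH ('type' = ...)` clause, but it is an assumption. The `ConfigOption` overload is left out because it depends on Flink's configuration classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ConnectionDescriptor {
    private final Map<String, String> options;
    private final String comment; // nullable

    private ConnectionDescriptor(Map<String, String> options, String comment) {
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
        this.comment = comment;
    }

    /** Starts a builder whose 'type' option selects the ConnectionFactory. */
    static Builder forType(String type) {
        return new Builder().option("type", type);
    }

    Map<String, String> getOptions() {
        return options;
    }

    String getComment() {
        return comment;
    }

    /** Returns a builder pre-filled with this descriptor's options and comment. */
    Builder toBuilder() {
        return new Builder(this);
    }

    static final class Builder {
        private final Map<String, String> connectionOptions = new HashMap<>();
        private String comment;

        Builder() {}

        Builder(ConnectionDescriptor descriptor) {
            connectionOptions.putAll(descriptor.options);
            comment = descriptor.comment;
        }

        Builder option(String key, String value) {
            connectionOptions.put(key, value); // a later call overwrites an earlier one
            return this;
        }

        Builder comment(String comment) {
            this.comment = comment;
            return this;
        }

        ConnectionDescriptor build() {
            return new ConnectionDescriptor(connectionOptions, comment);
        }
    }
}
```

Because `build()` copies the options, changing a builder returned by `toBuilder()` leaves the original descriptor untouched.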
Proposed Changes
SQL Syntax
Create Connection
```sql
CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name
[COMMENT connection_comment]
WITH (
    'type' = 'basic' | 'bearer' | 'oauth' | 'custom_type',
    'url' = 'endpoint_url',
    'username' = 'user_name',      -- Will be extracted to SecretStore
    'password' = 'user_password',  -- Will be extracted to SecretStore
    ...
)
```
Note: a system connection uses a simple identifier instead of the compound identifier made up of catalog and database names. A system connection can be used to create a catalog. This FLIP focuses only on temporary system connections; persistent system connections may be explored in subsequent FLIPs.
Use Connection
```sql
-- Using CONNECTION keyword in DDL
CREATE TABLE customers (
    customer_id INT,
    name STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) USING CONNECTION mycat.mydb.mysql_connection
WITH (
    'table-name' = 'customers',
    'lookup.cache' = 'PARTIAL'
);
```
Connection Management
```sql
-- Alter connection
ALTER CONNECTION mycat.mydb.mysql_connection SET (
    'password' = 'new_password'  -- Updates secret in SecretStore
);

-- Drop connection
DROP CONNECTION [IF EXISTS] mycat.mydb.mysql_connection;

-- Describe connection (secrets are redacted)
DESCRIBE CONNECTION mycat.mydb.mysql_connection;

-- Show connections
SHOW CONNECTIONS [LIKE 'pattern'] [FROM catalog_name.db_name];

-- Show create connection
SHOW CREATE CONNECTION <connection_name>;
```
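`ALTER CONNECTION ... SET` must send secret keys to the `SecretStore` and all other keys to the catalog. One possible sketch follows. It assumes three things that the FLIP does not fix:

- the `secret.id` reference key shown in the Default Authentication example,
- a shortened list of secret fields,
- that changed secret values are merged into the existing secret entry before `updateSecret` is called.

`SimpleSecretStore` and `AlterConnectionSketch` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Minimal stand-in for a readable and writable SecretStore. */
class SimpleSecretStore {
    final Map<String, Map<String, String>> data = new HashMap<>();

    String storeSecret(Map<String, String> secret) {
        String id = UUID.randomUUID().toString();
        data.put(id, new HashMap<>(secret));
        return id;
    }

    Map<String, String> getSecret(String id) {
        Map<String, String> secret = data.get(id);
        if (secret == null) {
            throw new IllegalStateException("Secret not found: " + id);
        }
        return new HashMap<>(secret);
    }

    void updateSecret(String id, Map<String, String> secret) {
        if (!data.containsKey(id)) {
            throw new IllegalStateException("Secret not found: " + id);
        }
        data.put(id, new HashMap<>(secret));
    }
}

/** Hypothetical helper applying ALTER CONNECTION ... SET (...) to stored catalog options. */
final class AlterConnectionSketch {
    static final String SECRET_KEY = "secret.id";
    static final Set<String> SECRET_FIELDS = Set.of("password", "token", "apikey"); // shortened list

    /** Returns the new catalog options; secret values only ever go to the store. */
    static Map<String, String> alter(
            Map<String, String> catalogOptions, Map<String, String> changes, SimpleSecretStore store) {
        Map<String, String> publicOptions = new HashMap<>(catalogOptions);
        Map<String, String> secretChanges = new HashMap<>();
        changes.forEach((key, value) -> {
            if (SECRET_FIELDS.contains(key)) {
                secretChanges.put(key, value);
            } else {
                publicOptions.put(key, value);
            }
        });
        if (secretChanges.isEmpty()) {
            return publicOptions;
        }
        String secretId = publicOptions.get(SECRET_KEY);
        if (secretId == null) {
            // The connection had no secrets yet: create a fresh entry and reference it.
            publicOptions.put(SECRET_KEY, store.storeSecret(secretChanges));
        } else {
            // Keep untouched secret keys, overwrite the changed ones, then update in place.
            Map<String, String> merged = store.getSecret(secretId);
            merged.putAll(secretChanges);
            store.updateSecret(secretId, merged);
        }
        return publicOptions;
    }
}
```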
Implementation Flow
Connection Creation Flow
1. The user executes `CREATE CONNECTION ... WITH (username='user', password='pass', url='...')`.
2. The SQL parser identifies the connection type and finds the corresponding `ConnectionFactory`.
3. `ConnectionFactory.createConnection()` is called, which:
   - extracts sensitive fields (`username`, `password`),
   - stores the secrets in the `SecretStore` and gets back a `secretId`,
   - returns a `CatalogConnection` with the non-sensitive options plus the `secretId`.
4. The `CatalogConnection` is stored (no secrets are stored in the catalog):
   - a catalog connection is stored in the catalog;
   - a temporary system connection is stored in an in-memory map in `CatalogManager`.
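The last step of the creation flow stores the processed connection in one of two places. The sketch below models a hypothetical slice of `CatalogManager`:

- Temporary system connections go into an in-memory map keyed by a simple name.
- All other connections are passed to a catalog stand-in keyed by the full path.

The class names and the lookup order (system connections first) are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Stand-in for catalog storage: full path "catalog.db.name" -> non-sensitive options. */
interface ConnectionCatalog {
    void createConnection(String path, Map<String, String> options);

    Optional<Map<String, String>> getConnection(String path);
}

class MapConnectionCatalog implements ConnectionCatalog {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    @Override
    public void createConnection(String path, Map<String, String> options) {
        store.put(path, Map.copyOf(options));
    }

    @Override
    public Optional<Map<String, String>> getConnection(String path) {
        return Optional.ofNullable(store.get(path));
    }
}

/** Hypothetical slice of CatalogManager that decides where a connection is stored. */
class ConnectionRegistry {
    private final Map<String, Map<String, String>> temporarySystemConnections = new HashMap<>();
    private final ConnectionCatalog catalog;

    ConnectionRegistry(ConnectionCatalog catalog) {
        this.catalog = catalog;
    }

    /** The options passed in have already had their secrets extracted by the ConnectionFactory. */
    void register(String name, Map<String, String> catalogOptions, boolean temporarySystem) {
        if (temporarySystem) {
            if (name.contains(".")) {
                throw new IllegalArgumentException("System connections use a simple name: " + name);
            }
            // Lives only as long as this registry (i.e. the session).
            temporarySystemConnections.put(name, Map.copyOf(catalogOptions));
        } else {
            catalog.createConnection(name, catalogOptions);
        }
    }

    /** Assumed lookup order: a name is tried against system connections first. */
    Optional<Map<String, String>> lookup(String name) {
        Map<String, String> system = temporarySystemConnections.get(name);
        return system != null ? Optional.of(system) : catalog.getConnection(name);
    }
}
```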
Connection Usage Flow
1. The user creates a table with `USING CONNECTION mycat.mydb.conn`.
2. The SQL parser retrieves the `CatalogConnection` from the catalog.
3. `ConnectionFactory.resolveConnection()` is called from `DynamicTableSourceSpec`, which:
   - gets the `secretId` from the `CatalogConnection`,
   - retrieves the secrets from the `SecretStore`,
   - merges the secrets with the non-sensitive options.
4. The connection options are merged with the table options.
5. The `DynamicTableFactory` receives the complete options, including secrets, via `Context.getCatalogTable()`.
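The resolution and merge steps of the usage flow come down to two map operations. This sketch uses the `secret.id` reference key from the Default Authentication example. When a key appears in both the connection and the table, the table option wins. The FLIP does not specify this precedence, so rejecting such conflicts would be an equally valid choice. `ConnectionResolution` is a hypothetical helper, not Flink planner code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper mirroring the usage flow; not Flink's actual planner code. */
final class ConnectionResolution {
    static final String SECRET_KEY = "secret.id";

    /** Replaces the secret reference with the secret values fetched via getSecret. */
    static Map<String, String> resolve(
            Map<String, String> catalogOptions, Function<String, Map<String, String>> getSecret) {
        Map<String, String> resolved = new HashMap<>(catalogOptions);
        String secretId = resolved.remove(SECRET_KEY);
        if (secretId != null) {
            resolved.putAll(getSecret.apply(secretId));
        }
        return resolved;
    }

    /** Merges with the table options; assumed here that table options win on conflicts. */
    static Map<String, String> mergeWithTable(
            Map<String, String> connectionOptions, Map<String, String> tableOptions) {
        Map<String, String> merged = new HashMap<>(connectionOptions);
        merged.putAll(tableOptions);
        return merged;
    }
}
```

Only the merged map handed to the `DynamicTableFactory` contains plain secrets. The catalog entry keeps just the reference.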
Examples
Connection Factory Examples
Default Authentication Factory
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.ValidationException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DefaultConnectionFactory implements ConnectionFactory {
    public static final String IDENTIFIER = "default";

    // This conforms to the sensitive keys defined in GlobalConfiguration
    public static final Set<String> SECRET_FIELDS = Set.of(
            "password", "secret", "fs.azure.account.key", "apikey", "api-key", "auth-params",
            "service-key", "token", "basic-auth", "jaas.config", "http-headers");

    // Catalog option that references the secret entry in the SecretStore
    private static final String SECRET_KEY = "secret.id";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public CatalogConnection createConnection(
            SensitiveConnection connectionToEncrypt, WritableSecretStore secretStore) {
        Map<String, String> options = connectionToEncrypt.getOptions();
        // Split the options into secrets and non-sensitive options
        Map<String, String> secrets = new HashMap<>();
        Map<String, String> publicOptions = new HashMap<>(options);
        for (String secretField : SECRET_FIELDS) {
            if (options.containsKey(secretField)) {
                secrets.put(secretField, options.get(secretField));
                publicOptions.remove(secretField);
            }
        }
        // Store secrets, if any; only their identifier is kept in the catalog
        if (!secrets.isEmpty()) {
            publicOptions.put(SECRET_KEY, secretStore.storeSecret(secrets));
        }
        return new DefaultCatalogConnection(publicOptions, connectionToEncrypt.getComment());
    }

    @Override
    public SensitiveConnection resolveConnection(
            CatalogConnection connectionToDecrypt, ReadableSecretStore secretStore) {
        Map<String, String> resolved = new HashMap<>(connectionToDecrypt.getOptions());
        // Replace the secret reference with the secret values
        String secretId = resolved.remove(SECRET_KEY);
        if (secretId != null) {
            try {
                resolved.putAll(secretStore.getSecret(secretId));
            } catch (SecretNotFoundException e) {
                throw new ValidationException(
                        "Secret '" + secretId + "' referenced by the connection does not exist.", e);
            }
        }
        return new DefaultSensitiveConnection(resolved, connectionToDecrypt.getComment());
    }
}
```
Example: Default Authentication
```sql
-- Create connection with username/password (secret fields are extracted automatically)
CREATE CONNECTION mycat.mydb.mysql_prod WITH (
    'type' = 'default',
    'hostname' = 'jdbc:mysql://prod-db:3306/orders',
    'port' = '1234',
    'username' = 'prod_user',  -- Not a secret field, stays in the catalog
    'password' = 'secret123'   -- Moved to SecretStore
);
-- Catalog stores:
-- {
--   'type' = 'default',
--   'hostname' = 'jdbc:mysql://prod-db:3306/orders',
--   'port' = '1234',
--   'username' = 'prod_user',
--   'secret.id' = 'conn-12345'
-- }
-- SecretStore stores: {'password' = 'secret123'}

-- Use connection
CREATE TABLE orders (
    order_id INT,
    customer_id INT,
    amount DECIMAL(10,2)
) USING CONNECTION mycat.mydb.mysql_prod
WITH (
    'connector' = 'jdbc',
    'table-name' = 'orders',
    ...
);
```
In this example, the options of connection mycat.mydb.mysql_prod are merged with the table options and passed to the DynamicTableFactory at runtime. The hostname, username, password, and port fields of the connection are properties defined by the MySQL connector.
SecretStore Implementations
Example: Azure Key Vault
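```java
public class AzureKeyVaultSecretStoreFactory implements SecretStoreFactory {
    public static final String IDENTIFIER = "azure-key-vault";
    // Keys are relative to table.secret-store.azure-key-vault.; the client.id key name is assumed
    public static final ConfigOption<String> VAULT_URL =
            ConfigOptions.key("vault.url").stringType().noDefaultValue();
    public static final ConfigOption<String> CLIENT_ID =
            ConfigOptions.key("client.id").stringType().noDefaultValue();
    private SecretStore secretStore;
    @Override
    public String factoryIdentifier() { return IDENTIFIER; }
    @Override
    public Set<ConfigOption<?>> requiredOptions() { return Set.of(VAULT_URL, CLIENT_ID); }
    @Override
    public Set<ConfigOption<?>> optionalOptions() { return Set.of(); }
    @Override
    public void open(Context context) {
        // Context#getOptions() returns a plain map; wrap it for typed access
        ReadableConfig options = Configuration.fromMap(context.getOptions());
        secretStore = new AzureKeyVaultSecretStore(options.get(VAULT_URL), options.get(CLIENT_ID));
    }
    @Override
    public SecretStore createSecretStore() { return secretStore; }
    @Override
    public void close() {}
}
```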
Registration
```text
# META-INF/services/org.apache.flink.table.factories.SecretStoreFactory
com.azure.flink.AzureKeyVaultSecretStoreFactory
com.aws.flink.AwsSecretsManagerSecretStoreFactory
org.apache.flink.table.store.InMemorySecretStoreFactory
```
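At runtime, the configured `table.secret-store.kind` selects one of the registered factories by its `factoryIdentifier()`. The sketch below shows that matching step over the factories that Java's `ServiceLoader` finds. The selection logic accepts any `Iterable`, so it can also run without a `META-INF/services` file. Flink's `FactoryUtil.discoverFactory` already does something similar for other factory types. The trimmed `SecretStoreFactory` interface and the error messages here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Trimmed stand-in: only the identifier matters for discovery.
interface SecretStoreFactory {
    String factoryIdentifier();
}

final class SecretStoreFactoryDiscovery {
    /** Production path: factories come from META-INF/services entries on the classpath. */
    static SecretStoreFactory discover(ClassLoader classLoader, String kind) {
        return select(ServiceLoader.load(SecretStoreFactory.class, classLoader), kind);
    }

    /** Picks exactly one factory whose identifier equals the configured kind. */
    static SecretStoreFactory select(Iterable<? extends SecretStoreFactory> factories, String kind) {
        List<SecretStoreFactory> matches = new ArrayList<>();
        List<String> available = new ArrayList<>();
        for (SecretStoreFactory factory : factories) {
            available.add(factory.factoryIdentifier());
            if (factory.factoryIdentifier().equals(kind)) {
                matches.add(factory);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalArgumentException(
                    "No SecretStoreFactory for '" + kind + "'. Available: " + available);
        }
        if (matches.size() > 1) {
            throw new IllegalStateException(
                    "Multiple SecretStoreFactory implementations for '" + kind + "'");
        }
        return matches.get(0);
    }
}
```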
Usage
```java
// Configure SecretStore in Table API (vaultUrl and clientId are defined elsewhere)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .withSecretStore(new AzureKeyVaultSecretStore(vaultUrl, clientId))
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```

```yaml
# Alternatively, configure SecretStore in the Flink configuration file
table.secret-store.kind: azure-key-vault
table.secret-store.azure-key-vault.vault.url: https://company-vault.vault.azure.net/
```
Table API syntax
```java
// Create connection
tableEnv.createConnection("mycat.mydb.mysql_prod",
        ConnectionDescriptor.forType("basic")
                .option("url", "jdbc:mysql://prod:3306/db")
                .option("username", "user") // Will be moved to SecretStore
                .option("password", "pass") // Will be moved to SecretStore
                .build());

// Use connection in table
tableEnv.createTable("orders",
        TableDescriptor.forConnector("jdbc")
                .connection("mycat.mydb.mysql_prod") // Reference to connection
                .option("table-name", "orders")
                .schema(Schema.newBuilder()...build())
                .build());
```
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
Full backward compatibility: Existing table definitions continue to work unchanged
Gradual adoption: Users can migrate to connections incrementally
No breaking changes: All existing APIs remain functional
Migration Path
Phase 1: Introduce SecretStore and Connection APIs
Phase 2: Users create connections for new tables/models
Phase 3: Migration utilities to convert existing configurations
Connection Naming Pros and Cons
Two names were strong contenders in the naming discussion: Connection and Integration. Their pros and cons are listed below. After careful consideration, we think Connection is a more familiar term for users and better conveys the security purpose of this object.
| | Connection | Integration |
|---|---|---|
| Pros | | Sounds higher-level: May appeal to users who are building pipelines or platforms and think in terms of "integrating systems". |
| Cons | Overlap with Flink connectors: Might cause slight confusion with Flink’s connector modules (e.g., Kafka connector), though context may solve this. | |
Test Plan
Unit Tests
SecretStore factory discovery and creation
ConnectionFactory secret extraction and resolution
Catalog connection CRUD operations
Error handling for missing secrets
Integration Tests
End-to-end connection creation and usage
SecretStore integration with mock external stores
Connection-based table/model creation
DDL syntax validation
Rejected Alternatives
The following ideas were rejected:
Storing secrets in catalog: Rejected for security reasons
Static configuration files: Doesn't align with SQL-native resource management
Naming: The name “integration” for this object
Future Enhancements
System-Level Connections: Global connections accessible across all catalogs