Discussion thread

Vote thread

https://lists.apache.org/list.html?dev@flink.apache.org

JIRA

FLINK-38259

Release

<Flink Version>

Status

Accepted

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink SQL handles external connectivity by defining endpoints and credentials in each table's options. This prevents reusing connection settings across tables and makes table definitions less secure, because sensitive information is stored directly in catalog metadata.

We propose the introduction of a new Connection resource in Flink, backed by a pluggable SecretStore architecture. This will provide a secure, reusable way to manage external connectivity where sensitive credentials are stored separately from connection metadata, enabling seamless and secure integration with external systems.

The connection resource will provide a new, optional way to manage external connectivity in Flink SQL and Table API. Existing methods for table definitions will remain unchanged.

Goals

  • Security: Ensure sensitive credentials are never exposed in catalog metadata, logs, or configuration files

  • Reusability: Enable the same connection to be shared across multiple tables, models, and functions

  • Manageability: Provide centralized connection management for large-scale deployments

  • Integration: Seamlessly work with existing enterprise secret management solutions

  • Consistency: Align with Flink's existing catalog and resource management patterns

Public Interfaces

SecretStore Interface

/**
 * Interface for managing secrets in Flink.
 * Delegates to external secret management systems.
 */
@PublicEvolving
public interface SecretStore {
}


/**
 * A SecretStore that supports reading secrets by identifier.
 * Delegates to external secret management systems.
 */
@PublicEvolving
public interface ReadableSecretStore extends SecretStore {
    /**
     * Retrieves secret data by identifier.
     * 
     * @param secretId The unique identifier of the secret
     * @return The secret data
     * @throws SecretNotFoundException if the secret doesn't exist
     */
    Map<String, String> getSecret(String secretId) throws SecretNotFoundException;
}


/**
 * A SecretStore that supports storing, updating, and removing secrets.
 * Delegates to external secret management systems.
 */
@PublicEvolving
public interface WritableSecretStore extends SecretStore {
    /**
     * Stores secret data and returns a unique identifier.
     * 
     * @param secretData The secret key-value pairs to store
     * @return A unique identifier for retrieving the secret
     */
    String storeSecret(Map<String, String> secretData);

    /**
     * Removes a secret from the store.
     * 
     * @param secretId The unique identifier of the secret
     */
    void removeSecret(String secretId);

    /**
     * Atomically updates an existing secret with new data.
     * 
     * @param secretId The existing secret identifier
     * @param newSecretData The new secret data
     * @throws SecretNotFoundException if the secret doesn't exist
     */
    void updateSecret(String secretId, Map<String, String> newSecretData) 
        throws SecretNotFoundException;
}
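To make the read/write split concrete, the sketch below implements both ReadableSecretStore and WritableSecretStore on top of an in-memory map, roughly what a test-only store (such as one created by the InMemorySecretStoreFactory listed under Registration) might look like. It is a minimal sketch under assumptions: the interfaces are restated in simplified form so the snippet compiles on its own, SecretNotFoundException is assumed to be unchecked, and the UUID-based identifiers are an illustrative choice.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified restatements of the proposed interfaces so this sketch compiles standalone.
interface SecretStore {}

interface ReadableSecretStore extends SecretStore {
    Map<String, String> getSecret(String secretId);
}

interface WritableSecretStore extends SecretStore {
    String storeSecret(Map<String, String> secretData);
    void removeSecret(String secretId);
    void updateSecret(String secretId, Map<String, String> newSecretData);
}

// Assumption: modeled as an unchecked exception for brevity.
class SecretNotFoundException extends RuntimeException {
    SecretNotFoundException(String secretId) {
        super("Secret not found: " + secretId);
    }
}

/** Hypothetical in-memory store; secrets live in heap memory, so it is only suitable for tests. */
class InMemorySecretStore implements ReadableSecretStore, WritableSecretStore {
    private final Map<String, Map<String, String>> secrets = new ConcurrentHashMap<>();

    @Override
    public String storeSecret(Map<String, String> secretData) {
        String secretId = UUID.randomUUID().toString();
        // Immutable copy so callers cannot mutate stored secrets afterwards.
        secrets.put(secretId, Map.copyOf(secretData));
        return secretId;
    }

    @Override
    public Map<String, String> getSecret(String secretId) {
        Map<String, String> data = secrets.get(secretId);
        if (data == null) {
            throw new SecretNotFoundException(secretId);
        }
        return data;
    }

    @Override
    public void removeSecret(String secretId) {
        secrets.remove(secretId);
    }

    @Override
    public void updateSecret(String secretId, Map<String, String> newSecretData) {
        // replace() swaps the value atomically and returns null if the key is absent.
        if (secrets.replace(secretId, Map.copyOf(newSecretData)) == null) {
            throw new SecretNotFoundException(secretId);
        }
    }
}
```

Because updateSecret relies on ConcurrentHashMap.replace, an update racing with removeSecret cannot silently re-create a removed secret; it fails with SecretNotFoundException instead.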

SecretStore Factory

/**
 * Factory for creating SecretStore instances.
 */
@PublicEvolving
public interface SecretStoreFactory extends Factory {
    
    /**
     * Creates a SecretStore instance.
     */
    SecretStore createSecretStore();

    /**
     * Initializes the secret store factory with its context.
     */
    void open(Context context) throws Exception;

    /**
     * Closes the secret store factory and releases its resources.
     */
    void close() throws CatalogException;

    interface Context {
        /**
         * Returns the options with which the secret store is created.
         *
         * <p>An implementation should perform validation of these options.
         */
        Map<String, String> getOptions();

        /** Gives read-only access to the configuration of the current session. */
        ReadableConfig getConfiguration();

        /**
         * Returns the class loader of the current session.
         *
         * <p>The class loader is in particular useful for discovering further (nested) factories.
         */
        ClassLoader getClassLoader();
    }
}
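The factory contract implies a fixed lifecycle: open the factory with its Context, create the store, and close the factory when the session ends. The following sketch shows how a caller might pick a factory by the configured kind and enforce that ordering. It simplifies heavily: Context is reduced to getOptions(), InMemorySecretStoreFactory and SecretStoreBootstrap.selectFactory are hypothetical, and a plain list stands in for Flink's ServiceLoader-based factory discovery.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the proposed types (ReadableConfig and ClassLoader access omitted).
interface SecretStore {}

interface SecretStoreFactory {
    String factoryIdentifier();
    void open(Context context) throws Exception;
    SecretStore createSecretStore();
    void close();

    interface Context {
        Map<String, String> getOptions();
    }
}

/** Hypothetical factory used only to illustrate the open/create/close ordering. */
class InMemorySecretStoreFactory implements SecretStoreFactory {
    private Map<String, String> options;
    private boolean open;

    @Override
    public String factoryIdentifier() {
        return "in-memory";
    }

    @Override
    public void open(Context context) {
        // Capture (and, in a real factory, validate) options once; createSecretStore() relies on them.
        this.options = new HashMap<>(context.getOptions());
        this.open = true;
    }

    @Override
    public SecretStore createSecretStore() {
        if (!open) {
            throw new IllegalStateException("open() must be called before createSecretStore()");
        }
        return new SecretStore() {};
    }

    @Override
    public void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

class SecretStoreBootstrap {
    /** Picks the factory whose identifier matches the configured kind (hypothetical helper). */
    static SecretStoreFactory selectFactory(List<? extends SecretStoreFactory> candidates, String kind) {
        return candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(kind))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No SecretStoreFactory for kind: " + kind));
    }
}
```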

CatalogConnection Interface

/**
 * Interface for a connection in a catalog.
 * Contains non-sensitive configuration and references to secrets.
 */
@PublicEvolving
public interface CatalogConnection {
    
    /**
     * Returns non-sensitive connection options.
     */
    Map<String, String> getOptions();
    
    /**
     * Get comment of the connection.
     */
    String getComment();
    
    /**
     * Get a deep copy of the CatalogConnection instance.
     */
    CatalogConnection copy();
}

/**
 * Interface for a connection that may contain sensitive data.
 */
@PublicEvolving
public interface SensitiveConnection {
    
    /**
     * Returns connection options, which may contain plain-text secrets.
     */
    Map<String, String> getOptions();
    
    /**
     * Get comment of the connection.
     */
    String getComment();
}

ObjectIdentifier

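public final class ObjectIdentifier implements Serializable {
    // ... existing methods ...

    /** Creates an identifier that consists only of an object name (used for system connections). */
    public static ObjectIdentifier of(String objectName);
    
    public boolean hasOnlyObjectName();
}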


CatalogTable

public interface CatalogTable {
    Optional<ObjectIdentifier> getConnection();
}

Connection Factory

/**
 * Factory for processing connection configurations.
 * Extracts secrets during creation, resolves them during usage.
 */
@PublicEvolving
public interface ConnectionFactory extends Factory {
    
    /**
     * Processes raw connection options and returns a catalog connection.
     * Extracts sensitive data to SecretStore during processing.
     * 
     * @param connectionToEncrypt Raw connection options (may contain sensitive data)
     * @param secretStore The SecretStore to use for storing secrets
     * @return CatalogConnection with secrets extracted
     */
    CatalogConnection createConnection(
        SensitiveConnection connectionToEncrypt, 
        WritableSecretStore secretStore);
    
    /**
     * Resolves a connection for usage by merging catalog data with secrets.
     * 
     * @param connectionToDecrypt The catalog connection with secret references
     * @param secretStore The SecretStore to retrieve secrets from
     * @return Complete connection configuration with resolved secrets
     */
    SensitiveConnection resolveConnection(
        CatalogConnection connectionToDecrypt, 
        ReadableSecretStore secretStore);
}

Enhanced Catalog Interface

public interface Catalog {
    // ... existing methods ...
    
    /**
     * Get names of all connections under this database.
     */
    default List<String> listConnections(String databaseName) 
            throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }
    
    /**
     * Returns a CatalogConnection identified by the given ObjectPath (database and connection name).
     * Together with the catalog name, this forms the 3-part identifier catalog.database.connection.
     */
    default CatalogConnection getConnection(ObjectPath connectionPath) 
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }
    
    /**
     * Creates a new connection.
     */
    default void createConnection(ObjectPath connectionPath, CatalogConnection connection, boolean ignoreIfExists) 
            throws ConnectionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }
    
    /**
     * Modifies an existing connection.
     */
    default void alterConnection(ObjectPath connectionPath, CatalogConnection newConnection, boolean ignoreIfNotExists) 
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }
    
    /**
     * Drop a connection.
     */
    default void dropConnection(ObjectPath connectionPath, boolean ignoreIfNotExists) 
            throws ConnectionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }
    
    // Additional methods: connectionExists, renameConnection...
}
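For catalog implementers, the new methods mostly reduce to existence checks plus the ignoreIfExists / ignoreIfNotExists flags, mirroring the existing table methods. A minimal in-memory sketch under simplifying assumptions: ObjectPath is restated as a record, connections are plain option maps instead of CatalogConnection, the exceptions are unchecked stand-ins for the proposed ones, and InMemoryConnectionCatalog is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in: database name + object name, as in Flink's ObjectPath.
record ObjectPath(String databaseName, String objectName) {}

class ConnectionAlreadyExistException extends RuntimeException {
    ConnectionAlreadyExistException(ObjectPath p) { super("Connection already exists: " + p); }
}

class ConnectionNotExistException extends RuntimeException {
    ConnectionNotExistException(ObjectPath p) { super("Connection does not exist: " + p); }
}

class DatabaseNotExistException extends RuntimeException {
    DatabaseNotExistException(String db) { super("Database does not exist: " + db); }
}

/** Hypothetical catalog fragment implementing the proposed connection methods in memory. */
class InMemoryConnectionCatalog {
    private final Set<String> databases = new HashSet<>();
    private final Map<ObjectPath, Map<String, String>> connections = new HashMap<>();

    void createDatabase(String name) {
        databases.add(name);
    }

    List<String> listConnections(String databaseName) {
        if (!databases.contains(databaseName)) throw new DatabaseNotExistException(databaseName);
        List<String> names = new ArrayList<>();
        for (ObjectPath path : connections.keySet()) {
            if (path.databaseName().equals(databaseName)) names.add(path.objectName());
        }
        return names;
    }

    Map<String, String> getConnection(ObjectPath path) {
        Map<String, String> connection = connections.get(path);
        if (connection == null) throw new ConnectionNotExistException(path);
        return connection;
    }

    void createConnection(ObjectPath path, Map<String, String> connection, boolean ignoreIfExists) {
        if (!databases.contains(path.databaseName())) throw new DatabaseNotExistException(path.databaseName());
        if (connections.containsKey(path)) {
            if (ignoreIfExists) return;
            throw new ConnectionAlreadyExistException(path);
        }
        // Only non-sensitive options (plus a secret reference) are ever stored here.
        connections.put(path, Map.copyOf(connection));
    }

    void alterConnection(ObjectPath path, Map<String, String> newConnection, boolean ignoreIfNotExists) {
        if (!connections.containsKey(path)) {
            if (ignoreIfNotExists) return;
            throw new ConnectionNotExistException(path);
        }
        connections.put(path, Map.copyOf(newConnection));
    }

    void dropConnection(ObjectPath path, boolean ignoreIfNotExists) {
        if (connections.remove(path) == null && !ignoreIfNotExists) {
            throw new ConnectionNotExistException(path);
        }
    }
}
```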

EnvironmentSettings Enhancement

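public class EnvironmentSettings {

    public static class Builder {
      private @Nullable SecretStore secretStore;

      /**
       * Configures the SecretStore instance used by this environment. Alternatively, a
       * SecretStoreFactory can be selected through configuration.
       */
      public Builder withSecretStore(SecretStore secretStore) {
          this.secretStore = secretStore;
          return this;
      }
    }
}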

Table API Changes

public class ConnectionDescriptor {
    
    public Map<String, String> getOptions();
    public String getComment();
    public Builder toBuilder();
    public static Builder forType(String type);

    public static class Builder {
        private Map<String, String> connectionOptions;
        private @Nullable String comment;
        
        public Builder();
        public Builder(ConnectionDescriptor descriptor);
        
        public Builder option(String key, String value);
        public <T> Builder option(ConfigOption<T> configOption, T value);
        public Builder comment(@Nullable String comment);
        public ConnectionDescriptor build();
    }
}
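The Table API example in this document chains option(...) calls, which requires each builder method to return the Builder. One possible shape for the descriptor and its builder is sketched below. It assumes that forType(type) records the type as the 'type' option (matching the SQL syntax) and omits the ConfigOption overload to stay self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the proposed ConnectionDescriptor with a fluent builder. */
final class ConnectionDescriptor {
    private final Map<String, String> options;
    private final String comment; // may be null

    private ConnectionDescriptor(Map<String, String> options, String comment) {
        // Copy so later builder calls cannot change an already built descriptor.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
        this.comment = comment;
    }

    /** Assumption: the connection type is stored as the 'type' option, as in the SQL syntax. */
    static Builder forType(String type) {
        return new Builder().option("type", type);
    }

    Map<String, String> getOptions() {
        return options;
    }

    String getComment() {
        return comment;
    }

    Builder toBuilder() {
        return new Builder(this);
    }

    static final class Builder {
        private final Map<String, String> connectionOptions = new HashMap<>();
        private String comment;

        Builder() {}

        Builder(ConnectionDescriptor descriptor) {
            connectionOptions.putAll(descriptor.options);
            comment = descriptor.comment;
        }

        // Returning the builder enables the chained style used in the Table API example.
        Builder option(String key, String value) {
            connectionOptions.put(Objects.requireNonNull(key), Objects.requireNonNull(value));
            return this;
        }

        Builder comment(String comment) {
            this.comment = comment;
            return this;
        }

        ConnectionDescriptor build() {
            return new ConnectionDescriptor(connectionOptions, comment);
        }
    }
}
```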

public class TableEnvironment {
    public void createConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    public void createTemporaryConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createTemporaryConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    public void createTemporarySystemConnection(String path, ConnectionDescriptor connectionDescriptor);
    public void createTemporarySystemConnection(String path, ConnectionDescriptor connectionDescriptor, boolean ignoreIfExists);
    
    public boolean dropConnection(String path);
    public boolean dropConnection(String path, boolean ignoreIfNotExists);
    public boolean dropTemporaryConnection(String path); 
    public boolean dropTemporarySystemConnection(String path);
}



Proposed Changes

SQL Syntax

Create Connection

CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name 
[COMMENT connection_comment] 
WITH (
    'type' = 'basic' | 'bearer' | 'oauth' | 'custom_type',
    'url' = 'endpoint_url',
    'username' = 'user_name',     -- Will be extracted to SecretStore
    'password' = 'user_password', -- Will be extracted to SecretStore
    ...
)

Note: a system connection is referenced by a simple identifier instead of the compound identifier consisting of catalog name, database name, and connection name. This is what allows a system connection to be used when creating a catalog. This FLIP focuses only on temporary system connections; persistent system connections can be explored in subsequent FLIPs.
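The identifier rules in the note can be summarized as: regular connections are expanded against the session's current catalog and database, while system connections must use a single-part name. A sketch of that resolution follows; ConnectionIdentifier is a hypothetical stand-in for the proposed single-name ObjectIdentifier, and the naive dot-splitting ignores quoted identifiers.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical resolved identifier; catalog and database are null for system connections. */
record ConnectionIdentifier(String catalog, String database, String name) {
    boolean hasOnlyObjectName() {
        return catalog == null && database == null;
    }
}

class ConnectionIdentifierResolver {
    /**
     * Expands [catalog_name.][db_name.]connection_name against the current catalog and database.
     * System connections must use a simple, single-part identifier.
     */
    static ConnectionIdentifier resolve(
            String path, boolean system, String currentCatalog, String currentDatabase) {
        // Naive split; real parsing must also handle quoted identifiers that contain dots.
        List<String> parts = Arrays.asList(path.split("\\."));
        if (system) {
            if (parts.size() != 1) {
                throw new IllegalArgumentException("System connection names must be simple: " + path);
            }
            return new ConnectionIdentifier(null, null, parts.get(0));
        }
        switch (parts.size()) {
            case 1:
                return new ConnectionIdentifier(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                return new ConnectionIdentifier(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                return new ConnectionIdentifier(parts.get(0), parts.get(1), parts.get(2));
            default:
                throw new IllegalArgumentException("Invalid connection identifier: " + path);
        }
    }
}
```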

Use Connection

-- Using CONNECTION keyword in DDL
CREATE TABLE customers (
    customer_id INT,
    name STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) USING CONNECTION mycat.mydb.mysql_connection
WITH (
    'table-name' = 'customers',
    'lookup.cache' = 'PARTIAL'
);

Connection Management

-- Alter connection
ALTER CONNECTION mycat.mydb.mysql_connection SET (
    'password' = 'new_password'  -- Updates secret in SecretStore
);

-- Drop connection  
DROP CONNECTION [IF EXISTS] mycat.mydb.mysql_connection;

-- Describe connection (secrets are redacted)
DESCRIBE CONNECTION mycat.mydb.mysql_connection;

-- Show connections
SHOW CONNECTIONS [LIKE 'pattern'] [FROM catalog_name.db_name];

-- Show create connection
SHOW CREATE CONNECTION mycat.mydb.mysql_connection;
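ALTER CONNECTION ... SET has to route each changed option to the right place: non-sensitive options go to the catalog, secret options go to the SecretStore. The sketch below assumes SET merges with the existing options (as ALTER TABLE ... SET does) and that updateSecret replaces the whole secret entry, so existing secrets are merged before the update. MapSecretStore, AlterConnectionSketch, the reduced SECRET_FIELDS set, and the 'secret.id' key are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Minimal map-backed stand-in for a store implementing Readable/WritableSecretStore. */
class MapSecretStore {
    final Map<String, Map<String, String>> secrets = new HashMap<>();

    String storeSecret(Map<String, String> data) {
        String id = UUID.randomUUID().toString();
        secrets.put(id, Map.copyOf(data));
        return id;
    }

    Map<String, String> getSecret(String id) {
        Map<String, String> data = secrets.get(id);
        if (data == null) throw new IllegalStateException("Secret not found: " + id);
        return data;
    }

    void updateSecret(String id, Map<String, String> data) {
        if (!secrets.containsKey(id)) throw new IllegalStateException("Secret not found: " + id);
        secrets.put(id, Map.copyOf(data));
    }
}

class AlterConnectionSketch {
    static final Set<String> SECRET_FIELDS = Set.of("password", "token");
    static final String SECRET_ID_KEY = "secret.id";

    /** Applies ALTER CONNECTION ... SET (...) and returns the new catalog options. */
    static Map<String, String> alter(
            Map<String, String> catalogOptions, Map<String, String> setOptions, MapSecretStore store) {
        Map<String, String> newCatalogOptions = new HashMap<>(catalogOptions);
        Map<String, String> changedSecrets = new HashMap<>();
        setOptions.forEach((key, value) -> {
            if (SECRET_FIELDS.contains(key)) {
                changedSecrets.put(key, value);
            } else {
                newCatalogOptions.put(key, value);
            }
        });
        if (changedSecrets.isEmpty()) {
            return newCatalogOptions;
        }
        String secretId = catalogOptions.get(SECRET_ID_KEY);
        if (secretId == null) {
            // The connection had no secrets yet: store them and reference the new secret.
            newCatalogOptions.put(SECRET_ID_KEY, store.storeSecret(changedSecrets));
        } else {
            // updateSecret replaces the whole entry, so merge with the existing secrets first.
            Map<String, String> merged = new HashMap<>(store.getSecret(secretId));
            merged.putAll(changedSecrets);
            store.updateSecret(secretId, merged);
        }
        return newCatalogOptions;
    }
}
```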

Implementation Flow

Connection Creation Flow

  1. User executes CREATE CONNECTION ... WITH ('username' = 'user', 'password' = 'pass', 'url' = '...')

  2. SQL parser identifies connection type and finds corresponding ConnectionFactory

  3. ConnectionFactory.createConnection() is called:

    • Extracts sensitive fields (username, password)

    • Stores secrets in SecretStore, gets secretId

    • Returns CatalogConnection with non-sensitive options + secretId

  4. The CatalogConnection (which contains no secrets) is stored:

    • A catalog connection is persisted in the catalog

    • A temporary system connection is kept in an in-memory map in the CatalogManager
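Steps 2 to 4 amount to a lookup by 'type' followed by routing the secret-free result to one of two stores. A hypothetical sketch: each ConnectionFactory is modeled as a function from raw options to catalog-safe options, and two maps stand in for the catalog and the CatalogManager's in-memory map. ConnectionCreationSketch and its methods are illustrative names, not proposed API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of steps 2 to 4: pick a factory by 'type', extract secrets, route storage. */
class ConnectionCreationSketch {
    // Stand-in for ConnectionFactory.createConnection: raw options in, catalog-safe options out.
    private final Map<String, UnaryOperator<Map<String, String>>> factoriesByType = new HashMap<>();
    // Stand-in for the CatalogManager's in-memory map of temporary system connections.
    final Map<String, Map<String, String>> temporarySystemConnections = new HashMap<>();
    // Stand-in for persistent catalog storage, keyed by fully qualified name.
    final Map<String, Map<String, String>> catalogConnections = new HashMap<>();

    void registerFactory(String type, UnaryOperator<Map<String, String>> factory) {
        factoriesByType.put(type, factory);
    }

    void createConnection(String name, Map<String, String> rawOptions, boolean temporarySystem) {
        // Step 2: find the factory for the declared connection type.
        String type = rawOptions.get("type");
        UnaryOperator<Map<String, String>> factory = factoriesByType.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("No ConnectionFactory for type: " + type);
        }
        // Step 3: the factory moves secrets into the SecretStore and returns only safe options.
        Map<String, String> safeOptions = factory.apply(rawOptions);
        // Step 4: route to the in-memory map or to the catalog.
        if (temporarySystem) {
            temporarySystemConnections.put(name, safeOptions);
        } else {
            catalogConnections.put(name, safeOptions);
        }
    }
}
```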

Connection Usage Flow

  1. User creates table with USING CONNECTION mycat.mydb.conn

  2. SQL parser retrieves CatalogConnection from catalog

  3. ConnectionFactory.resolveConnection() is called from DynamicTableSourceSpec:

    • Gets secretId from CatalogConnection

    • Retrieves secrets from SecretStore

    • Merges secrets with non-sensitive options

    • Merges the resolved connection options with the table options

  4. DynamicTableFactory receives complete options with secrets via Context.getCatalogTable()
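Step 3 ends by merging connection and table options, but the proposal does not say what happens when both define the same key, or whether connection-level keys such as 'type' reach the DynamicTableFactory (whose option validation typically rejects unknown keys). The sketch below picks one policy as an assumption: drop 'type', and reject a key defined on both sides with different values. ConnectionOptionMerger and CONNECTION_ONLY_KEYS are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the final merge before DynamicTableFactory sees the options. */
class ConnectionOptionMerger {
    // Assumption: keys that describe the connection itself rather than the connector.
    static final Set<String> CONNECTION_ONLY_KEYS = Set.of("type");

    static Map<String, String> merge(
            Map<String, String> resolvedConnectionOptions, Map<String, String> tableOptions) {
        Map<String, String> merged = new HashMap<>();
        resolvedConnectionOptions.forEach((key, value) -> {
            if (!CONNECTION_ONLY_KEYS.contains(key)) {
                merged.put(key, value);
            }
        });
        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
            String previous = merged.putIfAbsent(entry.getKey(), entry.getValue());
            // Policy choice for this sketch: conflicting values for the same key are an error.
            if (previous != null && !previous.equals(entry.getValue())) {
                throw new IllegalArgumentException(
                        "Option '" + entry.getKey() + "' is defined by both the connection and the table");
            }
        }
        // The result contains plain-text secrets: never log it.
        return merged;
    }
}
```

An alternative policy would let table options override connection options; either way, the merged map holds resolved secrets and should only be handed to the factory.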

Examples

Connection Factory Examples

Default Authentication Factory

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DefaultConnectionFactory implements ConnectionFactory {
    
    public static final String IDENTIFIER = "default";
    // Mirrors the sensitive keys defined in GlobalConfiguration
    public static final Set<String> SECRET_FIELDS = Set.of("password", "secret", "fs.azure.account.key", "apikey", "api-key", "auth-params", "service-key", "token", "basic-auth", "jaas.config", "http-headers");
    // Catalog option that references the stored secret
    private static final String SECRET_ID_KEY = "secret.id";
    
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    // requiredOptions() and optionalOptions() omitted for brevity
    
    @Override
    public CatalogConnection createConnection(
            SensitiveConnection connectionToEncrypt, WritableSecretStore secretStore) {
        Map<String, String> options = connectionToEncrypt.getOptions();
        // Split raw options into secrets and non-sensitive options
        Map<String, String> secrets = new HashMap<>();
        Map<String, String> publicOptions = new HashMap<>(options);
        
        for (String secretField : SECRET_FIELDS) {
            if (options.containsKey(secretField)) {
                secrets.put(secretField, options.get(secretField));
                publicOptions.remove(secretField);
            }
        }
        
        // Store secrets, if any, and keep only their identifier in the catalog
        if (!secrets.isEmpty()) {
            publicOptions.put(SECRET_ID_KEY, secretStore.storeSecret(secrets));
        }
        
        return new DefaultCatalogConnection(publicOptions, connectionToEncrypt.getComment());
    }
    
    @Override
    public SensitiveConnection resolveConnection(
            CatalogConnection connectionToDecrypt, ReadableSecretStore secretStore) {
        Map<String, String> resolved = new HashMap<>(connectionToDecrypt.getOptions());
        
        // Replace the secret reference with the actual secrets, if present
        String secretId = resolved.remove(SECRET_ID_KEY);
        if (secretId != null) {
            resolved.putAll(secretStore.getSecret(secretId));
        }
        
        return new DefaultSensitiveConnection(resolved, connectionToDecrypt.getComment());
    }
}

Example: Default Authentication

-- Create connection (the password is extracted automatically)
CREATE CONNECTION mycat.mydb.mysql_prod WITH (
    'type' = 'default',
    'url' = 'jdbc:mysql://prod-db:3306/orders',
    'username' = 'prod_user',    -- Not a secret field; stays in catalog
    'password' = 'secret123'     -- Moved to SecretStore
);

-- Catalog stores:
-- {
--   'type' = 'default',
--   'url' = 'jdbc:mysql://prod-db:3306/orders',
--   'username' = 'prod_user',
--   'secret.id' = 'conn-12345'
-- }
-- SecretStore stores: {'password' = 'secret123'}

-- Use connection
CREATE TABLE orders (
    order_id INT,
    customer_id INT,
    amount DECIMAL(10,2)
) USING CONNECTION mycat.mydb.mysql_prod
WITH (
    'connector' = 'jdbc',
    'table-name' = 'orders',
    ...
);


In this example, the options of connection mycat.mydb.mysql_prod are merged with the table options before they are passed to the DynamicTableFactory at runtime. The url, username and password fields in the connection are options defined by the JDBC connector.

SecretStore Implementations

Example: Azure Key Vault

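public class AzureKeyVaultSecretStoreFactory implements SecretStoreFactory {
    
    public static final String IDENTIFIER = "azure-key-vault";
    // Option keys are illustrative
    public static final ConfigOption<String> VAULT_URL =
        ConfigOptions.key("vault.url").stringType().noDefaultValue();
    public static final ConfigOption<String> CLIENT_ID =
        ConfigOptions.key("client.id").stringType().noDefaultValue();

    private Configuration options;
    
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    // requiredOptions() and optionalOptions() omitted for brevity

    @Override
    public void open(Context context) {
        // Context.getOptions() returns a raw map; wrap it to read typed options
        options = Configuration.fromMap(context.getOptions());
    }
    
    @Override
    public SecretStore createSecretStore() {
        return new AzureKeyVaultSecretStore(options.get(VAULT_URL), options.get(CLIENT_ID));
    }

    @Override
    public void close() {}
}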

Registration

# META-INF/services/org.apache.flink.table.factories.SecretStoreFactory
com.azure.flink.AzureKeyVaultSecretStoreFactory
com.aws.flink.AwsSecretsManagerSecretStoreFactory
org.apache.flink.table.store.InMemorySecretStoreFactory

Usage

// Configure SecretStore in Table API
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .withSecretStore(new AzureKeyVaultSecretStore(vaultUrl, clientId))
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

Alternatively, configure the SecretStore in the Flink configuration file:

table.secret-store.kind: azure-key-vault
table.secret-store.azure-key-vault.vault.url: https://company-vault.vault.azure.net/

Table API syntax

// Create connection
tableEnv.createConnection("mycat.mydb.mysql_prod", 
    ConnectionDescriptor.forType("basic")
        .option("url", "jdbc:mysql://prod:3306/db")
        .option("username", "user")    // Will be moved to SecretStore
        .option("password", "pass")    // Will be moved to SecretStore
        .build());

// Use connection in table
tableEnv.createTable("orders",
    TableDescriptor.forConnector("jdbc")
        .connection("mycat.mydb.mysql_prod")  // Reference to connection
        .option("table-name", "orders")
        .schema(Schema.newBuilder()...build())
        .build());


Compatibility, Deprecation, and Migration Plan

Backward Compatibility

  • Full backward compatibility: Existing table definitions continue to work unchanged

  • Gradual adoption: Users can migrate to connections incrementally

  • No breaking changes: All existing APIs remain functional

Migration Path

  1. Phase 1: Introduce SecretStore and Connection APIs

  2. Phase 2: Users create connections for new tables/models

  3. Phase 3: Migration utilities to convert existing configurations


Connection Naming Pros and Cons

There were two strong contenders for the naming discussion - Connection and Integration. The pros and cons are listed below. After careful consideration, we think Connection is a more familiar term for users and conveys the security purpose of this object better.


Connection

  Pros

    1. Emphasizes security and access: "Connection" naturally conveys something that holds credentials and endpoints and is concerned with authentication.

    2. Familiar to users: Widely used in data systems (e.g., JDBC connections), so it aligns with user expectations.

  Cons

    • Overlap with Flink connectors: Might cause slight confusion with Flink’s connector modules (e.g., the Kafka connector), though context usually resolves this.

Integration

  Pros

    • Sounds higher-level: May appeal to users who build pipelines or platforms and think in terms of "integrating systems".

  Cons

    1. Too broad/vague: Might be unclear whether it refers to just connection details or to a full-fledged pipeline, transformation, or system sync.

    2. Security less obvious: Doesn’t immediately imply that it contains sensitive information such as secrets.

    3. Possible confusion: Users may expect an integration to handle more logic (e.g., mapping, enrichment), not just configuration.

Test Plan

Unit Tests

  • SecretStore factory discovery and creation

  • ConnectionFactory secret extraction and resolution

  • Catalog connection CRUD operations

  • Error handling for missing secrets

Integration Tests

  • End-to-end connection creation and usage

  • SecretStore integration with mock external stores

  • Connection-based table/model creation

  • DDL syntax validation

Rejected Alternatives

The following ideas were rejected:

  • Storing secrets in catalog: Rejected for security reasons

  • Static configuration files: Doesn't align with SQL-native resource management

  • Naming: Calling this object “Integration” (see Connection Naming Pros and Cons)

Future Enhancements

  • Persistent system-level connections: Global connections accessible across all catalogs (this FLIP only covers temporary system connections)
