Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As Flink SQL expands its range of connectors, it becomes crucial to support lazy initialization of catalogs and persistence of catalog configurations in order to manage a growing number of catalogs. Lazy initialization reduces the overhead of registering catalogs and simplifies their management. Persistence allows catalogs to be retrieved and used across multiple Flink clusters, so the same catalogs can be reused in different applications and use cases.
Public Interfaces
CatalogDescriptor
Add a new class to describe what should be stored in the CatalogStore.
@PublicEvolving
public class CatalogDescriptor {

    /* Catalog name */
    private final String catalogName;

    /* The configuration used to discover and construct the catalog. */
    private final Configuration configuration;

    public String getCatalogName() {
        return catalogName;
    }

    public Configuration getConfiguration() {
        return configuration;
    }

    private CatalogDescriptor(String catalogName, Configuration configuration) {
        this.catalogName = catalogName;
        this.configuration = configuration;
    }

    public static CatalogDescriptor of(String catalogName, Configuration configuration) {
        return new CatalogDescriptor(catalogName, configuration);
    }
}
TableEnvironment
Add a new method for registering a catalog and deprecate the old registration method.
@PublicEvolving
public interface TableEnvironment {

    /**
     * Creates a catalog from the given {@link CatalogDescriptor} under a unique name. All tables
     * registered in the catalog can be accessed. The catalog is initialized immediately and its
     * {@link CatalogDescriptor} is then saved to the CatalogStore.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalogDescriptor The catalogDescriptor to register.
     */
    void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor);

    /**
     * Registers a {@link Catalog} under a unique name. All tables registered in the {@link Catalog}
     * can be accessed.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalog The catalog to register.
     */
    @Deprecated
    void registerCatalog(String catalogName, Catalog catalog);
}
CatalogStore
Adding the CatalogStore interface would enable the storage and retrieval of properties linked to catalogs.
@PublicEvolving
public interface CatalogStore {

    /**
     * Stores a catalog under the given name. The catalog name must be unique.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalog catalog descriptor to store
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException;

    /**
     * Unregisters a catalog under the given name. The catalog name must exist.
     *
     * @param catalogName name under which to unregister the given catalog.
     * @param ignoreIfNotExists If false, an exception is thrown if the catalog to be removed
     *     does not exist.
     * @throws CatalogException if the unregistration of the catalog under the given name failed
     */
    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException;

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    Optional<CatalogDescriptor> getCatalog(String catalogName);

    /**
     * Retrieves names of all registered catalogs.
     *
     * @return a set of names of registered catalogs
     */
    Set<String> listCatalogs();

    /**
     * Checks whether the CatalogStore contains a catalog with the given name.
     *
     * @param catalogName name of the catalog to look up
     * @return whether there is a corresponding catalog with the given name
     */
    boolean contains(String catalogName);

    /** Initialization method for the CatalogStore. */
    void open();

    /** Tear-down method for the CatalogStore. */
    void close();
}
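To illustrate the contract above, here is a minimal in-memory sketch of the same method set. It is not the proposed InMemoryCatalogStore: DescriptorSketch and InMemoryStoreSketch are hypothetical stand-ins, a plain Map<String, String> replaces Flink's Configuration, and IllegalStateException replaces CatalogException so that the example compiles without Flink on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CatalogDescriptor: a name plus flat string options.
final class DescriptorSketch {
    final String catalogName;
    final Map<String, String> options;

    DescriptorSketch(String catalogName, Map<String, String> options) {
        this.catalogName = catalogName;
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }
}

// Hypothetical in-memory store that mirrors the CatalogStore method set.
final class InMemoryStoreSketch {
    private final Map<String, DescriptorSketch> descriptors = new ConcurrentHashMap<>();

    void storeCatalog(String catalogName, DescriptorSketch descriptor) {
        // putIfAbsent enforces unique names atomically.
        if (descriptors.putIfAbsent(catalogName, descriptor) != null) {
            throw new IllegalStateException("Catalog " + catalogName + " already exists.");
        }
    }

    void removeCatalog(String catalogName, boolean ignoreIfNotExists) {
        if (descriptors.remove(catalogName) == null && !ignoreIfNotExists) {
            throw new IllegalStateException("Catalog " + catalogName + " does not exist.");
        }
    }

    Optional<DescriptorSketch> getCatalog(String catalogName) {
        return Optional.ofNullable(descriptors.get(catalogName));
    }

    Set<String> listCatalogs() {
        // Return a copy so callers cannot modify the store through the result.
        return new HashSet<>(descriptors.keySet());
    }

    boolean contains(String catalogName) {
        return descriptors.containsKey(catalogName);
    }
}
```

Nothing in this sketch survives the end of the process, which matches the stated intent that the in-memory store does not share catalog configurations between sessions.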
CatalogStoreFactory
To enable plug-in management of CatalogStore implementations through the service provider interface (SPI), it would be beneficial to introduce a CatalogStoreFactory interface. This interface would include a method for creating a CatalogStore instance.
Typically, a factory only creates a single instance. However, in the Flink SQL gateway scenario, multiple sessions are maintained, with each session creating a CatalogStore instance. This can result in a significant number of CatalogStore instances being created. For example, we may have a MySQLCatalogStore that stores Catalog information in MySQL, with each CatalogStore establishing a connection to MySQL. To address this issue, we can create a connection pool in the CatalogStoreFactory. This approach allows us to initialize the connection pool in the open method, pass it to each CatalogStore in the createCatalogStore method, and destroy it in the close method.
@PublicEvolving
public interface CatalogStoreFactory extends Factory {

    /** Creates a {@link CatalogStore} instance from context information. */
    CatalogStore createCatalogStore(Context context);

    /** Initialization method for the CatalogStoreFactory. */
    void open();

    /** Tear-down method for the CatalogStoreFactory. */
    void close();
}
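The connection-pool idea described above can be sketched as follows. All class names here are hypothetical, and PoolSketch is only a placeholder for a real connection pool; the point is the lifecycle, where the factory owns the shared resource and every store it creates borrows it.

```java
// Hypothetical placeholder for a connection pool owned by the factory.
final class PoolSketch {
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    void shutdown() {
        open = false;
    }
}

// Hypothetical store that borrows the pool instead of opening its own connection.
final class PooledStoreSketch {
    private final PoolSketch pool;

    PooledStoreSketch(PoolSketch pool) {
        this.pool = pool;
    }

    PoolSketch pool() {
        return pool;
    }
}

// Hypothetical factory: open() creates the pool, close() destroys it.
final class PooledStoreFactorySketch {
    private PoolSketch pool;

    void open() {
        pool = new PoolSketch();
    }

    PooledStoreSketch createCatalogStore() {
        if (pool == null) {
            throw new IllegalStateException("Factory must be opened before creating stores.");
        }
        // Every store created by this factory shares the same pool instance.
        return new PooledStoreSketch(pool);
    }

    void close() {
        if (pool != null) {
            pool.shutdown();
            pool = null;
        }
    }
}
```

With this shape, the number of sessions no longer determines the number of connections, since each session's store reuses the factory's pool.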
CatalogStore Registration Using Configuration
There are two built-in CatalogStores: the In-MemoryCatalogStore and the FileCatalogStore. Only one CatalogStore can be specified at a time. The In-MemoryCatalogStore will be the default CatalogStore.
In-MemoryCatalogStore
table.catalog-store.kind: in-memory
FileCatalogStore
FileCatalogStore will save the catalog configuration to the specified directory.
table.catalog-store.kind: file
table.catalog-store.file.path: file:///xxxx/xxx
Custom CatalogStore
If you have implemented a custom CatalogStore, you can enable it by configuring the following parameters.
table.catalog-store.kind: {identifier}
table.catalog-store.{identifier}.{option1}: xxx1
table.catalog-store.{identifier}.{option2}: xxx2
table.catalog-store.{identifier}.{option3}: xxx3
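The key layout above implies that a CatalogStore receives only the options under its own identifier prefix. A rough sketch of that extraction is shown below. The key names come from this document, while CatalogStoreOptionsSketch and its methods are hypothetical helpers and not part of Flink.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that resolves the store kind and its prefixed options.
final class CatalogStoreOptionsSketch {
    static final String KIND_KEY = "table.catalog-store.kind";
    static final String PREFIX = "table.catalog-store.";
    static final String DEFAULT_KIND = "in-memory";

    // Returns the configured kind, falling back to the in-memory default.
    static String kind(Map<String, String> conf) {
        return conf.getOrDefault(KIND_KEY, DEFAULT_KIND);
    }

    // Returns the options of the selected store with the prefix stripped.
    static Map<String, String> storeOptions(Map<String, String> conf) {
        String prefix = PREFIX + kind(conf) + ".";
        Map<String, String> options = new TreeMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                options.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return options;
    }
}
```

For a configuration with kind set to file, a key such as table.catalog-store.file.path is returned as path, and options that belong to other identifiers are ignored.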
CatalogStore Registration Using Table API
Add a new method to EnvironmentSettings::Builder for specifying the CatalogStore. In EnvironmentSettings, InMemoryCatalogStore will be the default CatalogStore.
@PublicEvolving
public class EnvironmentSettings {

    @PublicEvolving
    public static class Builder {

        private final Configuration configuration = new Configuration();
        private ClassLoader classLoader;
        private CatalogStore catalogStore;

        public Builder() {}

        ....

        /** Sets a catalogStore for retrieving the configuration of the Catalog. */
        public Builder withCatalogStore(CatalogStore catalogStore) {
            this.catalogStore = catalogStore;
            return this;
        }

        /** Returns an immutable instance of {@link EnvironmentSettings}. */
        public EnvironmentSettings build() {
            if (classLoader == null) {
                classLoader = Thread.currentThread().getContextClassLoader();
            }
            // Fall back to the in-memory store when none was specified.
            if (catalogStore == null) {
                catalogStore = new InMemoryCatalogStore();
            }
            return new EnvironmentSettings(configuration, classLoader, catalogStore);
        }

        private CatalogStore lookupCatalogStore() {
            // Construct CatalogStore
        }
    }
}
For Table API Users
public class CatalogStoreExample {

    public static void main(String[] args) throws Exception {
        // Initialize a catalog Store
        CatalogStore catalogStore = new FileCatalogStore("");

        // set up the Table API
        final EnvironmentSettings settings =
                EnvironmentSettings.newInstance()
                        .inBatchMode()
                        .withCatalogStore(catalogStore)
                        .build();

        final TableEnvironment tableEnv = TableEnvironment.create(settings);
    }
}
Proposed Changes
CatalogStore Initialization and Close
To initialize the CatalogStore, the CatalogManager needs to have additional methods for opening and closing.
@Internal
public final class CatalogManager {

    /** Initializes the catalogStore. */
    public void open() {
        catalogStore.open();
    }

    /** Closes all initialized catalogs and then the catalogStore. */
    public void close() {
        // catalogs is a Map<String, Catalog>, so iterate over its values.
        catalogs.values().forEach(Catalog::close);
        catalogStore.close();
    }
}
Catalog Configuration Persistence
1. Add createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) to register the catalog and store its catalogDescriptor in the CatalogStore.
2. Deprecate the registerCatalog(String catalogName, Catalog catalog) method.
public final class CatalogManager {

    private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);

    // Store the initialized catalogs.
    private final Map<String, Catalog> catalogs;

    // CatalogStore for storing the CatalogDescriptor.
    private final CatalogStore catalogStore;

    .....

    /**
     * Registers a catalog under the given name. The catalog name must be unique.
     * Verifies that no catalog with the same name exists in the CatalogStore or among the
     * initialized catalogs, then adds the catalog to the CatalogStore.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalogDescriptor descriptor of the catalog to register
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(catalogName),
                "Catalog name cannot be null or empty.");
        checkNotNull(catalogDescriptor, "Catalog cannot be null");

        if (catalogStore.contains(catalogName)) {
            throw new CatalogException(String.format("Catalog %s already exists.", catalogName));
        }
        if (catalogs.containsKey(catalogName)) {
            throw new CatalogException(
                    String.format("Catalog %s already exists in initialized catalogs.", catalogName));
        }

        Catalog catalog = initCatalog(catalogDescriptor);
        catalogs.put(catalogName, catalog);
        catalogStore.storeCatalog(catalogName, catalogDescriptor);
        catalog.open();
    }

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    public Optional<Catalog> getCatalog(String catalogName) {
        // Get catalog from the catalogs.
        if (catalogs.containsKey(catalogName)) {
            return Optional.of(catalogs.get(catalogName));
        }

        // Get catalog from the CatalogStore and initialize it lazily.
        Optional<CatalogDescriptor> optionalDescriptor = catalogStore.getCatalog(catalogName);
        if (optionalDescriptor.isPresent()) {
            Catalog catalog = initCatalog(optionalDescriptor.get());
            catalog.open();
            // Cache the instance so that it is initialized only once.
            catalogs.put(catalogName, catalog);
            return Optional.of(catalog);
        }

        return Optional.empty();
    }

    // Init the catalog instance by CatalogDescriptor
    private Catalog initCatalog(CatalogDescriptor descriptor) {
        // Discover the catalog and init
    }
}
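The lazy lookup performed by getCatalog can be shown in isolation with a small self-contained sketch. LazyCatalogManagerSketch is a hypothetical class: a plain Map stands in for the CatalogStore and a Function stands in for initCatalog, so the example only demonstrates that a catalog is initialized on first access and reused afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch: D is the descriptor type, C is the catalog type.
final class LazyCatalogManagerSketch<D, C> {
    // Catalogs that have already been initialized.
    private final Map<String, C> initialized = new HashMap<>();
    // Stand-in for the CatalogStore: descriptors by catalog name.
    private final Map<String, D> store;
    // Stand-in for initCatalog: builds a catalog from its descriptor.
    private final Function<D, C> initializer;

    LazyCatalogManagerSketch(Map<String, D> store, Function<D, C> initializer) {
        this.store = store;
        this.initializer = initializer;
    }

    Optional<C> getCatalog(String catalogName) {
        C cached = initialized.get(catalogName);
        if (cached != null) {
            return Optional.of(cached);
        }
        D descriptor = store.get(catalogName);
        if (descriptor == null) {
            return Optional.empty();
        }
        // First access: initialize the catalog and remember the instance.
        C catalog = initializer.apply(descriptor);
        initialized.put(catalogName, catalog);
        return Optional.of(catalog);
    }

    int initializedCount() {
        return initialized.size();
    }
}
```

Descriptors that are stored but never requested cost nothing beyond their entry in the store, which is the overhead reduction the motivation section refers to.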
Behavior When Executing the CREATE CATALOG Statement
When the user executes the "CREATE CATALOG" statement, the method createCatalog(String catalogName, CatalogDescriptor) is utilized to save the catalog configuration and initialize the catalog instance.
public class CreateCatalogOperation implements CreateOperation {

    @Override
    public TableResultInternal execute(Context ctx) {
        try {
            CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, properties);
            ctx.getCatalogManager().createCatalog(catalogName, catalogDescriptor);

            return TableResultImpl.TABLE_RESULT_OK;
        } catch (CatalogException e) {
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        }
    }
}
Add built-in CatalogStores
There are two built-in CatalogStores available; In-MemoryCatalogStore will be the default CatalogStore.
In-MemoryCatalogStore
In-MemoryCatalogStore will be the default CatalogStore. By default, Catalog configurations are not shared between sessions, so the In-MemoryCatalogStore will not persist any catalog configuration.
FileCatalogStore
FileCatalogStore will store all catalogs in a file directory. Each catalog is associated with a file that stores its configuration, and deleting the catalog will remove this file.
FileCatalogStore saves the configuration in YAML, the same format used by flink-conf.yaml.
➜ catalog_path tree
.
├── hive_catalog1.yaml
├── hive_catalog2.yaml
├── jdbc_catalog1.yaml
└── jdbc_catalog2.yaml
Example of the file content
type: jdbc
default-database: test-database
username: test-user
password: test-pass
base-url: mysql://127.0.0.1:3306/test-database
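The one-file-per-catalog layout can be sketched with the standard java.nio.file API. FileStoreSketch is a hypothetical class and not the proposed FileCatalogStore. It writes flat "key: value" lines, a simplified subset of YAML, rather than using a YAML library, and it wraps I/O failures in UncheckedIOException as a stand-in for CatalogException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical file-backed store: one "<catalogName>.yaml" file per catalog.
final class FileStoreSketch {
    private final Path dir;

    FileStoreSketch(Path dir) {
        this.dir = dir;
    }

    private Path fileFor(String catalogName) {
        return dir.resolve(catalogName + ".yaml");
    }

    void storeCatalog(String catalogName, Map<String, String> options) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> e : new TreeMap<>(options).entrySet()) {
            lines.add(e.getKey() + ": " + e.getValue());
        }
        try {
            Files.createDirectories(dir);
            // CREATE_NEW fails if the file exists, which keeps catalog names unique.
            Files.write(fileFor(catalogName), lines, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not store catalog " + catalogName, e);
        }
    }

    Optional<Map<String, String>> getCatalog(String catalogName) {
        Path file = fileFor(catalogName);
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        Map<String, String> options = new TreeMap<>();
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                // Split on the first ": " only, so values such as URLs stay intact.
                int sep = line.indexOf(": ");
                if (sep > 0) {
                    options.put(line.substring(0, sep), line.substring(sep + 2));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read catalog " + catalogName, e);
        }
        return Optional.of(options);
    }

    // Deleting the catalog removes its file; returns false if there was none.
    boolean removeCatalog(String catalogName) {
        try {
            return Files.deleteIfExists(fileFor(catalogName));
        } catch (IOException e) {
            throw new UncheckedIOException("Could not remove catalog " + catalogName, e);
        }
    }
}
```

Because every catalog lives in its own file, a second session that points at the same directory can read a catalog's configuration without any coordination beyond the file system.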
Note: CatalogStore must be initialized at the same time as CatalogManager during initialization, and once CatalogManager is declared, CatalogStore cannot be modified again.
Conflict Scenarios
Currently, CatalogManager will cache the initialized Catalog by default. Therefore, in a multi-session scenario, if one session deletes a Catalog, other sessions may not be able to synchronize the deletion.
For this situation, we should make our caching logic configurable in the future.
There may be three scenarios:
- caching all initialized Catalogs;
- using an LRU cache that keeps only the most recently used Catalogs, to avoid occupying too much memory;
- not caching any Catalog instances.
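The LRU option in the list above could be built on java.util.LinkedHashMap in access order. This is only a sketch under that assumption: LruCatalogCacheSketch is a hypothetical class, and a real cache would also need to close a Catalog when it is evicted and handle concurrent access.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU cache of initialized catalogs, keyed by catalog name.
final class LruCatalogCacheSketch<C> extends LinkedHashMap<String, C> {
    private static final long serialVersionUID = 1L;

    private final int maxEntries;

    LruCatalogCacheSketch(int maxEntries) {
        // accessOrder = true moves an entry to the end on every get or put.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, C> eldest) {
        // Evict the least recently used entry once the limit is exceeded.
        return size() > maxEntries;
    }
}
```

An evicted catalog is not lost, because its descriptor remains in the CatalogStore and the catalog can be initialized again on the next lookup.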
Compatibility, Deprecation, and Migration Plan
Compatibility:
InMemoryCatalogStore will be the default CatalogStore
The current implementation will not cause compatibility issues. The original Catalog registration process can still be used, including CatalogManager::registerCatalog(String catalogName, Catalog catalog) and TableEnvironment::registerCatalog(String catalogName, Catalog catalog).
When using the CREATE CATALOG statement, the CatalogManager::createCatalog(String catalogName, CatalogDescriptor) method will be used by default. This method will initialize the Catalog instance by default and is consistent with the previous logic.
When using a different CatalogStore, such as FileCatalogStore, the configuration of the Catalog will persist to an external system when executing the CREATE CATALOG statement. Even after a Flink session is restarted, the previously registered Catalog can still be used in the TableEnvironment.
Deprecation:
TableEnvironment
TableEnvironment::registerCatalog(String catalogName, Catalog catalog) will be deprecated. We will use TableEnvironment::createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) when executing the CREATE CATALOG statement.
We also recommend that Table API users create Catalogs using the new method.
@PublicEvolving
public interface TableEnvironment {

    /**
     * Creates a catalog from the given {@link CatalogDescriptor} under a unique name. All tables
     * registered in the catalog can be accessed. The catalog is initialized immediately and its
     * {@link CatalogDescriptor} is then saved to the CatalogStore.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalogDescriptor The catalogDescriptor of the Catalog.
     */
    void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor);

    /**
     * Registers a {@link Catalog} under a unique name. All tables registered in the {@link Catalog}
     * can be accessed.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalog The catalog to register.
     */
    @Deprecated
    void registerCatalog(String catalogName, Catalog catalog);
}
Test Plan
Unit tests (UT)
Rejected Alternatives
Make Catalog serializable and move the creation and caching of Catalog into CatalogStore
Here is a brief description of the design:
- Add a toProperties method to the Catalog to make the catalog serializable.
- Remove Map<String, Catalog> catalogs in the CatalogManager.
- The registerCatalog(String catalogName, Catalog catalog) will store the catalog in the CatalogStore.
- The getCatalog(String catalogName) will get a Catalog instance from the CatalogStore.
The CatalogStore Interface
/** Interfaces defining catalog-related behaviors */
public interface CatalogStore {

    /**
     * Stores a catalog under the given name. The catalog name must be unique.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalog catalog to store
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    void storeCatalog(String catalogName, Catalog catalog) throws CatalogException;

    /**
     * Unregisters a catalog under the given name. The catalog name must exist.
     *
     * @param catalogName name under which to unregister the given catalog.
     * @param ignoreIfNotExists If false, an exception is thrown if the catalog to be removed
     *     does not exist.
     * @throws CatalogException if the unregistration of the catalog under the given name failed
     */
    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException;

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    Optional<Catalog> getCatalog(String catalogName);

    /**
     * Retrieves names of all registered catalogs.
     *
     * @return a set of names of registered catalogs
     */
    Set<String> listCatalogs();
}
Here is the implementation logic of registerCatalog:
/**
 * Registers a catalog under the given name. The catalog name must be unique.
 *
 * @param catalogName name under which to register the given catalog
 * @param catalog catalog to register
 * @throws CatalogException if the registration of the catalog under the given name failed
 */
public void registerCatalog(String catalogName, Catalog catalog) {
    checkArgument(
            !StringUtils.isNullOrWhitespaceOnly(catalogName),
            "Catalog name cannot be null or empty.");
    checkNotNull(catalog, "Catalog cannot be null");

    // This variant of the interface has no contains method, so look the catalog up instead.
    if (catalogStore.getCatalog(catalogName).isPresent()) {
        throw new CatalogException(format("Catalog %s already exists.", catalogName));
    }
    catalogStore.storeCatalog(catalogName, catalog);
}
Here is the implementation logic of getCatalog:
/**
 * Gets a catalog by name.
 * Get Catalog Instance from CatalogStore.
 *
 * @param catalogName name of the catalog to retrieve
 * @return the requested catalog or empty if it does not exist
 */
public Optional<Catalog> getCatalog(String catalogName) {
    return catalogStore.getCatalog(catalogName);
}