Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
FLIP-36 proposes to add cached tables to avoid duplicated computation and provide a better user experience for interactive programming. The intermediate result storage is the backend storage where the content of a cached table is stored. FLIP-36 provides a default implementation of the intermediate result storage so that users can use the cache out of the box. It utilizes the ShuffleMaster/ShuffleService to persist the intermediate result and keep it available across jobs.
For more advanced usage of the cache, users may want to plug in external storage for the intermediate result. For example, users can use a table source that supports projection or filter push-down, e.g., ParquetTableSource or OrcTableSource. This may reduce scanning time if a job only uses some of the columns of the cached table.
In addition to the performance improvement, the ability to store the intermediate result in external storage is required for cross-application sharing (Phase 3).
Add a registerIntermediateResultStorage method to the TableEnvironment
Introduce New Descriptor
Introduce IntermediateResultStorage Interface
Introduce Configurable Interface
Introduce Table Creation/Cleanup Hook
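The pieces above can be summarized in a single hedged sketch of how the proposed interfaces might fit together. The method names and signatures below are assumptions based on this FLIP, not the final Flink API:

```java
import java.util.List;
import java.util.Map;

public class InterfaceSketch {

    /** Stand-in for Flink's TableFactory: discoverable via the service loader. */
    interface TableFactory {
        Map<String, String> requiredContext();
        List<String> supportedProperties();
    }

    /** Objects that can be fully initialized from key-value properties after instantiation. */
    interface Configurable {
        void configure(Map<String, String> properties);
    }

    /** Called before a cached table is written; prepares the physical location
     *  and returns properties enriched with the table's location. */
    @FunctionalInterface
    interface TableCreationHook {
        Map<String, String> createTable(String tableName, Map<String, String> properties);
    }

    /** Called after a cache is invalidated; deletes the stored content. */
    @FunctionalInterface
    interface TableCleanupHook {
        void deleteTable(List<String> tableNames);
    }

    /** The pluggable storage: a discoverable, configurable factory with lifecycle hooks. */
    interface IntermediateResultStorage extends TableFactory, Configurable {
        TableCreationHook getTableCreationHook();
        TableCleanupHook getTableCleanupHook();
    }
}
```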
All the changes we need to make are in the Table API; the runtime is untouched. The workflow of loading and configuring the intermediate result storage is the following:
- Get the intermediate result storage configuration:
  - The user calls registerIntermediateResultStorage explicitly in the code and passes the configuration, or
  - The cache manager loads the configuration as a YAML file from the classpath.
- The cache manager passes the configuration to the TableFactoryService to load and configure the intermediate result storage.
- The TableFactoryService finds the implementation using the service loader, instantiates and configures the object, and returns it to the cache manager. The cache manager maintains the intermediate result storage. From this point on, the workflow is essentially the same as in FLIP-36, i.e., maintaining a set of cached tables and replacing cached tables with a TableSink/TableSource.
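The discovery-and-configure step above can be sketched as follows. This is a simplified illustration, not the actual Flink code: the real TableFactoryService uses java.util.ServiceLoader, while here the candidate list is hard-coded to keep the sketch self-contained, and all class and key names are assumptions:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class FactoryLoadingSketch {

    interface Configurable { void configure(Map<String, String> properties); }

    interface StorageFactory {
        Map<String, String> requiredContext();
    }

    /** A hypothetical file-system-backed storage (not a real Flink class). */
    static class FsStorage implements StorageFactory, Configurable {
        String rootPath;
        public Map<String, String> requiredContext() {
            return Map.of("intermediate-result-storage.type", "filesystem");
        }
        public void configure(Map<String, String> properties) {
            this.rootPath = properties.get("intermediate-result-storage.configs.root-path");
        }
    }

    /** Find a factory whose requiredContext matches the properties,
     *  instantiate it via its parameterless constructor, then configure it. */
    static StorageFactory load(Map<String, String> props,
                               List<Supplier<StorageFactory>> candidates) {
        for (Supplier<StorageFactory> candidate : candidates) {
            StorageFactory factory = candidate.get();            // parameterless constructor
            boolean matches = factory.requiredContext().entrySet().stream()
                    .allMatch(e -> e.getValue().equals(props.get(e.getKey())));
            if (matches) {
                if (factory instanceof Configurable) {
                    ((Configurable) factory).configure(props);   // initialize internal state
                }
                return factory;
            }
        }
        throw new IllegalArgumentException("No matching intermediate result storage found");
    }
}
```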
The key functionality of the pluggable intermediate result storage is writing tables to and reading them from external storage. This is similar to what TableSink and TableSource do, but with an important difference: TableSink and TableSource are designed to connect to external storage managed by the user, while the cache service should manage the external storage itself. Therefore, the following capabilities are required:
- Create a location for each table on the corresponding external storage if it does not already exist. Currently, only the file system meets this requirement: it creates the file to write to if it does not already exist. Any other table sink expects the destination it writes to to have already been created by the user. This is unacceptable for storage that serves intermediate results. Imagine users having to create tens of tables in a MySQL database, and tens of other tables for another Flink application. In some cases, the user may not even know how many tables will be cached.
- Take the name of a table and map it to the corresponding configuration field of the external storage. Currently, the TableSinkFactory and TableSourceFactory take the configuration as a Map<String, String>, and different factories map the logical table name to fields with different keys. For example, CsvTableSinkFactoryBase uses "connector.path", KafkaTableSourceSinkFactoryBase uses "connector.topic", and JDBCTableSourceSinkFactory uses "connector.table". This is an essential requirement, as the cache service needs to create a different TableSink/TableSource for each table.
- Clean up the physical storage. For cache storage, it is important to be able to reclaim the storage space when a cache is invalidated, which is not currently supported.
Therefore, the IntermediateResultStorage interface is introduced. Any class that implements this interface can be used by the cache service as external storage for the intermediate result. The interface extends the TableFactory interface so that implementations can be discovered with the service loader, similar to how the current TableSinkFactory and TableSourceFactory work, and we can reuse the TableFactoryService class for discovery and loading.
The TableFactoryService uses the service loader to instantiate the object via its parameterless constructor. However, this alone cannot initialize the internal state of the object. Therefore, IntermediateResultStorage extends the Configurable interface.
Classes that implement the Configurable interface should have a parameterless constructor, and the configure method should fully configure the object. The TableFactoryService will use the service loader to instantiate the object and then call the configure method if the object implements the Configurable interface.
The TableCreationHook and TableCleanupHook are functional interfaces, which are called by the cache service before a cached table is created and after a cache is invalidated, respectively. The TableCreationHook is responsible for preparing the location that stores the content of the cached table and for mapping the table name to configuration fields that the TableSinkFactory/TableSourceFactory can understand. The TableCleanupHook is responsible for deleting the content of the cached tables and reclaiming the space.
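As a hedged illustration, here is what the two hooks might look like for a file-system-backed storage. The method signatures are assumptions based on this FLIP; only the "connector.path" key is taken from the factories named above:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HooksSketch {

    /** Creation hook: prepare a location for the table, and map the table name
     *  into the properties under the key a file-based TableSinkFactory understands. */
    static Map<String, String> createTable(Path root, String tableName,
                                           Map<String, String> properties) throws IOException {
        Path location = root.resolve(tableName);
        Files.createDirectories(location);                    // create only if absent
        Map<String, String> enriched = new HashMap<>(properties);
        enriched.put("connector.path", location.toString());  // the key CsvTableSinkFactoryBase uses
        return enriched;
    }

    /** Cleanup hook: delete the stored content of the given tables to reclaim space. */
    static void deleteTable(Path root, List<String> tableNames) throws IOException {
        for (String name : tableNames) {
            Path location = root.resolve(name);
            if (!Files.exists(location)) {
                continue;
            }
            try (Stream<Path> paths = Files.walk(location)) {
                // sort children before parents so deletion succeeds bottom-up
                for (Path p : paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList())) {
                    Files.delete(p);
                }
            }
        }
    }
}
```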
Since the cache service sits on the client side, we have to assume for now that the client has access to the external storage and is capable of creating and deleting tables. In the future, we could integrate with the Flink Driver (FLIP-40) and let the driver run the creation and cleanup hooks in the Flink cluster.
Integration with FLIP-36
The default intermediate result storage can implement IntermediateResultStorage as well, so that we have a unified interface for the default and external intermediate result storage.
There are some reserved keys used by the cache service and the TableFactoryService. Other than those reserved keys, an implementation of IntermediateResultStorage can specify its required and supported keys by implementing the requiredContext and supportedProperties methods.
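A sketch of how an implementation might declare its keys follows. The class name and the exact key strings are assumptions for illustration, not the reserved keys from this FLIP's table:

```java
import java.util.List;
import java.util.Map;

public class PropertyDeclarationSketch {

    /** Hypothetical storage backed by Parquet files. */
    static class ParquetIntermediateResultStorage {

        /** Context keys that must match for this factory to be selected. */
        public Map<String, String> requiredContext() {
            // key name is an assumption, chosen to mirror the configs prefix below
            return Map.of("intermediate-result-storage.type", "parquet");
        }

        /** Implementation-specific keys, living under the configs prefix. */
        public List<String> supportedProperties() {
            return List.of(
                "intermediate-result-storage.configs.root-path",
                "intermediate-result-storage.configs.compression");
        }
    }
}
```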
Here are the reserved keys for the IntermediateResultStorage descriptor:
Users can write code to configure the IntermediateResultStorage using a descriptor, similar to how they configure a connector. Analogous to the abstract ConnectorDescriptor class, we introduce the abstract class IntermediateResultStorageDescriptor.
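By analogy with ConnectorDescriptor, a descriptor-style API might look like the following sketch. The fluent method names and key prefixes are assumptions, not the committed API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DescriptorSketch {

    /** Abstract base turning a fluent description into key-value properties. */
    abstract static class IntermediateResultStorageDescriptor {
        protected final Map<String, String> properties = new LinkedHashMap<>();

        IntermediateResultStorageDescriptor(String type) {
            properties.put("intermediate-result-storage.type", type);
        }

        IntermediateResultStorageDescriptor property(String key, String value) {
            // implementation-specific keys go under the configs prefix
            properties.put("intermediate-result-storage.configs." + key, value);
            return this;
        }

        Map<String, String> toProperties() { return properties; }
    }

    /** A hypothetical file-system descriptor built on the base class. */
    static class FileSystemStorage extends IntermediateResultStorageDescriptor {
        FileSystemStorage() { super("filesystem"); }

        FileSystemStorage rootPath(String path) {
            property("root-path", path);
            return this;
        }
    }
}
```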
The external storage should also be configurable without modifying any code, i.e., users can use a configuration file (.yaml) to configure the IntermediateResultStorage. One example of such a configuration file could be the following.
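For illustration only, a hypothetical configuration file might look like this; the key names are assumptions based on the intermediate-result-storage.configs prefix mentioned below, not the FLIP's actual reserved keys:

```yaml
# hypothetical example; key names are assumptions
intermediate-result-storage:
  type: filesystem
  configs:
    root-path: hdfs://namenode:9000/flink/cache
```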
The cache service loads the YAML file from the classpath and parses it into the key-value properties used by the Table API. For example, the YAML above would be translated to the following:
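As a hypothetical illustration of the flattening (key names are assumptions), a YAML file that nests type and configs under intermediate-result-storage would become flat dotted keys:

```
intermediate-result-storage.type=filesystem
intermediate-result-storage.configs.root-path=hdfs://namenode:9000/flink/cache
```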
The TableFactoryService can use the fields with the prefix intermediate-result-storage.configs to configure the intermediate result storage.
It is backward compatible.
None so far