Current state: Under Discussion

Discussion thread: here

JIRA: FLINK-13570


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


FLIP-36 proposes adding cached tables to avoid duplicated computation and to provide a better user experience for interactive programming. The intermediate result storage is the backend storage where the content of a cached table is stored. FLIP-36 provides a default implementation of the intermediate result storage so that users can use the cache out of the box. The default implementation uses the ShuffleMaster/ShuffleService to persist the intermediate result and keep it available across jobs.

For more advanced usage of the cache, users may want to plug in external storage for the intermediate result. For example, users can use a table source that supports projection or filter push-down, e.g., ParquetTableSource or OrcTableSource. This may reduce scanning time if the job uses only some of the columns of the cached table.

In addition to the performance improvement, the ability to store the intermediate result in external storage is required for cross-application sharing (Phase 3).

Public Interfaces

Add a registerIntermediateResultStorage method to the TableEnvironment

	/**
	 * Register the intermediate result storage, which is used to store the content of the cached table.
	 *
	 * @param descriptor the descriptor that contains all the necessary information to discover and
	 *        configure the intermediate result storage.
	 */
	void registerIntermediateResultStorage(IntermediateResultStorageDescriptor descriptor);

Introduce New Descriptor

public abstract class IntermediateResultStorageDescriptor implements Descriptor {
	private String type;
	private Map<String, String> config = new HashMap<>();

	/** Add the key-value pair to the config. */
	public IntermediateResultStorageDescriptor withConfig(String key, String value) {...}

	/** Set the type of the intermediate result storage. */
	public IntermediateResultStorageDescriptor withType(String type) {...}

	/** Converts this descriptor into a set of intermediate result storage properties. */
	protected abstract Map<String, String> toIntermediateResultStorageProperties();
}

Introduce IntermediateResultStorage Interface

/**
 * This interface represents the backend storage where the intermediate result is stored upon caching.
 * It provides the methods the CacheManager needs to substitute a cached table with a TableSink and TableSource.
 */
public interface IntermediateResultStorage extends Configurable, TableFactory {

	/** Get the TableSourceFactory that the cache manager uses to create the TableSource that replaces the cached table. */
	TableSourceFactory getTableSourceFactory();

	/** Get the TableSinkFactory that the cache manager uses to create the TableSink when a table needs to be cached. */
	TableSinkFactory getTableSinkFactory();

	/**
	 * The cache manager gets the TableCleanUpHook and invokes its clean method to reclaim the space when
	 * some cached tables are invalidated.
	 */
	TableCleanUpHook getCleanUpHook();

	/**
	 * The cache manager gets the TableCreationHook and invokes its createTable method before creating the
	 * TableSource or TableSink.
	 */
	TableCreationHook getTableCreationHook();
}

Introduce Configurable Interface

/**
 * The Configurable interface indicates that the class can be instantiated by reflection
 * and that it takes configuration parameters to configure itself.
 */
public interface Configurable {

	/** Configure the class with the given key-value pairs. */
	void configure(Map<String, String> configs);
}

Introduce Table Creation/Cleanup Hook

/** The interface is used by the CacheManager before creating the TableSink and TableSource. */
public interface TableCreationHook {

	/**
	 * Create the physical location to store the table and return the
	 * configuration that encodes the given table name.
	 */
	Map<String, String> createTable(String tableName, Map<String, String> properties);
}

/** The interface is used by the CacheManager whenever some table caches are invalidated. */
public interface TableCleanUpHook {

	/** Delete the physical storage for tables with the given table names. */
	void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties);
}

Proposed Changes

All the required changes are in the Table API; the runtime is untouched. The workflow for loading and configuring the intermediate result storage is as follows:

  1. Get the intermediate result storage configuration in one of two ways:
    1. The user calls registerIntermediateResultStorage explicitly in the code and passes the configuration.
    2. The cache manager loads the configuration from a YAML file on the classpath.
  2. The cache manager passes the configuration to the TableFactoryService to load and configure the intermediate result storage.
  3. The TableFactoryService finds the implementation using the service loader, instantiates and configures the object, and returns it to the cache manager. The CacheManager maintains the intermediate result storage. From then on, the behavior is essentially the same as in FLIP-36, i.e., maintaining a set of cached tables and replacing cached tables with TableSink/TableSource.

Configurable IntermediateResultStorage

The key functionality of the pluggable intermediate result storage is writing tables to and reading tables from external storage. This is similar to what the TableSink and TableSource do, with one difference: the TableSink and TableSource are designed to connect to external storage that is managed by the user, while the cache service should manage the external storage itself. This leads to the following requirements:

  • Create a location for each table on the corresponding external storage if it does not already exist. Currently, only the file system meets this requirement: it creates the file to write to if the file does not exist. For other storage systems, the table sink expects the user to have already created the destination that the table will be written to. This is unacceptable for storage of intermediate results. Imagine users having to create tens of tables in a MySQL database for one Flink application and tens more for another. In some cases, the user may not even know how many tables will be cached.
  • Take a table name and map it to the corresponding configuration field of the external storage. Currently, the TableSinkFactory and TableSourceFactory take a configuration as a Map<String, String>, and different factories map the logical table name to a field with a different key. For example, CsvTableSinkFactoryBase uses “connector.path”, KafkaTableSourceSinkFactoryBase uses “connector.topic”, and JDBCTableSourceSinkFactory uses “connector.table”. This is an essential requirement because the cache service needs to create a different TableSink/TableSource for each table.
  • Clean up the physical storage. For cache storage, it is important to be able to reclaim the storage space when the cache is invalidated, which is not supported currently.

Therefore, the IntermediateResultStorage interface is introduced. Any class that implements this interface can be used by the cache service and serve as external storage for the intermediate result. The interface extends the TableFactory interface so that implementations can be discovered with the service loader, similar to how the current TableSinkFactory and TableSourceFactory work. This lets us reuse the TableFactoryService class for discovery and loading.

The TableFactoryService uses the service loader to instantiate the object with the parameterless constructor. However, a parameterless constructor cannot initialize the internal state of the object. Therefore, the IntermediateResultStorage extends the Configurable interface.

Classes that implement the Configurable interface should have a parameterless constructor, and the configure method should fully configure the object. The TableFactoryService will use the service loader to instantiate the object and should then call the configure method if the object implements the Configurable interface.
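The instantiate-then-configure contract described above can be sketched as follows. This is a minimal illustration, not the TableFactoryService implementation: it uses plain reflection instead of the service loader, Configurable is redeclared locally so the sketch compiles on its own, and InMemoryStorageSketch and ConfigurableLoaderSketch are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Local copy of the proposed interface so the sketch is self-contained.
interface Configurable {
    void configure(Map<String, String> configs);
}

// Hypothetical storage class used only for this illustration.
class InMemoryStorageSketch implements Configurable {
    final Map<String, String> settings = new HashMap<>();

    // Parameterless constructor, as required for reflective instantiation.
    public InMemoryStorageSketch() {}

    @Override
    public void configure(Map<String, String> configs) {
        settings.putAll(configs);
    }
}

class ConfigurableLoaderSketch {
    // Creates the object through its parameterless constructor, then
    // configures it if (and only if) it implements Configurable.
    static <T> T instantiate(Class<T> clazz, Map<String, String> configs) {
        try {
            T instance = clazz.getDeclaredConstructor().newInstance();
            if (instance instanceof Configurable) {
                ((Configurable) instance).configure(configs);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }
}
```

Keeping all initialization inside configure means the loader never needs to know constructor signatures, which is what allows a generic discovery mechanism to create any implementation.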

The TableCreationHook and TableCleanUpHook are functional interfaces that the cache service calls before the cache is created and after the cache is invalidated, respectively. The TableCreationHook is responsible for preparing the location that stores the content of the cached table and for mapping the table name to fields in the configuration that the TableSinkFactory/TableSourceFactory can understand. The TableCleanUpHook is responsible for deleting the content of the cached tables and reclaiming the space.
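As an illustration of the two hooks, here is a minimal file-system sketch. It rests on several assumptions that the proposal does not spell out: the storage is configured with a "cache-path" key, each table gets its own directory under that path, the location is exposed under "connector.path", and the outer map passed to clean is keyed by table name. The interfaces are redeclared locally, and FileSystemHooksSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Local copies of the proposed hook interfaces so the sketch compiles on its own.
interface TableCreationHook {
    Map<String, String> createTable(String tableName, Map<String, String> properties);
}

interface TableCleanUpHook {
    void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties);
}

class FileSystemHooksSketch implements TableCreationHook, TableCleanUpHook {

    @Override
    public Map<String, String> createTable(String tableName, Map<String, String> properties) {
        // Assumption: "cache-path" holds the root directory of the cache.
        Path tableDir = Paths.get(properties.get("cache-path"), tableName);
        try {
            Files.createDirectories(tableDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> tableProperties = new HashMap<>(properties);
        // Encode the table location under the key a file-based factory reads.
        tableProperties.put("connector.path", tableDir.toString());
        return tableProperties;
    }

    @Override
    public void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties) {
        for (String tableName : tableNames) {
            // Assumption: the outer map is keyed by table name.
            Map<String, String> tableProperties = properties.get(tableName);
            if (tableProperties == null) {
                continue;
            }
            deleteRecursively(Paths.get(tableProperties.get("connector.path")));
        }
    }

    private static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order deletes children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A storage backed by a database or a message queue would follow the same shape, creating a table or topic in createTable and returning it under that connector's own key.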

Since the cache service sits on the client side, we have to assume for now that the client has access to the external storage and is able to create and delete tables. In the future, we could integrate with the Flink Driver (FLIP-40) and let the driver run the creation and cleanup hooks in the Flink cluster.

Integration with FLIP-36

The default intermediate result storage could implement the IntermediateResultStorage interface, giving us a unified interface for the default and external intermediate result storage.

Configure IntermediateResultStorage


There are some reserved keys used by the cache service and TableFactoryService. Other than those reserved keys, an implementation of the IntermediateResultStorage can specify its required and supported keys by implementing the requiredContext and supportedProperties methods.

Here are the reserved keys for the IntermediateResultStorage descriptor:

  • intermediate-result-storage.type
  • intermediate-result-storage.config
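To illustrate how requiredContext and supportedProperties can select an implementation, here is a simplified sketch. StorageFactorySketch stands in for the two TableFactory methods named above, and the class names and the matching rule are illustrative assumptions; the actual TableFactoryService may apply additional rules.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-in for the two TableFactory methods named in the text; not the Flink interface itself.
interface StorageFactorySketch {
    Map<String, String> requiredContext();
    List<String> supportedProperties();
}

// Hypothetical file-system storage factory, used only for illustration.
class FileSystemStorageFactorySketch implements StorageFactorySketch {
    @Override
    public Map<String, String> requiredContext() {
        return Collections.singletonMap("intermediate-result-storage.type", "filesystem");
    }

    @Override
    public List<String> supportedProperties() {
        return Arrays.asList(
                "intermediate-result-storage.config.cache-path",
                "intermediate-result-storage.config.key");
    }
}

class FactoryMatcherSketch {
    // Simplified matching: every required entry must be present with the same value,
    // and every other key must be listed as supported.
    static boolean matches(StorageFactorySketch factory, Map<String, String> properties) {
        Map<String, String> required = factory.requiredContext();
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(properties.get(entry.getKey()))) {
                return false;
            }
        }
        for (String key : properties.keySet()) {
            if (!required.containsKey(key) && !factory.supportedProperties().contains(key)) {
                return false;
            }
        }
        return true;
    }
}
```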

With Code

Users can write code to configure the IntermediateResultStorage using a descriptor, similar to how they configure a connector. Analogous to the abstract ConnectorDescriptor class, we introduce the abstract class IntermediateResultStorageDescriptor.

    new FileSystemIntermediateResultStorage()
        .withConfig("cache-path", "/tmp")
        .withConfig("key", "value")
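A descriptor of this kind could turn the fluent calls into flat string properties roughly as sketched below. StorageDescriptorSketch is a hypothetical stand-in that does not implement Flink's Descriptor interface, and the key layout (the type under intermediate-result-storage.type, user entries under the intermediate-result-storage.config. prefix) is an assumption based on the reserved keys listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed IntermediateResultStorageDescriptor.
class StorageDescriptorSketch {
    static final String TYPE_KEY = "intermediate-result-storage.type";
    static final String CONFIG_PREFIX = "intermediate-result-storage.config.";

    private String type;
    private final Map<String, String> config = new HashMap<>();

    // Sets the storage type and returns this descriptor for chaining.
    StorageDescriptorSketch withType(String type) {
        this.type = type;
        return this;
    }

    // Adds one key-value pair and returns this descriptor for chaining.
    StorageDescriptorSketch withConfig(String key, String value) {
        config.put(key, value);
        return this;
    }

    // Flattens the type and the user-supplied entries into string properties.
    Map<String, String> toProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put(TYPE_KEY, type);
        config.forEach((key, value) -> properties.put(CONFIG_PREFIX + key, value));
        return properties;
    }
}
```

With this sketch, `new StorageDescriptorSketch().withType("filesystem").withConfig("cache-path", "/tmp")` yields a map containing the type key and one prefixed config key.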


To make the external storage configurable without modifying code, users can use a configuration file (.yaml) to configure the IntermediateResultStorage. An example of such a configuration file follows.

intermediate-result-storage:
  type: filesystem
  config:
    cache-path: /tmp
    key: value

The cache service would load the YAML file from the classpath and parse it into key-value properties that can be used by the Table API. For example, the YAML above is translated to the following:

intermediate-result-storage.type = filesystem
intermediate-result-storage.config.cache-path = /tmp
intermediate-result-storage.config.key = value

The TableFactoryService can use fields with the prefix intermediate-result-storage.config to configure the intermediate result storage.
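The translation from nested YAML to flat properties, and the later extraction of the storage-specific entries by prefix, can be sketched as follows. The sketch assumes that a YAML parser has already produced nested maps and that the file nests type and config under an intermediate-result-storage root; YamlFlattenSketch and its method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class YamlFlattenSketch {
    // Recursively turns nested maps into dotted keys: {a: {b: v}} becomes "a.b" -> "v".
    static Map<String, String> flatten(String prefix, Map<String, Object> node) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) entry.getValue();
                out.putAll(flatten(key, child));
            } else {
                out.put(key, String.valueOf(entry.getValue()));
            }
        }
        return out;
    }

    // Keeps only the entries under the given prefix and strips the prefix from their keys.
    static Map<String, String> stripPrefix(Map<String, String> properties, String prefix) {
        Map<String, String> out = new LinkedHashMap<>();
        properties.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), value);
            }
        });
        return out;
    }
}
```

Calling stripPrefix with "intermediate-result-storage.config." on the flattened properties gives the plain key-value pairs that could be handed to the storage's configure method.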

Migration Plan

This change is backward compatible.

Rejected Alternatives

None so far
