...
To load plugins, users have to make sure the relevant classes are already in the classpath.
Java/Scala:
// note: the following plugins will be loaded in the order they are specified
tableEnv.usePlugins(Core.INSTANCE, MyPlugin.INSTANCE, new Xxx(properties), new Yyy());
Yaml file:
plugins: # note: the following plugins will be loaded in the order they are specified
  - type: core
  - type: myplugin
  - type: xxx
    property1: value1
    property2: value2
  - type: yyy
Since it doesn’t make sense to load the same plugin multiple times, plugins don’t need to have names.
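Because plugins have no names, duplicates can be detected by plugin class alone. A minimal sketch of how usePlugins could deduplicate while preserving the user-specified order (class and method names here are hypothetical, not the final API):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PluginDedupSketch {
    public interface Plugin {}
    public static class CorePlugin implements Plugin {}
    public static class MyPlugin implements Plugin {}

    // keeps the first occurrence of each plugin class, preserving declaration order
    public static List<Plugin> dedup(List<Plugin> plugins) {
        List<Plugin> result = new ArrayList<>();
        Set<Class<?>> seen = new HashSet<>();
        for (Plugin p : plugins) {
            if (seen.add(p.getClass())) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Plugin> deduped =
            dedup(List.of(new CorePlugin(), new MyPlugin(), new CorePlugin()));
        System.out.println(deduped.size()); // prints 2 — the duplicate CorePlugin is dropped
    }
}
```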
...
The Plugin interface defines a set of metadata that a plugin can provide to Flink. It provides default implementations for all of its APIs, so an implementation only needs to override what it is able to supply.
interface Plugin {
  default Optional<FunctionDefinition> getFunctionDefinition(String name) {
    return Optional.empty();
  }
}
PluginFactory interface
PluginFactory defines a factory that descriptors use to uniquely identify a Plugin in service discovery, and to create an instance of the plugin.
interface PluginFactory extends TableFactory {
  // creates a plugin instance from descriptor properties
  Plugin createPlugin(Map<String, String> properties);
}
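A sketch of how descriptor properties could be matched against factories during service discovery. In Flink this would go through TableFactoryService / ServiceLoader; the in-memory list below stands in for the discovered factories, and the names (requiredContext, createPlugin, find) are assumptions rather than the final API:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FactoryDiscoverySketch {
    public interface Plugin {}

    public interface PluginFactory {
        // e.g. {"type": "core"}; must be contained in the descriptor's properties
        Map<String, String> requiredContext();
        Plugin createPlugin(Map<String, String> properties);
    }

    public static class CorePlugin implements Plugin {}

    public static class CorePluginFactory implements PluginFactory {
        public Map<String, String> requiredContext() { return Map.of("type", "core"); }
        public Plugin createPlugin(Map<String, String> properties) { return new CorePlugin(); }
    }

    // picks the first factory whose required context is contained in the given properties
    public static Optional<PluginFactory> find(
            List<PluginFactory> factories, Map<String, String> properties) {
        return factories.stream()
            .filter(f -> f.requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
            .findFirst();
    }

    public static void main(String[] args) {
        Optional<PluginFactory> factory =
            find(List.of(new CorePluginFactory()), Map.of("type", "core"));
        System.out.println(factory.isPresent()); // prints true
    }
}
```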
CorePlugin and CorePluginFactory
...
We currently only move built-in functions into CorePlugin.
public class CorePlugin implements Plugin {
  public static final CorePlugin INSTANCE = new CorePlugin();
}

class CorePluginFactory {
}
PluginManager
PluginManager is responsible for loading all the Plugins, managing their life cycles, and resolving plugin objects.
public class PluginManager {
  private List<Plugin> plugins;

  public void setPlugins(List<Plugin> plugins) {
    this.plugins = plugins;
  }
}
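A sketch of how PluginManager could resolve objects: plugins are consulted in the order users specified them, and the first plugin that supplies the requested function wins. Method names (resolveFunction) are assumptions here, not the final API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class PluginManagerSketch {
    public interface FunctionDefinition {}

    public interface Plugin {
        default Optional<FunctionDefinition> getFunctionDefinition(String name) {
            return Optional.empty();
        }
    }

    public static class PluginManager {
        private List<Plugin> plugins = new ArrayList<>();

        public void setPlugins(List<Plugin> plugins) {
            this.plugins = plugins;
        }

        // the first plugin in declared order that knows the function wins
        public Optional<FunctionDefinition> resolveFunction(String name) {
            for (Plugin p : plugins) {
                Optional<FunctionDefinition> def = p.getFunctionDefinition(name);
                if (def.isPresent()) {
                    return def;
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        PluginManager manager = new PluginManager();
        Plugin core = new Plugin() {
            public Optional<FunctionDefinition> getFunctionDefinition(String name) {
                return "concat".equals(name)
                    ? Optional.of(new FunctionDefinition() {})
                    : Optional.empty();
            }
        };
        manager.setPlugins(List.of(core));
        System.out.println(manager.resolveFunction("concat").isPresent()); // prints true
        System.out.println(manager.resolveFunction("unknown").isPresent()); // prints false
    }
}
```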
FunctionCatalog
FunctionCatalog will hold a reference to PluginManager to resolve built-in functions.
class FunctionCatalog implements FunctionLookup {
  public Optional<FunctionLookup.Result> lookupFunction(String name) {
    // search built-in functions in PluginManager, rather than BuiltInFunctionsDefinitions
    // resolution order depends on FLIP-57: Rework FunctionCatalog
  }
}
There have been some proposals to merge FunctionCatalog with CatalogManager, but that will not be considered in this FLIP.
...
To support numerous Hive versions, we will use the shim approach, similar to that of the existing HiveCatalog. Requiring users to explicitly specify a Hive version is necessary, since Flink-Hive data conversions differ across Hive versions.
public class HivePlugin implements Plugin {
  private final HiveShim hiveShim;

  public HivePlugin(String hiveVersion) {
    this.hiveShim = HiveShimLoader.load(hiveVersion);
  }
}
public abstract class HivePluginFactory implements PluginFactory {
}
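A sketch of the shim approach itself: one HiveShim implementation per supported Hive version, chosen from the user-specified version string. The class and method names mirror the existing HiveCatalog shims, but the versions and lookup shown here are illustrative assumptions:

```java
import java.util.Map;

public class HiveShimSketch {
    public interface HiveShim {
        String describe();
    }

    // one shim per supported Hive version (hypothetical versions for illustration)
    public static class HiveShimV221 implements HiveShim {
        public String describe() { return "shim for Hive 2.2.1"; }
    }

    public static class HiveShimV311 implements HiveShim {
        public String describe() { return "shim for Hive 3.1.1"; }
    }

    // maps an explicitly user-specified Hive version to its shim;
    // unknown versions fail fast instead of silently mis-converting data
    public static HiveShim load(String hiveVersion) {
        Map<String, HiveShim> shims = Map.of(
            "2.2.1", new HiveShimV221(),
            "3.1.1", new HiveShimV311());
        HiveShim shim = shims.get(hiveVersion);
        if (shim == null) {
            throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion);
        }
        return shim;
    }

    public static void main(String[] args) {
        System.out.println(load("2.2.1").describe()); // prints "shim for Hive 2.2.1"
    }
}
```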
Java/Scala:
tableEnv.usePlugins(CorePlugin.INSTANCE, new HivePlugin("2.2.1"));
Yaml file:
plugins:
  - type: core
  - type: hive
    hive-version: 2.2.1
Limitations and Future Work
...