...

The Connect framework offers a REST API that is used to manage the lifecycle of connectors. In most enterprises it is imperative to secure this API and to add authorization to its endpoints. We could add authentication and authorization support directly to the framework, but security requirements are so broad that it is not practical to support all of them there. Hence we must provide the ability for users to plug in resources that help achieve the required capabilities.

While security is the primary use case for this extension, it is not limited to that. Some common use cases are:

  • Build a custom Authentication filter
  • Build a custom Authorization filter
  • Complex extensions can even provide filters that rewrite/validate the connector requests to enforce additional constraints on the connector configurations

Public Interfaces

Developers need to implement only the ConnectRestExtension interface to provide an extension. Its register method receives an implementation of ConnectRestExtensionContext, whose configurable() returns an implementation of javax.ws.rs.core.Configurable. Using this, developers can register new JAX-RS resources such as filters and new endpoints.

 This is a new capability and introduces the following public interfaces

 

Code Block
interface ConnectRestExtension extends Closeable {
    void register(ConnectRestExtensionContext restPluginContext);
}

As mentioned above, even though developers are required to implement only ConnectRestExtension, they will also be using the following new public interfaces.

ConnectRestExtensionContext

This is a context interface that composes and provides access to:

  • configurable - registers JAX-RS resources
  • workerConfig - gives access to user-provided worker configs
  • clusterState - a new interface that provides some cluster state information

Code Block
interface ConnectRestExtensionContext {
    Configurable configurable();
    WorkerConfig workerConfig();
    ConnectClusterState clusterState();
}

ConnectClusterState

Provides methods to get some connector states and running connectors. 

 

Code Block
interface ConnectClusterState{
    /**
     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
     * from the current configuration.
     */
    Collection<String> connectors();
 
    /**
     * Lookup the current status of a connector.
     * @param connName name of the connector
     */
    ConnectorStateInfo connectorStatus(String connName);
 
}


This also introduces a new configuration, rest.extension.classes, that accepts a comma-separated list of REST extension implementations.
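As a rough illustration of how a runtime might turn such a comma-separated value into extension instances, the sketch below splits the value and instantiates each class reflectively. The RestExtension interface is a hypothetical stand-in for ConnectRestExtension, and the helper names (parseClassNames, instantiate) are assumptions made for this example rather than part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

final class RestExtensionConfigSketch {

    /** Hypothetical stand-in for ConnectRestExtension, kept minimal so the sketch is self-contained. */
    public interface RestExtension {
        String name();
    }

    /** Example implementation that a config value could reference by class name. */
    public static final class NoOpExtension implements RestExtension {
        public NoOpExtension() { }

        @Override
        public String name() {
            return "no-op";
        }
    }

    /** Splits a comma-separated config value into trimmed, non-empty class names. */
    static List<String> parseClassNames(String configValue) {
        List<String> names = new ArrayList<>();
        if (configValue == null) {
            return names;
        }
        for (String part : configValue.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    /** Instantiates each configured class through its public no-argument constructor. */
    static List<RestExtension> instantiate(List<String> classNames) {
        List<RestExtension> extensions = new ArrayList<>();
        for (String className : classNames) {
            try {
                Class<? extends RestExtension> type =
                        Class.forName(className).asSubclass(RestExtension.class);
                extensions.add(type.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                // Surface a misconfigured class name as a configuration error.
                throw new IllegalArgumentException("Cannot load extension " + className, e);
            }
        }
        return extensions;
    }
}
```

Failing fast on an unknown or incompatible class name is a design choice of this sketch; the proposal itself does not say how such errors would be reported.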

Proposed Changes

Plugin Interface

Users will be able to create a plugin by implementing the interface ConnectRestExtension. The key method to implement is register, which takes a ConnectRestExtensionContext. Passing a context object allows us to add new parameters in the future without changing the method signature. The Connect runtime would also provide a default implementation of the ConnectRestExtensionContext interface. One or more implementations can be configured via the configuration rest.extension.classes as a comma-separated list of class names.

 

Implementations would use the Configurable to register one or more JAX-RS resources. They can use the WorkerConfig to get any plugin-specific configuration.

Code Block
interface ConnectRestExtension extends Closeable {
    void register(ConnectRestExtensionContext restPluginContext);
}

interface ConnectRestExtensionContext {
    Configurable configurable();
    WorkerConfig workerConfig();
    ConnectClusterState clusterState();
}

class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
    private final Configurable configurable;
    private final WorkerConfig workerConfig;
    private final ConnectClusterState clusterState;

    ConnectRestExtensionContextImpl(Configurable configurable, WorkerConfig workerConfig, ConnectClusterState clusterState) {
        this.configurable = configurable;
        this.workerConfig = workerConfig;
        this.clusterState = clusterState;
    }

    @Override
    public Configurable configurable() {
        return this.configurable;
    }

    @Override
    public WorkerConfig workerConfig() {
        return this.workerConfig;
    }

    @Override
    public ConnectClusterState clusterState() {
        return this.clusterState;
    }
}

 

We will be introducing another new public API, ConnectClusterState, which will at present provide some of the read-only methods from the Herder. The change would also include a default implementation, ConnectClusterStateImpl, in the Connect runtime that delegates to the underlying Herder. This will be useful when adding new resources such as health checks and monitoring endpoints.

Code Block
interface ConnectClusterState {
    /**
     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
     * from the current configuration.
     */
    Collection<String> connectors();

    /**
     * Lookup the current status of a connector.
     * @param connName name of the connector
     */
    ConnectorStateInfo connectorStatus(String connName);

}

class ConnectClusterStateImpl implements ConnectClusterState {
    private final Herder herder;

    public ConnectClusterStateImpl(Herder herder) {
        this.herder = herder;
    }

    @Override
    public Collection<String> connectors() {
        // delegate to herder
    }

    @Override
    public ConnectorStateInfo connectorStatus(String connName) {
        // delegate to herder
    }
}
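
To make the health check use case more concrete, here is a minimal sketch of how a resource might derive a health summary from the two methods above. It does not use the real Connect types: ClusterState is a simplified stand-in for ConnectClusterState, the status is reduced to a plain string instead of ConnectorStateInfo, and the "RUNNING" state name and the map-backed implementation are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

final class HealthCheckSketch {

    /** Simplified stand-in for ConnectClusterState; status is reduced to a plain string. */
    interface ClusterState {
        Collection<String> connectors();

        String connectorStatus(String connName);
    }

    /** Returns the names of connectors whose status is not "RUNNING", in iteration order. */
    static List<String> unhealthyConnectors(ClusterState clusterState) {
        List<String> unhealthy = new ArrayList<>();
        for (String name : clusterState.connectors()) {
            if (!"RUNNING".equals(clusterState.connectorStatus(name))) {
                unhealthy.add(name);
            }
        }
        return unhealthy;
    }

    /** Builds a ClusterState backed by a fixed map, standing in for delegation to the Herder. */
    static ClusterState fromMap(Map<String, String> statuses) {
        return new ClusterState() {
            @Override
            public Collection<String> connectors() {
                return statuses.keySet();
            }

            @Override
            public String connectorStatus(String connName) {
                return statuses.get(connName);
            }
        };
    }
}
```

A health check endpoint could report the worker as healthy when unhealthyConnectors returns an empty list, though what counts as healthy is left to the extension author.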
Plugin Integration with Connect

The plugins would be registered in the RestServer.start(Herder herder) method before the default Connect resources are registered. Currently, when there are duplicate registrations, the first registration wins in Jersey, but this is an implementation detail of Jersey and Connect can't guarantee it. Hence, it is recommended that plugins don't re-register the default Connect resources, as doing so could lead to unexpected errors. The close method of each plugin would be invoked as part of stop() in the same class.
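
The ordering described here can be sketched with plain Java stand-ins. In the example below the extension context is reduced to a list of registered names, RestExtension is a hypothetical substitute for ConnectRestExtension, and the "default-connect-resources" entry is a placeholder; none of this is the actual RestServer code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class RestServerLifecycleSketch {

    /** Hypothetical stand-in for ConnectRestExtension; the context is reduced to a list of names. */
    interface RestExtension extends Closeable {
        void register(List<String> registry);
    }

    private final List<RestExtension> extensions;
    private final List<String> registry = new ArrayList<>();

    RestServerLifecycleSketch(List<RestExtension> extensions) {
        this.extensions = new ArrayList<>(extensions);
    }

    /** Lets every extension register first, then adds the default resources. */
    List<String> start() {
        for (RestExtension extension : extensions) {
            extension.register(registry);
        }
        registry.add("default-connect-resources");
        return registry;
    }

    /** Closes every extension, continuing past individual failures, and returns the failure count. */
    int stop() {
        int failures = 0;
        for (RestExtension extension : extensions) {
            try {
                extension.close();
            } catch (IOException e) {
                failures++;
            }
        }
        return failures;
    }
}
```

Continuing to close the remaining extensions after one fails is a choice made in this sketch so that a single faulty extension does not leak the resources of the others.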

The new extension class and its dependencies would need to be on the plugin path. Hence ConnectRestExtension would be defined as a new plugin to be loaded by the PluginClassLoader. The plugin would be looked up using the Service Provider mechanism instead of the Reflections scan that is used for other plugins. This avoids adding the class loading cost that scanning classes incurs today for other plugins.
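
For readers unfamiliar with the Service Provider mechanism, the following sketch shows a lookup through java.util.ServiceLoader against a given class loader. The RestExtension interface is a hypothetical stand-in for ConnectRestExtension and the discover helper is an assumption for illustration; how the Connect runtime wires this to its PluginClassLoader is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class ServiceLoaderLookupSketch {

    /** Hypothetical stand-in for ConnectRestExtension. */
    public interface RestExtension { }

    /**
     * Returns every provider the given class loader can find through a
     * META-INF/services entry named after the service interface.
     * Only the classes listed there are loaded; no classpath scan is involved.
     */
    static List<RestExtension> discover(ClassLoader pluginClassLoader) {
        List<RestExtension> found = new ArrayList<>();
        for (RestExtension extension : ServiceLoader.load(RestExtension.class, pluginClassLoader)) {
            found.add(extension);
        }
        return found;
    }
}
```

With this mechanism, a plugin jar declares its implementations in a text file under META-INF/services whose name is the fully qualified name of the service interface, one implementation class name per line. When no such file is visible to the class loader, the lookup simply yields no providers.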

Example

Consider the following example that defines a single plugin to add an authenticating filter and a health check resource.

Code Block
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;

class ExampleConnectPlugin implements ConnectRestExtension {
    @Override
    public void register(ConnectRestExtensionContext restPluginContext) {
        // register resources through the Configurable exposed by the context
        restPluginContext.configurable().register(new AuthenticationFilter(restPluginContext.workerConfig()));
        restPluginContext.configurable().register(new HealthCheckResource(restPluginContext.workerConfig(), restPluginContext.clusterState()));
    }

    @Override
    public void close() {
        // close any resources
    }
}

class AuthenticationFilter implements ContainerRequestFilter {

    public AuthenticationFilter(WorkerConfig workerConfig) {
        // set up filter
    }

    @Override
    public void filter(ContainerRequestContext requestContext) {
        // authentication logic
    }

}

@Path("/connect")
class HealthCheckResource {

    public HealthCheckResource(WorkerConfig workerConfig, ConnectClusterState clusterState) {
        // initialize resource
    }

    @GET
    @Path("/health")
    public void healthcheck() {
        // check herder health
    }
}

The plugin illustrated above can then be configured in the worker's configuration as follows:

 

Code Block
# connect-worker.properties
# configure the REST extension
rest.extension.classes=com.example.ExampleConnectPlugin

 

...