Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Connect provides a rich ecosystem of pluggable components, including Connectors, Converters, Transformations, and Predicates.
Each of these component types exposes configuration metadata through a config() method that returns a ConfigDef object describing the configuration properties the component accepts.
However, this config() method is currently defined independently across multiple interfaces and classes:
public abstract class Connector implements Versioned {
public abstract ConfigDef config();
}
public interface Converter {
default ConfigDef config() {
return new ConfigDef();
}
}
public interface HeaderConverter extends Configurable, Closeable {
ConfigDef config();
}
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
ConfigDef config();
}
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
ConfigDef config();
}
This fragmentation creates several challenges:
- Component Discovery: Tools that need to discover and introspect component configurations must search for multiple different interfaces/classes. There is no single, unified mechanism to identify all classes that provide configuration specifications.
- Code Duplication: The same method signature is duplicated across multiple interfaces.
- Extensibility: Future component types that require configuration specification must remember to implement the config() method independently, without compile-time enforcement of a common contract.
- Tooling Complexity: External tools (such as schema generators, UI builders, validation frameworks, and documentation generators) must maintain separate logic to handle each component type, increasing maintenance burden and the risk of inconsistencies.
This improvement benefits several use cases:
- Schema Generation: Tools like Debezium's schema generator need to automatically extract configuration metadata from components to generate descriptors for UI applications.
- Configuration Validation: Build-time and runtime validation tools can uniformly discover and validate configurations across all component types.
- Documentation Generation: Automated documentation tools can consistently extract and present configuration options.
- Configuration Management UIs: Web interfaces for managing Kafka Connect deployments (like Debezium Platform) need to discover available components and their configuration requirements dynamically.
Public Interfaces
This KIP introduces a new interface, ConnectPlugin, in the org.apache.kafka.connect.components package:
package org.apache.kafka.connect.components;
import org.apache.kafka.common.config.ConfigDef;
/**
* Interface for components that provide version and configuration specifications.
* This interface establishes a common contract for all Kafka Connect components
* that define a version and expose configurable properties, enabling uniform discovery and introspection
* of component configurations.
*
* <p>Components implementing this interface declare their version and configuration requirements
* through a {@link ConfigDef} object, which describes the configuration properties
* including their names, types, default values, validators, and documentation.
*
*/
public interface ConnectPlugin extends Versioned {
/**
* Returns the configuration specification for this component.
*
* <p>The returned {@link ConfigDef} object describes all configuration properties
* that this component accepts, including their types, default values, validators,
* importance levels, and documentation strings.
*
* @return the configuration definition for this component; never null
*/
ConfigDef config();
}
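As an illustration of the uniform introspection this contract enables, the following is a minimal sketch rather than actual Connect code. ConfigDef, Versioned, and ConnectPlugin are redeclared here as simplified local stand-ins so the example compiles without Kafka on the classpath (the single-argument define(String) is a simplification of the stand-in, not the real ConfigDef API), and UppercaseTransform, NoopPredicate, and PluginDescriber are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for org.apache.kafka.common.config.ConfigDef (assumption:
// only property names are tracked; the real define(...) takes more arguments).
final class ConfigDef {
    private final Set<String> names = new LinkedHashSet<>();

    ConfigDef define(String name) {
        names.add(name);
        return this;
    }

    Set<String> names() {
        return Collections.unmodifiableSet(names);
    }
}

// Stand-ins for the interfaces in org.apache.kafka.connect.components.
interface Versioned {
    String version();
}

interface ConnectPlugin extends Versioned {
    ConfigDef config();
}

// Hypothetical components, for illustration only.
final class UppercaseTransform implements ConnectPlugin {
    @Override
    public String version() { return "1.0.0"; }

    @Override
    public ConfigDef config() { return new ConfigDef().define("field.name"); }
}

final class NoopPredicate implements ConnectPlugin {
    @Override
    public String version() { return "undefined"; }

    @Override
    public ConfigDef config() { return new ConfigDef(); }
}

final class PluginDescriber {
    // One code path for every component type: no per-interface branching.
    static Map<String, Set<String>> describe(List<? extends ConnectPlugin> plugins) {
        Map<String, Set<String>> described = new LinkedHashMap<>();
        for (ConnectPlugin plugin : plugins) {
            String key = plugin.getClass().getSimpleName() + ":" + plugin.version();
            described.put(key, plugin.config().names());
        }
        return described;
    }
}
```

A tool would call PluginDescriber.describe(...) with whatever plugin instances it has discovered; the point of the sketch is only that a single loop can handle connectors, converters, transformations, and predicates alike.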
The following existing classes and interfaces will be modified to implement or extend ConnectPlugin:
public abstract class Connector implements ConnectPlugin {
/**
* Define the configuration for the connector.
* @return The ConfigDef for this connector; may not be null.
*/
@Override
public abstract ConfigDef config();
}
public interface Converter extends Closeable, ConnectPlugin {
/**
* Configuration specification for this converter.
* @return the configuration specification; may not be null
*/
@Override
default ConfigDef config() { return new ConfigDef();}
/**
* Get the version of this component.
*
* @return the version, formatted as a String. The version may not be {@code null} or empty.
*/
@Override
default String version() {
return "undefined";
}
}
public interface HeaderConverter extends Configurable, Closeable, ConnectPlugin {
/**
* Configuration specification for this set of header converters.
* @return the configuration specification; may not be null
*/
@Override
ConfigDef config();
/**
* Get the version of this component.
*
* @return the version, formatted as a String. The version may not be {@code null} or empty.
*/
@Override
default String version() {
return "undefined";
}
}
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable, ConnectPlugin {
/** Configuration specification for this transformation.
*/
@Override
ConfigDef config();
/**
* Get the version of this component.
*
* @return the version, formatted as a String. The version may not be {@code null} or empty.
*/
@Override
default String version() {
return "undefined";
}
}
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable, ConnectPlugin {
/**
* Configuration specification for this predicate.
*
* @return the configuration definition for this predicate; never null
*/
@Override
ConfigDef config();
/**
* Get the version of this component.
*
* @return the version, formatted as a String. The version may not be {@code null} or empty.
*/
@Override
default String version() {
return "undefined";
}
}
Other interfaces, such as ConnectRestExtension and ConnectorClientConfigOverridePolicy, do not currently declare a config() method. Because they already extend Configurable, they can also extend ConnectPlugin for future use, with a default config() implementation, as follows:
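package org.apache.kafka.connect.connector.policy;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.components.ConnectPlugin;
import java.util.List;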
/**
* An interface for enforcing a policy on overriding of Kafka client configs via the connector configs.
* <p>
* Common use cases are ability to provide principal per connector, <code>sasl.jaas.config</code>
* and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges.
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}.
* <p>
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the policy to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>connector.client.config.override.policy</code>, and <code>class</code> set to the
* ConnectorClientConfigOverridePolicy class name.
*/
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable, ConnectPlugin {
/**
* Workers will invoke this before configuring per-connector Kafka admin, producer, and consumer client instances
* to validate if all the overridden client configurations are allowed per the policy implementation.
* This would also be invoked during the validation of connector configs via the REST API.
* <p>
* If there are any policy violations, the connector will not be started.
*
* @param connectorClientConfigRequest an instance of {@link ConnectorClientConfigRequest} that provides the configs
* to be overridden and its context; never {@code null}
* @return list of {@link ConfigValue} instances that describe each client configuration in the request and includes an
* {@link ConfigValue#errorMessages() error} if the configuration is not allowed by the policy; never null
*/
List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest);
/**
* Configuration specification for this policy override.
*
* @return the configuration definition for this policy override; never null
*/
@Override
default ConfigDef config() { return new ConfigDef();}
}
package org.apache.kafka.connect.rest;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.ConnectPlugin;
import org.apache.kafka.connect.health.ConnectClusterState;
import java.io.Closeable;
import java.util.Map;
/**
* A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will
* be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by Connect's plugin class loading mechanism.
*
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension}.
* <p>The extension class(es) must be packaged as a plugin, including the JARs of all dependencies except those
* already provided by the Connect framework.
*
* <p>To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is
* on Connect's {@code plugin.path}, and (re)start the Connect worker.
*
* <p>When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation
* classes that are specified in the `rest.extension.classes` configuration property. Connect will then pass its configuration to each
* extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context.
*
* <p>When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
* its resources.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the extension to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>rest.extension.classes</code>, and <code>class</code> set to the ConnectRestExtension class name.
*/
public interface ConnectRestExtension extends Configurable, Closeable, ConnectPlugin {
/**
* ConnectRestExtension implementations can register custom JAX-RS resources via this method. The Connect framework
* will invoke this method after registering the default Connect resources. If the implementations attempt
* to re-register any of the Connect resources, it will be ignored and will be logged.
*
* @param restPluginContext The context provides access to JAX-RS {@link jakarta.ws.rs.core.Configurable} and {@link
* ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link
* ConnectRestExtensionContext#configurable()}
*/
void register(ConnectRestExtensionContext restPluginContext);
/**
* Configuration specification for this rest extension.
*
* @return the configuration definition for this rest extension; never null
*/
@Override
default ConfigDef config() { return new ConfigDef();}
}
Proposed Changes
Implementation Plan
1. Introduce ConnectPlugin Interface
- Add the new org.apache.kafka.connect.components.ConnectPlugin interface to the connect-api module
- Include a comprehensive JavaDoc explaining the purpose and usage
2. Update Existing Interfaces
- Modify Connector, Converter, HeaderConverter, Transformation, Predicate, ConnectorClientConfigOverridePolicy, and ConnectRestExtension to extend ConnectPlugin
- Preserve all existing method signatures and semantics
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
This change is fully backward compatible; existing plugins will work seamlessly with newer Connect runtimes:
- Source Compatibility: Existing component implementations are not required to change. They already implement the config() and version() methods, which satisfy the new interface requirement.
- Binary Compatibility: The change only adds a new interface to the type hierarchy. Existing compiled classes will continue to work without recompilation.
- Behavioral Compatibility: No changes to method signatures, return types, or semantics. All existing code will behave identically.
Forward Compatibility
Plugins compiled against newer versions of the Connect API (with ConnectPlugin) will remain compatible with older Connect runtimes under normal usage patterns.
When a plugin is compiled against the new API, its bytecode stores only direct interface relationships. For example, a class implementing Transformation records "implements Transformation" in its bytecode, but does not record that Transformation extends ConnectPlugin.
At runtime, the JVM resolves the complete interface hierarchy by loading interface classes from the runtime's classpath. This means:
- New plugin on old runtime: The JVM loads the plugin's class (which declares implements Transformation), then loads Transformation from the old runtime (which doesn't extend ConnectPlugin). The hierarchy is resolved using the old runtime's interface definition, so ConnectPlugin is never referenced.
- New plugin on new runtime: The same plugin bytecode loads Transformation from the new runtime (which extends ConnectPlugin), and the full hierarchy, including ConnectPlugin, is available for discovery.
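The difference between the interfaces a class declares directly and the hierarchy resolved at runtime can be observed with plain reflection. The sketch below uses hypothetical stand-in types (SketchPlugin in the role of ConnectPlugin, SketchTransformation in the role of Transformation) so that it runs without Kafka on the classpath; it illustrates the mechanism and is not part of the proposed change.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins mirroring the shape of the proposed hierarchy.
interface SketchPlugin { }                                // role of ConnectPlugin
interface SketchTransformation extends SketchPlugin { }   // role of Transformation
final class MyTransformation implements SketchTransformation { }

final class HierarchyInspector {
    // Interfaces named in the type's own declaration, which is all that
    // the type's class file records about its interfaces.
    static List<String> direct(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Class<?> iface : type.getInterfaces()) {
            names.add(iface.getSimpleName());
        }
        return names;
    }

    // Full interface set, found by following each loaded interface's own
    // super-interfaces and the superclass chain.
    static Set<String> all(Class<?> type) {
        Set<String> names = new LinkedHashSet<>();
        for (Class<?> iface : type.getInterfaces()) {
            names.add(iface.getSimpleName());
            names.addAll(all(iface));
        }
        if (type.getSuperclass() != null) {
            names.addAll(all(type.getSuperclass()));
        }
        return names;
    }
}
```

Here HierarchyInspector.direct(MyTransformation.class) lists only SketchTransformation, while HierarchyInspector.all(MyTransformation.class) also contains SketchPlugin, because that relationship comes from whichever definition of SketchTransformation the runtime loaded.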
When Forward Compatibility Breaks
Forward compatibility is broken only when plugin code directly references the ConnectPlugin type. Specifically:
Incompatible patterns (will fail on older runtimes):
// Field with ConnectPlugin type
private ConnectPlugin specifier;
// Method parameter with ConnectPlugin type
public MyConnector(ConnectPlugin spec) { ... }
// Local variable with explicit ConnectPlugin type
ConnectPlugin spec = this;
// instanceof checks
if (obj instanceof ConnectPlugin) { ... }
// Explicit cast to ConnectPlugin
ConnectPlugin casted = (ConnectPlugin) obj;
// Explicitly implementing ConnectPlugin
public class MyPlugin implements Transformation, ConnectPlugin { ... }
Any of these patterns causes the ConnectPlugin class to be referenced in the plugin's bytecode, which can result in a NoClassDefFoundError when the plugin is loaded or run on older runtimes that lack the ConnectPlugin class.
Compatible pattern (works on both old and new runtimes):
// Simply implement existing interfaces as before
public class MyTransformation implements Transformation<SourceRecord> {
@Override
public String version() {
return "1.0.0";
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
// ... other methods
}
This pattern never directly references ConnectPlugin in the plugin bytecode, allowing it to run on both old and new Connect runtimes.
Guidance for Plugin Developers
To maintain compatibility with older Connect runtimes while compiling against newer APIs:
- Do not explicitly implement ConnectPlugin in your plugin classes
- Do not use ConnectPlugin as a field, parameter, return, or local variable type
- Do continue implementing the standard component interfaces (Connector, Transformation, Converter, etc.) as you always have; the ConnectPlugin contract will be satisfied automatically through interface inheritance
In practice, this is unlikely to be an issue as there is no compelling reason for plugin code to directly reference ConnectPlugin; it exists purely to enable uniform component discovery by tooling and the Connect runtime.
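If code that must also run on older runtimes does need to detect the new type, one possible approach (an assumption of this sketch, not something the KIP prescribes) is to look the class up by name, so that no symbolic reference to ConnectPlugin ends up in the bytecode. OptionalTypeLookup is a hypothetical helper name.

```java
import java.util.Optional;

final class OptionalTypeLookup {
    // Fully qualified name of the interface proposed in this KIP.
    static final String CONNECT_PLUGIN =
            "org.apache.kafka.connect.components.ConnectPlugin";

    // Returns the class if the given loader can find it, otherwise empty.
    static Optional<Class<?>> find(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the type, do not run static initializers.
            return Optional.of(Class.forName(className, false, loader));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    // True only when the named type exists at runtime and obj is an instance of it.
    static boolean isInstanceOf(Object obj, String className) {
        return find(className, obj.getClass().getClassLoader())
                .map(type -> type.isInstance(obj))
                .orElse(false);
    }
}
```

For example, OptionalTypeLookup.isInstanceOf(component, OptionalTypeLookup.CONNECT_PLUGIN) simply returns false on a runtime that predates ConnectPlugin instead of failing with a linkage error.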
Migration Path
For Component Developers:
- No changes required. Existing implementations already satisfy the new interface contract.
- Optional: Add @Override annotations for clarity and compile-time checking.
For Tool Developers:
- Can immediately start using ConnectPlugin to discover components uniformly.
- Existing type-specific discovery code continues to work and can be gradually migrated.
- The new interface provides an additive capability without breaking existing approaches.
Deprecation
No deprecation is necessary. The existing config() and version() methods in individual interfaces remain valid; they gain a common ancestor.
Test Plan
The existing unit and integration tests are expected to detect any regressions, since the change preserves all existing method signatures and semantics.
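Should an explicit check of the new type hierarchy be wanted, it could be as small as the following sketch. HierarchyCheck is a hypothetical helper, not an existing Kafka test utility; it relies only on Class.isAssignableFrom.

```java
import java.util.ArrayList;
import java.util.List;

final class HierarchyCheck {
    // Returns the names of the candidate types that do NOT extend or
    // implement the given ancestor; an empty list means all of them do.
    static List<String> notExtending(Class<?> ancestor, Class<?>... candidates) {
        List<String> missing = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            if (!ancestor.isAssignableFrom(candidate)) {
                missing.add(candidate.getName());
            }
        }
        return missing;
    }
}
```

With the Connect API on the classpath, a test could assert that HierarchyCheck.notExtending(ConnectPlugin.class, Connector.class, Converter.class, HeaderConverter.class, Transformation.class, Predicate.class) returns an empty list.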
Rejected Alternatives
None