Status

Current state: Accepted, Targeting 4.0.0

Discussion thread: here and here

Vote Thread: here

JIRA: here

Motivation

Kafka Connect currently supports running only a single version of most plugins, including connectors. As the external system that a connector supports evolves over time, connectors may need to make incompatible changes between versions to keep up with new capabilities. Without the ability to run multiple versions of a connector, we would need to run the connectors in two different Connect clusters in order to support both versions simultaneously. In addition, support for multiple versions makes it possible to do two-phase upgrades of connectors, with the benefit of easier rollbacks. A Connect runtime restart will still be required to install a newer version, but existing connectors can continue to use the older version. The migration to the newer version is decoupled from the installation of the connector and will not require a cluster restart. The decoupling also makes rollbacks easier, as they no longer require a cluster rollback. Finally, multiple versions ease development toil while making changes to a connector.

Public Interfaces

Configuration

In addition to the existing "plugin class name" configurations accepted in Connector configurations ("connector.class", "key.converter", "value.converter", "header.converter", "transforms.<alias>.type", and "predicates.<alias>.type") Connect will accept the following new configurations:

  • connector.plugin.version
  • key.converter.plugin.version
  • value.converter.plugin.version
  • header.converter.plugin.version
  • transforms.<alias>.plugin.version
  • predicates.<alias>.plugin.version

Additionally, the WorkerConfig will accept the following configurations, with similar semantics:

  • key.converter.plugin.version
  • value.converter.plugin.version
  • header.converter.plugin.version

The values of these configurations will control which version of the corresponding plugin class is used for each instance. Values of this configuration take the form of Maven Range Specifications, which allow specifying soft version requirements, hard version requirements for particular versions, and complex ranges of versions. For example:

{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "connector.plugin.version": "3.8", // Use exactly version 3.8, fail the connector/task if it doesn't exist on that worker.
    // "key.converter" is not specified; default to whatever is included in the WorkerConfig
    "key.converter.plugin.version": "(,4.0)", // This value is silently ignored, because the "key.converter" is specified by the worker config.
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.plugin.version": "[3.5,)", // Require a version with support for replace.null.with.default
    "value.converter.replace.null.with.default": "false",
    "header.converter": "SimpleHeaderConverter",
    "header.converter.plugin.version": "[2.3.2,2.4),[2.4.2,2.5),[2.5.2,2.6),[2.6.1,)", // Make sure we don't hit that infinite loop bug fixed in KAFKA-10574, but keep this plugin up-to-date with the latest version.
    "transforms": "flatten-latest,flatten-old",
    "transforms.flatten-latest.type": "Flatten$Key",
    // "transforms.flatten-latest.plugin.version" is not specified; allow the latest version to be provided by the runtime.
    "transforms.flatten-old.type": "Flatten$Key",
    "transforms.flatten-old.plugin.version": "3.8.0", // Use exactly version 3.8.0, this will fail connector creation if the transformation version is not found
    "predicates": "tombstone",
    "predicates.tombstone.type": "RecordIsTombstone"
    // "predicates.tombstone.plugin.version" is not specified; allow the latest version to be used because AK is never going to break this one :)
}

Note: There is a slight deviation from the Maven range spec, which says that an unbracketed version number (such as 3.8, as opposed to [3.8]) is a soft requirement. We have instead opted to treat it as a hard requirement (i.e. 3.8 is the same as [3.8]) for ease of use, so that anyone unfamiliar with Maven versioning can use this feature to pin a specific version for their connector.
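To make this deviation concrete, here is an illustrative sketch (our own helper, not Connect's actual implementation) of normalizing a bare version into the equivalent hard requirement before range matching:

```python
def normalize_version_spec(spec: str) -> str:
    """Treat a bare version such as "3.8" as the hard requirement "[3.8]".

    Maven's range spec would treat a bare version as a soft requirement;
    Connect instead pins it to exactly that version. Anything already
    written with range syntax is passed through unchanged.
    """
    spec = spec.strip()
    if spec and spec[0] not in "[(":
        return f"[{spec}]"
    return spec
```

For example, `normalize_version_spec("3.8")` yields `"[3.8]"`, while range expressions such as `"(,4.0)"` are left untouched.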

Users can change any single version configuration at a time, and the runtime will be able to respond with the requested version. Two instances (two different connectors, two different transformations, etc) can each be loaded with a different version in parallel, without any conflicts.

If there is a functional conflict or version dependency between two components (e.g. FileStreamSinkConnector 3.8 requires Flatten$Key 3.8) their versions can be changed together.

REST API

GET /connector-plugins

This existing endpoint allows the user to find all plugins installed on the queried worker. The parameters and structure of the returned data will remain unchanged. Multiple versions of the same plugin will appear as distinct plugins, but with different versions. These versions will be sorted in increasing order, with the default version always appearing last.

[
    ...
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "3.0.0"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "3.8.0"
    },
    ...
]


GET /connector-plugins/{plugin-name}/config

This existing endpoint allows the user to retrieve the configurations of various plugins.

This endpoint will accept a new version query parameter which takes an exact version string, for example: /connector-plugins/FileStreamSinkConnector/config?version=3.8.0

The structure of the returned data will be unchanged.
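As a sketch, a client might construct the request path like this (the helper name is ours, not part of the API):

```python
from typing import Optional
from urllib.parse import urlencode

def plugin_config_path(plugin_name: str, version: Optional[str] = None) -> str:
    """Build the path for GET /connector-plugins/{plugin-name}/config.

    When a version is supplied it is attached as a query parameter;
    omitting it keeps the existing behavior (latest installed version).
    """
    path = f"/connector-plugins/{plugin_name}/config"
    if version is not None:
        path += "?" + urlencode({"version": version})
    return path
```

`plugin_config_path("FileStreamSinkConnector", "3.8.0")` produces the example path shown above.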

GET /connectors/{connector-name}/status

This existing endpoint includes information about running connectors and tasks. It will be changed to include version information for tasks and connectors. This will aid in easy discovery and diagnosis of the versions that connectors and tasks are running.

{
    "name": "file-sink-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "fakehost:8083",
        "version": "1.1.1"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "fakehost:8083",
            "version": "1.1.1"
        },
        {
            "id": 1,
            "state": "FAILED",
            "worker_id": "fakehost:8083",
            "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n",
            "version": "1.1.1"
        }
    ]
}

GET /connectors/{connector-name}/tasks/{task-id}/status

Similar to the overall status endpoint, the per-task endpoint will be changed to include the running version. For example:

{
    "id": 0,
    "state": "RUNNING",
    "worker_id": "fakehost:8083",
    "version": "1.1.1"
}


Metrics

New constant-value metrics will be added for monitoring the actual distribution of plugin versions, similar to the existing metrics for the connector, under the existing group kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}":

Metric Name               Description
connector-class           The name of the connector class.
connector-type            The type of the connector. One of 'source' or 'sink'.
connector-version         The version of the connector class, as reported by the connector.
key-converter-class       The fully qualified class name from key.converter.
key-converter-version     The version instantiated for key.converter; may be undefined.
value-converter-class     The fully qualified class name from value.converter.
value-converter-version   The version instantiated for value.converter; may be undefined.
header-converter-class    The fully qualified class name from header.converter.
header-converter-version  The version instantiated for header.converter; may be undefined.

Additionally, for each transformation, in a new group: kafka.connect:type=connector-transform-metrics,connector="{connector}",task="{task}",transform="{alias}"

Metric Name        Description
transform-class    The name of the transformation class.
transform-version  The version of the transformation class.

Additionally, for each predicate, in a new group: kafka.connect:type=connector-predicate-metrics,connector="{connector}",task="{task}",predicate="{alias}"

Metric Name        Description
predicate-class    The name of the predicate class.
predicate-version  The version of the predicate class.


Proposed Changes

Worker Startup

Because the WorkerConfig accepts .version properties, it is possible that these properties are invalid during startup. If these configurations are invalid, worker startup will proceed normally, ignoring those errors. Connectors which inherit these worker-level configurations may then fail because of configurations they did not specify themselves but which affect their behavior.

Choosing Versions when instantiating plugins

Currently if a plugin needs to be instantiated and multiple versions are present, the plugins are sorted according to the Maven Version Order, and the latest overall version is used. Plugins without a version (that do not implement the Versioned interface, or fail to return a valid version) are considered earlier versions than all implementations that return valid versions. Plugins for which Versioned is optional (Converter, HeaderConverter, Transformation, and Predicate) are encouraged to implement the Versioned interface, which has been present since AK 2.0.0.

  1. If a .version property is left unset, the behavior is the same as the existing behavior, and the latest overall version is used. This is in order to be backwards compatible, and allow non-versioned configurations to continue following the latest versions without any change.
  2. If a .version property contains a soft requirement, and that exact version is installed, that version will be used. If the soft version requirement isn't found, then use the latest overall version, as if the soft requirement wasn't specified.
  3. If a .version property contains a hard requirement, select the latest installed version which satisfies the requirement. If no installed version matches the hard version requirement, fail the connector or task.

The ".version" configurations will be stripped from the configuration before configuring the corresponding plugin. This is because the ".version" key is within the configuration namespace assigned to the plugins. This will impact all plugins that already accept "version" as a configuration key.

This configuration is re-evaluated each time the connector or task is assigned to a new worker, so a configuration specifying a range of versions may run on multiple versions simultaneously across different workers if workers have differing plugin installations. Scheduling and rebalances will be version-unaware, so a connector may be scheduled to a worker without any of the requested versions present and may become FAILED. (see Rejected Alternatives)

Validation

The Validation endpoint returns ConfigInfos containing errors, default values, and recommended values for properties. Errors when finding plugins will be attributed as follows:

  1. If the .version property is left empty, or if there are no installed plugins with a matching name, errors will only be attributed to the plugin class property, not to the .version property.
  2. If a plugin class name is valid (at least one version is installed) but the non-empty .version property doesn't include any of the installed versions, then the error will be attributed to both the .version property and the plugin class property.

The plugin class name will include recommended values for plugin names when possible:

  • The connector.class has some legacy behavior which prevents this, see the Rejected Alternatives.
  • This is already implemented for transformations and predicates.
  • New recommenders will be added for "key.converter", "value.converter", and "header.converter" which recommend all of the available classes.

And each .version config will have dynamically generated defaults and recommended values:

  • The default value which is the latest overall installed version, or null if the plugin class name is invalid. This is just the bare version, without any version range syntax (e.g. 1.0.0 , not [1.0.0,) or [1.0.0] ) and would be interpreted as a soft requirement if made an explicit value.
  • The recommended values will include all installed versions, sorted with the latest version first, or be empty if the plugin class name is invalid.

Users can use these recommenders to discover the valid plugin classes and versions, without requiring an earlier call to GET /connector-plugins?connectorsOnly=false. Users can also read the defaultValues and insert them explicitly into their configuration as hard requirements in order to pin the exact version for stability through an upgrade.
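The dynamically generated values described above can be sketched as follows (the helper name and the simplified numeric ordering are ours, not Connect's):

```python
from typing import List, Optional, Tuple

def version_config_info(installed: List[str]) -> Tuple[Optional[str], List[str]]:
    """Default and recommended values for a .version property.

    Returns (default, recommended): the bare latest installed version
    (no range syntax) and all installed versions sorted latest-first,
    or (None, []) when the plugin class name itself is invalid.
    """
    if not installed:
        return None, []
    ordered = sorted(installed,
                     key=lambda v: tuple(int(p) for p in v.split(".")),
                     reverse=True)
    return ordered[0], ordered
```

Note that a numeric ordering correctly places 3.10.0 after 3.8.0, where a plain lexicographic sort would not.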

Best Practices for Operators

We will augment the existing documentation to include some of the following advice for operators:

Plugin loading isolation, introduced in KIP-146 - Classloading Isolation in Connect, uses a different classloader for each plugin listed under the plugin path. The plugin loading mechanism identifies each subdirectory under a plugin.path directory listing as a separate plugin. Any connector jars and associated dependencies under the subdirectory are isolated from other connectors and their dependencies.

Operators must install distinct versions of each plugin in distinct sub-directories of the plugin.path. For example, suppose plugin.path=/opt/plugins/blue,/opt/plugins/green is specified in the worker config, and the directories are laid out like this:

opt/
  plugins/
    blue/
      foo-connector-1.8/
        foo-connector-1.8.jar 
        foo-dependencies-1.0.jar
      foo-connector-1.9/
        foo-connector-1.9.jar 
        foo-dependencies-1.1.jar
      bar-connector/
        bar-connector-1.0.jar
    green/
      foo-connector-2.0/
        foo-connector-2.0.jar
      bar-connector/
        bar-connector-1.1.jar // BAD, not isolated from 1.2 version
        bar-connector-1.2.jar // BAD, not isolated from 1.1 version 

The foo connector is installed correctly, and versions 1.8, 1.9, and 2.0 are all available for use in the REST API. If a worker is restarted with /opt/plugins/blue removed, versions 1.8 and 1.9 will no longer be available. Plugins with hard requirements such as (,2.0) will fail on this worker after this change.

Operators can verify the safety of such an upgrade by inspecting the version metrics exported by the worker. This allows for operators to manage their users' upgrades, while not inspecting logs or handling actual user configurations.

The bar connector is not installed correctly, and the behavior is undefined. The bar-connector/ directory is considered a single plugin, and having two versions of the same class present within this directory causes conflicts. The same subdirectory can be present in a different path (/opt/plugins/blue) without conflicting with the first path.
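The isolation rule can be sketched as follows: each immediate subdirectory of every plugin.path entry is one isolated plugin location. This is a simplification of the real scanner, which also handles flat jars and class-path plugins.

```python
import os
from typing import List

def plugin_locations(plugin_path: str) -> List[str]:
    """List isolated plugin locations for a comma-separated plugin.path.

    Jars inside one subdirectory share a classloader, which is why two
    versions of the same plugin must live in different subdirectories.
    """
    locations = []
    for entry in plugin_path.split(","):
        entry = entry.strip()
        if not os.path.isdir(entry):
            continue
        for name in sorted(os.listdir(entry)):
            child = os.path.join(entry, name)
            if os.path.isdir(child):
                locations.append(child)
    return locations
```

In the layout above, foo-connector-1.8/ and foo-connector-1.9/ are separate locations and thus separate classloaders, while both bar-connector jars under green/ share one.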

It is also recommended to use plugin.discovery and the connect-plugin-path script when managing a very large plugin.path, as additional installed plugin versions will slow down the legacy plugin discovery mechanisms.

Heterogeneous configurations, where a Connect cluster runs with different sets of plugins installed on different nodes, are highly discouraged. Currently this can lead to inconsistent versions of a connector and its tasks running on different nodes. With this feature, no explicit failures should surface for existing deployments; however, connectors and tasks might fail down the line when version requirements are introduced in the configs.

Compatibility, Deprecation, and Migration Plan

Any existing connector without version information in its configuration will continue to default to using the latest version available in the plugin path. Any cluster update with a later version of the connector installed will result in the connector also being updated to the latest version. This preserves the existing behaviour.

A connector can be updated to assign it a particular version, and from that point on the connector will always run with the assigned version (provided the version is present), even across cluster upgrades. However, cluster downgrades to older Connect runtimes will end up using the older version of the connector. Since assigning a version to a connector requires an explicit opt-in by the user through a connector update, this limitation is easily evident.

Test Plan

This feature will be unit tested by making use of the existing TestPlugins infrastructure, with new plugins with specified versions.

Rejected Alternatives

Specify versions for all pluggable interfaces

We are not adding the following configurations:

  • connector.client.config.override.policy.version
  • rest.extension.classes.<index>.version
  • config.providers.<alias>.version

Because these plugins are only initialized once at worker startup, it doesn't make sense to support multiple versions of these concurrently. These plugins are also not commonly packaged with other plugins (connectors, transforms, etc.), so operators can intentionally choose to install only one copy of the override policy, REST extension, and config provider plugins. When multiple copies of one of these plugins are installed, the current behavior of choosing the latest version will be preserved.

Use exact versions only, with no ranges

If we did not accept version ranges in the initial version, it would be difficult to retroactively add later.

Ranges themselves are very useful, as they allow demos and documentation to contain sanity checks without requiring the reader to install particular versions. There are times when a user may know ahead of time that they want to auto-upgrade to the latest minor/patch version while manually upgrading to the next major version. If we only supported exact versions, users would need to be involved in every version upgrade, not just major ones.

Ranges can also be used to ensure that a silently downgraded plugin cannot reintroduce a known bug. If a bug in a particular version is noticed, that version can be excluded by the version configuration, and a downgrade to that version would immediately cause the connector to become FAILED.

Using an alternative version sorting/preference scheme

The Maven version ordering is already included in Connect, but is only commonly exercised for conflict resolution. Changing to a different version parsing mechanism has a risk of breaking backwards compatibility for existing conflict resolution.

Adding new REST API endpoints

Instead of accepting version via a query parameter in GET /connector-plugins/{plugin-name}/config?version={connector-version}, we could add new endpoints:

  • GET /connector-plugins/{plugin-name}  which is like GET /connector-plugins  but returns PluginDescs that are filtered by plugin name
  • GET /connector-plugins/{plugin-name}/{plugin-version}/config  which fills the role of GET /connector-plugins/{plugin-name}/config  but for a specific plugin version

Because it is sometimes desirable to auto-upgrade plugins to the latest version, it isn't reasonable to deprecate or remove the old endpoint. Rather than duplicate the endpoint, it makes sense to keep the existing endpoint and add a query parameter. 

Accepting a version range instead of exact version in /config 

HTTP query parameters must be percent-encoded for any special characters, such as those used in the version range specification: ()[],. Users would not be able to trivially copy a range specified in a configuration into the REST API, as it would be encoded incorrectly, so it does not make sense for this endpoint to accept the full range syntax used in configurations.
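The encoding issue is easy to demonstrate (a Python sketch):

```python
from urllib.parse import quote

# Range syntax characters are not safe in a URL query string, so a range
# copied verbatim into the endpoint would need to be percent-encoded.
print(quote("[3.5,)", safe=""))  # %5B3.5%2C%29
```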

By accepting full version identifiers, we align this endpoint with the GET /connector-plugins endpoint which returns plain version strings which can be immediately used in a follow-up query for the specific version.

Provide recommenders and inline errors for connector.class

Here is the legacy behavior, and how it would need to change to provide better feedback:

  • Currently validating an empty connector.class has status 400, and an invalid connector.class has status 500. These cases would need to be changed to 200 OK, in order to provide recommended values, and errors in the returned ConfigInfos.
  • Currently POST or PUT an empty connector.class has status 400. The return code would stay the same, but the error message will change from "Connector config {} contains no connector type" to "Connector configuration is invalid and contains the following N errors" like other validation errors.
  • Currently POST or PUT an invalid connector.class has status 500. This would be unconditionally changed to status 400, and the error message will change from "Failed to find any class that implements Connector and which name matches ..." to "Connector configuration is invalid and contains the following N errors" like other validation errors.

These changes are backwards-incompatible, and it is unclear how many clients of the REST API would handle the change gracefully. Rather than increase the scope of this KIP, this can be addressed separately and the connector.class/connector.plugin.version validation and recommendation can improve with it.

This is currently tracked via KAFKA-13756.

Provide version deprecation and removal mechanisms

This feature provides the mechanisms to allow some connectors to stay on older versions while still deploying newer versions, which also permits some connectors to lag significantly behind the latest version. This is harmful for the operators, as they must support many different versions of the same plugins, and potentially very old plugins.

It would be natural for Connect to provide some mechanism for operators maintaining the plugin.path to mark certain versions as deprecated, and have those deprecations propagated to the REST API, and ultimately users configuring connectors. We may choose to add this in the future, but should not consider it a requirement for first allowing multi-version plugins.

In the intervening time, operators can continue to communicate with their users via other means to discuss new version availability and old version deprecation and removal.

Not adding diagnostic Metrics

The metrics feature is critical for allowing operators to deprecate and remove versions from their workers in a multi-version cluster, and provides convenient monitoring as a side-effect. Without the described metrics, an operator would need to inspect the connector configurations or startup logs in order to determine if a particular version is in use, both of which aren't reliable, and could mistake a plugin as not in-use. If the operator removed an in-use plugin, connectors could incur downtime.

Add version-aware scheduling, and intentional heterogeneous installation support

This is a rather complex feature, and would require adding a new connect.protocol and rebalance algorithm. This is certainly possible, but shouldn't be a prerequisite for multi-version plugins.

