Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This KIP is an extension of KIP-883: Add isDeleted flag when stopping a connector, and focuses on providing a flag indicating that a connector has been deleted when the task is stopped. This flag can then be used by the Task implementation to perform any additional cleanup steps if needed. This KIP is related to (and supersedes) KIP-419: Safely notify Kafka Connect SourceTask is stopped.
As connectors interact with external systems, they sometimes need to provision external resources. Imagine a sink connector that creates new queues in a messaging system before writing messages to them, or a source connector that activates an account before polling a source system, or any other use case that requires extra setup. A more concrete example (and one that concerns us in particular) is a source connector that audits database changes by creating an "audit" table and setting up database triggers to capture row-level updates into the audit table.
In cases like these we might want to clean up the resources when the connector that provisioned them is deleted. There can be many reasons why cleanup is desirable: to save costs (e.g. the external system charges per active account), to free compute resources (triggers writing to an audit table that will no longer be polled should be removed, and so should the audit table itself), and many others.
Add an overload, void stop(boolean connectorDeleted), to the Task public API, with a default implementation that calls the existing void stop() method. This new method can then be overridden by connectors that need to take additional steps as part of the deletion process.
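As a rough illustration of the proposed API change, the sketch below uses a simplified stand-in for the Task interface rather than the real Kafka Connect type. The LegacyTask and AuditTask classes and the recorded event names are hypothetical and exist only to show how the default implementation keeps existing tasks working while letting new ones react to deletion.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the Connect Task interface (assumption: the real
// interface declares further methods that are omitted here).
interface Task {
    // Existing method, called whenever the task is stopped.
    void stop();

    // Proposed overload. It defaults to the existing stop(), so tasks that
    // do not override it behave exactly as before.
    default void stop(boolean connectorDeleted) {
        stop();
    }
}

// Hypothetical existing task that knows nothing about the new overload.
class LegacyTask implements Task {
    final List<String> events = new ArrayList<>();

    @Override
    public void stop() {
        events.add("stop");
    }
}

// Hypothetical task that removes its audit table and triggers on deletion.
class AuditTask implements Task {
    final List<String> events = new ArrayList<>();

    @Override
    public void stop() {
        events.add("stop");
    }

    @Override
    public void stop(boolean connectorDeleted) {
        stop(); // regular shutdown first
        if (connectorDeleted) {
            // Placeholder for dropping triggers and the audit table.
            events.add("drop-audit-resources");
        }
    }
}
```

Calling stop(true) on LegacyTask only runs its existing stop() logic, whereas AuditTask performs the extra cleanup step only when the flag is true.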
Both StandaloneHerder and DistributedHerder invoke methods from the Worker class to start/stop the tasks. In the case of the StandaloneHerder, the tasks will be stopped with the connectorDeleted flag set to true as part of the StandaloneHerder#deleteConnectorConfig(...) method. In the case of the DistributedHerder, the connectorDeleted flag will be computed during the RebalanceListener#onRevoked(...) callback, by checking whether the tasks being revoked belong to a connector that has been deleted (that is, the connector configuration no longer exists in the ClusterConfigState store).
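The check described for the distributed case could look roughly like the sketch below. It is not the actual herder code: TaskId and RevocationSketch are hypothetical stand-ins, and a plain set of connector names plays the role of the ClusterConfigState store.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a task identifier (connector name plus task index).
final class TaskId {
    final String connector;
    final int task;

    TaskId(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TaskId)) return false;
        TaskId other = (TaskId) o;
        return task == other.task && connector.equals(other.connector);
    }

    @Override
    public int hashCode() {
        return Objects.hash(connector, task);
    }
}

class RevocationSketch {
    // For each revoked task, report whether its connector has been deleted,
    // i.e. whether the connector's configuration is no longer present.
    // Assumption: configuredConnectors stands in for the config state store.
    static Map<TaskId, Boolean> connectorDeletedFlags(
            Set<String> configuredConnectors, Collection<TaskId> revokedTasks) {
        Map<TaskId, Boolean> flags = new LinkedHashMap<>();
        for (TaskId id : revokedTasks) {
            flags.put(id, !configuredConnectors.contains(id.connector));
        }
        return flags;
    }
}
```

A task revoked during an ordinary rebalance still has its connector configuration present, so its flag stays false. Only tasks whose connector configuration is gone are stopped with the flag set to true.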
Compatibility, Deprecation, and Migration Plan
The proposed change is fully backward-compatible with existing Kafka Connect releases. The new method added to the public interface comes with a default implementation, so existing connectors only need to override it if they require the extra cleanup.
Integration tests will be added to make sure that the new flag is used when stopping a task for a deleted connector.
Delete provisioned resources out-of-band
In theory, users can monitor Kafka Connect configuration topics to determine if/when a connector has been deleted. Reacting to this event outside of the connector's context is probably not very useful, as there might not be enough contextual information to perform any meaningful action. There is some value in keeping these concerns encapsulated within the connector framework itself.