Status

Current state: Under Discussion

Discussion thread: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress

JIRA: KAFKA-8147

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When suppressing record updates in a KTable, an internal changelog topic is created. The configuration for this topic cannot be set through the Streams DSL; instead, a user has to run an external tool to override the defaults.

Other parts of the Streams API that create internal topics do allow the user to set their configuration. When aggregating a stream, for example, an instance of Materialized is passed to the aggregate method, and that instance holds a topicConfig of type Map<String, String>.

Proposed Changes

We add two methods to Suppressed.BufferConfig:

  • withLoggingEnabled: (default) enables logging and allows for configuration of the changelog topic
  • withLoggingDisabled: disables logging to the changelog topic

If neither method is called, the behavior is the same as in the current implementation, i.e., records are written to a changelog topic with the default settings.

Public Interfaces

The public interface of BufferConfig will change to include the previously mentioned methods.

Methods to add to Suppressed.BufferConfig
import java.util.Map;

public interface Suppressed<K> {

    interface BufferConfig<BC extends BufferConfig<BC>> {
        /**
         * Disable change logging for the suppressed KTable.
         *
         * @return itself
         */
        BC withLoggingDisabled();

        /**
         * Indicates that a changelog should be created for the suppressed KTable.
         * The changelog will be created with the provided configs.
         *
         * @param config Configs that should be applied to the changelog. Note: Any unrecognized
         * configs will be ignored.
         * @return itself
         */
        BC withLoggingEnabled(final Map<String, String> config);
    }
}
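
To make the intended semantics concrete, here is a minimal, self-contained sketch of how a buffer config could carry the logging choice. It is an illustration only and not the Kafka Streams implementation: the class BufferConfigSketch, its fields, and its accessors are hypothetical names, and unbounded() merely mirrors the name of the existing factory. The keys in the map are ordinary Kafka topic-level configs, chosen here as examples.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a BufferConfig implementation (assumption, not Kafka code).
final class BufferConfigSketch {
    private final boolean loggingEnabled;
    private final Map<String, String> logConfig;

    private BufferConfigSketch(final boolean loggingEnabled, final Map<String, String> logConfig) {
        this.loggingEnabled = loggingEnabled;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.logConfig = Collections.unmodifiableMap(new HashMap<>(logConfig));
    }

    // Default state: logging enabled, no topic config overrides.
    static BufferConfigSketch unbounded() {
        return new BufferConfigSketch(true, Collections.emptyMap());
    }

    // Enables logging and records the changelog topic overrides.
    BufferConfigSketch withLoggingEnabled(final Map<String, String> config) {
        return new BufferConfigSketch(true, config);
    }

    // Disables logging; any previously supplied overrides are dropped.
    BufferConfigSketch withLoggingDisabled() {
        return new BufferConfigSketch(false, Collections.emptyMap());
    }

    boolean isLoggingEnabled() {
        return loggingEnabled;
    }

    Map<String, String> logConfig() {
        return logConfig;
    }

    public static void main(final String[] args) {
        final Map<String, String> overrides = new HashMap<>();
        overrides.put("min.insync.replicas", "2");
        overrides.put("retention.ms", "86400000");

        final BufferConfigSketch defaults = BufferConfigSketch.unbounded();
        final BufferConfigSketch configured = defaults.withLoggingEnabled(overrides);
        final BufferConfigSketch disabled = configured.withLoggingDisabled();

        System.out.println(defaults.isLoggingEnabled());                       // prints "true"
        System.out.println(configured.logConfig().get("min.insync.replicas")); // prints "2"
        System.out.println(disabled.isLoggingEnabled());                       // prints "false"
    }
}
```

In this sketch each call returns a new immutable object, so a shared config instance cannot be changed by another caller. Whether the actual implementation should do the same is a design choice outside the scope of this proposal.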


Compatibility, Deprecation, and Migration Plan

There is no impact on existing applications, as this change does not remove or alter existing behavior.

Rejected Alternatives

N/A
