Status

Current state: Adopted

Discussion thread: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress

JIRA: KAFKA-8147

Implemented: 2.6.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When suppressing record updates in a KTable, an internal changelog topic is created. The suppress operator sends a record to the topic when it is received from upstream, and sends a tombstone after it is emitted. Thus, data in the buffer is much shorter lived than data in other KTables, and the buffer changelog may also be much more compactable than KTable changelogs. Currently, the configuration for this topic cannot be set using the Streams DSL; instead, a user has to run an external tool to override the defaults.
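The write pattern described above can be illustrated with a small, self-contained model. This is only a sketch: the class and method names (BufferChangelogSketch, receive, emit, compact) are hypothetical and not part of Kafka Streams, and the compaction shown is a simplified stand-in for the broker's log cleaner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a suppression buffer and its changelog.
// It is not Kafka code; it only mirrors the write pattern described in the text.
class BufferChangelogSketch {

    // One changelog record; a null value represents a tombstone.
    static final class LogEntry {
        final String key;
        final String value;

        LogEntry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> buffer = new LinkedHashMap<>();
    private final List<LogEntry> changelog = new ArrayList<>();

    // An update arrives from upstream: buffer it and write it to the changelog.
    void receive(String key, String value) {
        buffer.put(key, value);
        changelog.add(new LogEntry(key, value));
    }

    // The buffered record is emitted downstream: remove it and write a tombstone.
    String emit(String key) {
        String value = buffer.remove(key);
        changelog.add(new LogEntry(key, null));
        return value;
    }

    List<LogEntry> changelog() {
        return changelog;
    }

    // Simplified compaction: keep only the latest value per key and drop keys
    // whose latest entry is a tombstone. (A real broker retains tombstones for
    // a configurable period before removing them.)
    static Map<String, String> compact(List<LogEntry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogEntry entry : log) {
            if (entry.value == null) {
                latest.remove(entry.key);
            } else {
                latest.put(entry.key, entry.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        BufferChangelogSketch sketch = new BufferChangelogSketch();
        sketch.receive("a", "1");
        sketch.receive("a", "2");
        sketch.receive("b", "1");
        sketch.emit("a");
        sketch.emit("b");
        System.out.println("changelog entries written: " + sketch.changelog().size());
        System.out.println("entries left after compaction: " + compact(sketch.changelog()).size());
    }
}
```

Once every buffered key has been emitted, nothing in the changelog needs to be retained. This is why compaction-related settings tuned for this topic can differ from those of an ordinary KTable changelog.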

Other parts of the Streams API that create internal topics do allow users to set the topic configuration. When aggregating a stream, for example, the aggregate method takes an instance of Materialized, which holds a topicConfig of type Map<String, String>.

Proposed Changes

We add two methods to Suppressed.BufferConfig.

  • withLoggingEnabled: (default) enables logging and allows for configuration of the changelog topic
  • withLoggingDisabled: disables logging to the changelog topic

When none of the above methods is called, the behavior will be no different from the current implementation, i.e., records will be written to a changelog topic with the default settings.
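As a sketch of how the proposed withLoggingEnabled method might be used from the DSL, the example below passes topic-level settings to the suppression buffer's changelog. It assumes kafka-streams 2.6.0 or later on the classpath. The topic names, the five-minute time limit, the record limit, and the chosen config values are placeholders picked for illustration, not recommendations.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

// Illustrative only: class name, topic names, and values are assumptions.
class SuppressChangelogConfigExample {

    static Topology buildTopology() {
        // Topic-level overrides for the suppression buffer's changelog topic.
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1");
        changelogConfig.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");

        StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.Long()))
               // Hold back updates for up to five minutes per key.
               .suppress(Suppressed.untilTimeLimit(
                       Duration.ofMinutes(5),
                       BufferConfig.maxRecords(1000L).withLoggingEnabled(changelogConfig)))
               .toStream()
               .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Building and describing the topology does not require a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

Replacing withLoggingEnabled(changelogConfig) with withLoggingDisabled() would skip the changelog topic entirely, at the cost of losing buffered records if the instance fails.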

Public Interfaces

The public interface of BufferConfig will change to include the previously mentioned methods.

Methods to add to Suppressed.BufferConfig
public interface Suppressed<K> {

    interface BufferConfig<BC extends BufferConfig<BC>> {
        /**
         * Disable the changelog for the buffer store built by this suppression.
         * This will turn off fault-tolerance for the buffer.
         * By default the changelog is enabled.
         * @return this
         */
        BC withLoggingDisabled();

        /**
         * Indicates that a changelog topic should be created containing the currently suppressed 
         * records. Due to the short-lived nature of records in this topic it is likely more
         * compactable than changelog topics for KTables.
         *
         * @param config Configs that should be applied to the changelog. Note: Any unrecognized
         * configs will be ignored.
         * @return this
         */
        BC withLoggingEnabled(final Map<String, String> config);
    }
}
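To make the intended semantics of the two methods concrete, here is a minimal stand-alone model. ToyBufferConfig is a hypothetical stand-in written for this illustration. It is not Kafka's implementation, and it assumes (as one reasonable design) that each call returns a new immutable config rather than mutating the receiver.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a buffer config; not part of Kafka Streams.
final class ToyBufferConfig {
    private final boolean loggingEnabled;
    private final Map<String, String> logConfig;

    private ToyBufferConfig(boolean loggingEnabled, Map<String, String> logConfig) {
        this.loggingEnabled = loggingEnabled;
        this.logConfig = logConfig;
    }

    // Default behavior: logging enabled, changelog topic uses default settings.
    static ToyBufferConfig defaults() {
        return new ToyBufferConfig(true, Collections.<String, String>emptyMap());
    }

    // Turns the changelog off; any previously supplied topic config is dropped.
    ToyBufferConfig withLoggingDisabled() {
        return new ToyBufferConfig(false, Collections.<String, String>emptyMap());
    }

    // Keeps the changelog on and records topic-level overrides for it.
    ToyBufferConfig withLoggingEnabled(final Map<String, String> config) {
        return new ToyBufferConfig(true, Collections.unmodifiableMap(new HashMap<>(config)));
    }

    boolean isLoggingEnabled() {
        return loggingEnabled;
    }

    Map<String, String> logConfig() {
        return logConfig;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("segment.ms", "600000");

        ToyBufferConfig configured = ToyBufferConfig.defaults().withLoggingEnabled(overrides);
        ToyBufferConfig disabled = configured.withLoggingDisabled();

        System.out.println("configured: " + configured.isLoggingEnabled() + " " + configured.logConfig());
        System.out.println("disabled: " + disabled.isLoggingEnabled() + " " + disabled.logConfig());
    }
}
```

Copying the supplied map keeps later changes to the caller's map from leaking into the config, which is one way to make the builder-style chaining safe to reuse.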


Compatibility, Deprecation, and Migration Plan

There is no impact on existing applications, as this change does not remove or alter existing behavior.

Rejected Alternatives

N/A
