Status

Current state: "Accepted"

Discussion thread: here 

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well the default, and in-place overwrites in the code. Eg, users can specify ts-extractors on a per-topic basis.

However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus, `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too.

Thus, we should consider to deprecate the two existing configs names and add them back w/o the `default.` prefix.

Public Interfaces

Target release 4.0

Changes to org.apache.kafka.streams.StreamsConfig

  1. The below configs in org.apache.kafka.streams.StreamsConfig will be deprecated
  • default.deserialization.exception.handler
  • default.production.exception.handler

      2. Configs to be added to org.apache.kafka.streams.StreamsConfig

  • deserialization.exception.handler
  • production.exception.handler

      3. Adding new public method to get the new config for deserialization exception handler deserializationExceptionHandler()

      4. Adding new public method to get the new config for production exception handler productionExceptionHandler()

      5. Marking defaultDeserializationExceptionHandler() as deprecated

Proposed Changes

As such there are no functional changes at all. The main changes look like below.

         

Main changes
		@Deprecated
        public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
        
        @Deprecated
        public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";

        /** {@code default.production.exception.handler} */
        @SuppressWarnings("WeakerAccess")
        @Deprecated
        public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
        
        private static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";

         /** {@code default.production.exception.handler} */
        @SuppressWarnings("WeakerAccess") 
        public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler";

        /** {@code default.production.exception.handler} */
        @SuppressWarnings("WeakerAccess")
        public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "production.exception.handler";

The below changes arise when we deprecate above 2 configs like above.

  1. Deprecate the below two configs in org.apache.kafka.streams.StreamsConfig
    1. default.deserialization.exception.handler
    2. default.production.exception.handler
    3. and deprecate DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
  2. Configs to be added to org.apache.kafka.streams.StreamsConfig
    1. deserialization.exception.handler
    2. production.exception.handler
    3. and add DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
  3. Update all the relevant dependency classes, so that both old and new configs are supported
    1. Update ConfigDef in TopologyConfig and code blocks like isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) with new configs, so that code behaves the same for both the old and new configs
    2. RecordDeserializer update handleDeserializationFailure method
  4. Mark the tests depending on old configs as deprecated and introduce new tests in the below
    1. InternalTopologyBuilderTest
    2. StreamTaskTest
    3. StreamThreadTest
  5. If user sets both old and new configs, new config should be considered. And always look for new config, and if it's not available, fallback to old one.

Compatibility, Deprecation, and Migration Plan

The below configs in StreamsConfig will be marked for deprecation, and new configs will be introduced, and in later further releases, they could be removed

  • default.deserialization.exception.handler
  • default.production.exception.handler
  • DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC

Basically we have to make sure that after deprecation, the functionality for StreamsConfig, TopologyConfig and RecordDeserializer remains the same and should behave exactly same for old and new configs.

But if user sets both old and new configs, like in other patterns, new config should be considered. And always look for new config, and if it's not available, fallback to old one.

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

The test plan for this change includes the following making sure functionality for StreamsConfig, TopologyCongfig and RecordDeserializer remains the same and should behave exactly same for old and new configs.

  • Deprecate tests which are using old configs
  • Add new tests (copy) for newly introduced configs

Rejected Alternatives

None

  • No labels