Current state: "Under Discussion"
Discussion thread: here
Introducing dependencies for handlers in a Kafka Streams project has so far required additional steps. One possibility was to pass the dependencies inside a Map to the handler's configure method during application configuration; another was to create your own StreamsConfig subclass with one method overridden.
In Kafka 2.0.0, the latter option is marked as deprecated, which means it may no longer be available in upcoming versions. Projects that use dependency injection for Kafka Streams handlers will then be forced to pass dependencies through the Map given to the configure method instead.
Currently, dependencies are introduced into a Kafka Streams handler as follows:
- An exception handler class is registered in the Kafka Streams configuration
- Kafka Streams invokes the default constructor via reflection to create an instance of the provided class
- Kafka Streams passes the configuration Map, which contains the dependencies, to the new instance's configure method
- The dependencies are retrieved from the Map and have to be cast to their particular types
Therefore, if your exception handler needs some other dependency, you have to construct it ahead of time and insert it into the Kafka Streams config Properties.
Afterwards, you need to retrieve it in the configure method of your exception handler by extracting it from the Map and casting it to the appropriate interface or class.
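The flow described above can be sketched without any Kafka classes. This is a minimal illustration, not Kafka code: `ConfigurableHandler`, `AlertService`, `AlertingHandler`, the key `alert.service`, and `MapConfigSketch` are all hypothetical stand-ins that only mimic the reflective creation followed by a Map-based configure call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical dependency that the handler needs.
interface AlertService {
    String alert(String message);
}

// Stand-in for a handler contract that is created reflectively and then configured with a Map.
interface ConfigurableHandler {
    void configure(Map<String, ?> configs);

    String handle(Exception e);
}

class AlertingHandler implements ConfigurableHandler {
    static final String ALERT_SERVICE_KEY = "alert.service"; // hypothetical config key

    // Cannot be final: the value only arrives after construction, in configure().
    private AlertService alertService;

    // Reflective creation requires a no-arg constructor.
    public AlertingHandler() { }

    @Override
    public void configure(Map<String, ?> configs) {
        // The dependency is looked up by key and cast to the expected type.
        this.alertService = (AlertService) configs.get(ALERT_SERVICE_KEY);
    }

    @Override
    public String handle(Exception e) {
        return alertService.alert(e.getMessage());
    }
}

class MapConfigSketch {
    // Mimics the framework side: create the handler via reflection, then hand it the Map.
    static ConfigurableHandler instantiate(Class<? extends ConfigurableHandler> type,
                                           Map<String, ?> configs) {
        try {
            ConfigurableHandler handler = type.getDeclaredConstructor().newInstance();
            handler.configure(configs);
            return handler;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        // The application must build the dependency up front and put it into the config.
        Map<String, Object> configs = new HashMap<>();
        AlertService service = message -> "ALERT: " + message;
        configs.put(AlertingHandler.ALERT_SERVICE_KEY, service);

        ConfigurableHandler handler = instantiate(AlertingHandler.class, configs);
        System.out.println(handler.handle(new IllegalStateException("bad record")));
    }
}
```

The unchecked cast in `configure` and the non-final field are the two pain points this proposal is concerned with: a wrong key or type only shows up at runtime.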
In addition, the newly introduced TopologyTestDriver is also affected: it offers no straightforward, easy-to-maintain, and developer-friendly way to benefit from dependency injection frameworks.
As a result, developers face major complications when testing and maintaining Kafka Streams applications.
As part of the proposed change, the deprecation annotation could be removed from three constructors of the KafkaStreams class, namely:
public KafkaStreams(final Topology topology, final StreamsConfig config)
public KafkaStreams(final Topology topology, final StreamsConfig config, final KafkaClientSupplier clientSupplier)
public KafkaStreams(final Topology topology, final StreamsConfig config, final Time time)
To enable easy testing with dependency injection frameworks (e.g., Spring), three additional constructors for the TopologyTestDriver class could be introduced:
public TopologyTestDriver(final Topology topology, final StreamsConfig config)
public TopologyTestDriver(final Topology topology, final StreamsConfig config, final long initialWallClockTimeMs)
private TopologyTestDriver(final InternalTopologyBuilder builder, final StreamsConfig config, final long initialWallClockTimeMs)
One possible option is to override a method inside the StreamsConfig class and replace the reflection-based creation of a handler class with Spring dependency injection. See the example below:
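A minimal sketch of this idea, using stand-in types instead of the real Kafka and Spring classes. It assumes the overridden method has a shape similar to `getConfiguredInstance(String, Class<T>)`; `ReflectiveConfig`, `InjectingConfig`, `InjectedHandler`, `AlertService`, and the key `exception.handler` are hypothetical, and the `Function<Class<?>, Object>` bean lookup merely stands in for a container call such as Spring's `ApplicationContext.getBean(Class)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dependency of the handler.
interface AlertService {
    String alert(String message);
}

// Handler with constructor injection: no no-arg constructor, dependency kept in a final field.
class InjectedHandler {
    private final AlertService alertService;

    InjectedHandler(AlertService alertService) {
        this.alertService = alertService;
    }

    String handle(Exception e) {
        return alertService.alert(e.getMessage());
    }
}

// Stand-in for the config class (not the real StreamsConfig): creates instances reflectively.
class ReflectiveConfig {
    private final Map<String, Class<?>> configuredClasses;

    ReflectiveConfig(Map<String, Class<?>> configuredClasses) {
        this.configuredClasses = new HashMap<>(configuredClasses);
    }

    Class<?> configuredClass(String key) {
        return configuredClasses.get(key);
    }

    <T> T getConfiguredInstance(String key, Class<T> type) {
        try {
            return type.cast(configuredClass(key).getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate class configured for " + key, e);
        }
    }
}

// The override: instances come from a bean lookup instead of reflection.
class InjectingConfig extends ReflectiveConfig {
    private final Function<Class<?>, Object> beanLookup;

    InjectingConfig(Map<String, Class<?>> configuredClasses,
                    Function<Class<?>, Object> beanLookup) {
        super(configuredClasses);
        this.beanLookup = beanLookup;
    }

    @Override
    <T> T getConfiguredInstance(String key, Class<T> type) {
        return type.cast(beanLookup.apply(configuredClass(key)));
    }
}

class OverrideSketch {
    static String demo() {
        // A plain Map plays the role of the DI container in this sketch.
        AlertService service = message -> "ALERT: " + message;
        Map<Class<?>, Object> beans = new HashMap<>();
        beans.put(InjectedHandler.class, new InjectedHandler(service));

        Map<String, Class<?>> classes = new HashMap<>();
        classes.put("exception.handler", InjectedHandler.class);

        ReflectiveConfig config = new InjectingConfig(classes, beans::get);
        InjectedHandler handler =
                config.getConfiguredInstance("exception.handler", InjectedHandler.class);
        return handler.handle(new IllegalStateException("bad record"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that `InjectedHandler` has no no-arg constructor, so the reflective path in `ReflectiveConfig` could not create it at all; only the overriding class can supply it.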
This offers two main advantages:
- Spring can create the dependencies for your beans, so you don't need to construct them and put them into the Kafka Streams configuration, nor extract and cast them on the handler's side.
- You gain automated control over new dependencies introduced into Kafka Streams handlers.
One minor advantage is that your dependencies can be assigned to final fields.
The second option is to create an additional interface, as indicated in the example below:
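One possible shape for such an interface is sketched here. Only the name `ConfiguredStreamsFactory` comes from this proposal; the method signature is an assumption modeled on a `getConfiguredInstance(String, Class<T>)` style lookup, and `PrebuiltInstanceFactory` is a hypothetical implementation added to show how the interface might be used.

```java
import java.util.Map;

// Assumed signature: returns the instance to use for the given config key.
interface ConfiguredStreamsFactory {
    <T> T getConfiguredInstance(String key, Class<T> type);
}

// Hypothetical implementation that serves pre-built (for example, container-managed) instances.
class PrebuiltInstanceFactory implements ConfiguredStreamsFactory {
    private final Map<String, ?> instances;

    PrebuiltInstanceFactory(Map<String, ?> instances) {
        this.instances = instances;
    }

    @Override
    public <T> T getConfiguredInstance(String key, Class<T> type) {
        Object instance = instances.get(key);
        if (instance == null) {
            throw new IllegalArgumentException("No instance registered for " + key);
        }
        // Class.cast fails fast with a ClassCastException if the registered type is wrong.
        return type.cast(instance);
    }
}
```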
An implementation of ConfiguredStreamsFactory is then provided when creating the StreamsConfig:
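A sketch of how the wiring might look, under the assumption that the config class gains a constructor accepting the factory. `FactoryAwareConfig` is a stand-in for StreamsConfig, its two-argument constructor is hypothetical, and `LoggingHandler` and the key `exception.handler` exist only for illustration. The interface is repeated so that the block compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed signature, repeated here so the block is self-contained.
interface ConfiguredStreamsFactory {
    <T> T getConfiguredInstance(String key, Class<T> type);
}

// Hypothetical handler type used only in this sketch.
class LoggingHandler { }

// Stand-in for StreamsConfig; the constructor taking a factory is an assumption.
class FactoryAwareConfig {
    private final Map<String, Object> props;
    private final ConfiguredStreamsFactory factory;

    FactoryAwareConfig(Map<String, Object> props, ConfiguredStreamsFactory factory) {
        this.props = new HashMap<>(props);
        this.factory = factory;
    }

    Object get(String key) {
        return props.get(key);
    }

    // Instance creation is delegated to whatever factory the application supplied.
    <T> T getConfiguredInstance(String key, Class<T> type) {
        return factory.getConfiguredInstance(key, type);
    }
}

class WiringSketch {
    // Returns true if the config hands back exactly the instance the factory supplied.
    static boolean demo() {
        LoggingHandler handler = new LoggingHandler();

        // A generic interface method cannot be implemented with a lambda, hence the anonymous class.
        ConfiguredStreamsFactory factory = new ConfiguredStreamsFactory() {
            @Override
            public <T> T getConfiguredInstance(String key, Class<T> type) {
                return type.cast(handler);
            }
        };

        Map<String, Object> props = new HashMap<>();
        props.put("exception.handler", LoggingHandler.class);

        FactoryAwareConfig config = new FactoryAwareConfig(props, factory);
        return config.getConfiguredInstance("exception.handler", LoggingHandler.class) == handler;
    }
}
```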
A default implementation, based on the current reflection-based behavior, could be provided as well:
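A default along these lines might look as follows. This is a sketch under assumptions, not the actual Kafka implementation: `ReflectiveStreamsFactory` and `MapConfigurable` are hypothetical names, and the logic only approximates the reflective creation followed by a configure call described earlier. The interface is repeated so that the block compiles on its own.

```java
import java.util.Map;

// Assumed signature, repeated here so the block is self-contained.
interface ConfiguredStreamsFactory {
    <T> T getConfiguredInstance(String key, Class<T> type);
}

// Stand-in for a "configurable" contract whose configure method receives the config Map.
interface MapConfigurable {
    void configure(Map<String, ?> configs);
}

// Hypothetical default: look up the class under the key, create it reflectively, configure it.
class ReflectiveStreamsFactory implements ConfiguredStreamsFactory {
    private final Map<String, ?> configs;

    ReflectiveStreamsFactory(Map<String, ?> configs) {
        this.configs = configs;
    }

    @Override
    public <T> T getConfiguredInstance(String key, Class<T> type) {
        Object value = configs.get(key);
        if (!(value instanceof Class)) {
            throw new IllegalArgumentException(key + " is not configured with a class");
        }
        Class<?> configured = (Class<?>) value;
        if (!type.isAssignableFrom(configured)) {
            throw new IllegalArgumentException(
                    configured.getName() + " is not an instance of " + type.getName());
        }
        try {
            // Requires an accessible no-arg constructor, as in the current approach.
            T instance = type.cast(configured.getDeclaredConstructor().newInstance());
            if (instance instanceof MapConfigurable) {
                ((MapConfigurable) instance).configure(configs);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + configured.getName(), e);
        }
    }
}
```

Keeping the reflective behavior as the default would let existing applications run unchanged, while applications that want dependency injection supply their own factory.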
Compatibility, Deprecation, and Migration Plan
If the first option is chosen, the deprecation must be removed from the three KafkaStreams constructors described in the "Public interfaces" section.
Alternatively, the second option can be applied, although it needs to be studied in more detail first.
None at this point in time.