Current state: Accepted
Discussion thread: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, all output topics must be known beforehand, so it is not possible to send output records to topics dynamically.
By allowing users to choose the destination topic at runtime based on each record's key-value pair, Streams can remove the burden of having to update and restart KStreams applications whenever routing decisions change.
I propose adding the dynamic routing functionality at both the Topology and the DSL layer. More specifically, the following new APIs will be added:
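As an illustration of the extractor idea, the minimal sketch below models topic selection as a function of a record's key and value. The interface `TopicNameExtractor`, its `extract` method, and the region-based routing rule are stand-ins declared locally for this example; they are assumptions and should not be read as the exact signatures added by this KIP.

```java
import java.util.Locale;

final class DynamicRoutingSketch {

    /** Hypothetical stand-in: picks the destination topic for a single record. */
    @FunctionalInterface
    interface TopicNameExtractor<K, V> {
        String extract(K key, V value);
    }

    /** Assumed example rule: the record value carries a region code that names the topic. */
    static final TopicNameExtractor<String, String> BY_REGION =
            (key, value) -> "orders-" + value.toLowerCase(Locale.ROOT);

    public static void main(String[] args) {
        // Each record is routed independently, so two records can land in two topics.
        System.out.println(BY_REGION.extract("order-1", "EU"));
        System.out.println(BY_REGION.extract("order-2", "US"));
    }
}
```

In an application, such an extractor would presumably be handed to the sink in place of a fixed topic name, so that changing the routing rule no longer means changing the set of hard-coded output topics.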
I also propose modifying StreamPartitioner#partition to include the topic name, since with dynamic routing the topic name is no longer always known in advance:
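To illustrate why passing the topic name is useful, the sketch below declares a local stand-in for the partitioner interface with an added `topic` parameter. The parameter order, the `audit-` prefix, and the partitioning rules are assumptions made for this example only.

```java
final class TopicAwarePartitionerSketch {

    /** Hypothetical stand-in for a partitioner that also receives the topic name. */
    @FunctionalInterface
    interface StreamPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    /** A single implementation that branches on the topic, instead of one partitioner per topic. */
    static final StreamPartitioner<String, String> PARTITIONER =
            (topic, key, value, numPartitions) -> {
                if (topic.startsWith("audit-")) {
                    // Assumed rule: keep audit topics totally ordered on one partition.
                    return 0;
                }
                // Otherwise spread records by key hash; floorMod keeps the result non-negative.
                return Math.floorMod(key.hashCode(), numPartitions);
            };

    public static void main(String[] args) {
        System.out.println(PARTITIONER.partition("audit-eu", "k1", "v", 4));
        System.out.println(PARTITIONER.partition("orders-eu", "k1", "v", 4));
    }
}
```

Because the topic is only resolved per record under dynamic routing, a partitioner that never sees the topic name could not apply topic-specific rules like the one above.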
With the newly added APIs, the topology's sink node applies the extractor to each received record to determine the topic to send to. If that topic does not exist, Streams falls into the normal fail-fast scenario, in which the metadata refresh exhausts its retries and a fatal RequestTimeout is thrown.
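The per-record behaviour described above can be modelled roughly as follows. This is a simplified, self-contained sketch rather than the Streams implementation: `SinkNodeSketch`, the `existingTopics` set, and the immediate `IllegalStateException` are hypothetical stand-ins for the real sink node, the cluster's topic metadata, and the retry-then-timeout failure path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

final class SinkNodeSketch {

    /** Resolves the topic for every record and fails fast if that topic is unknown. */
    static List<String> process(List<Map.Entry<String, String>> records,
                                BiFunction<String, String, String> extractor,
                                Set<String> existingTopics) {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            String topic = extractor.apply(record.getKey(), record.getValue());
            if (!existingTopics.contains(topic)) {
                // Stand-in for the real path: metadata retries are exhausted, then a fatal error.
                throw new IllegalStateException("Topic does not exist: " + topic);
            }
            sent.add(topic + ":" + record.getKey());
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records =
                List.of(Map.entry("k1", "eu"), Map.entry("k2", "us"));
        BiFunction<String, String, String> byRegion = (key, value) -> "orders-" + value;

        // Both destination topics exist, so both records are sent.
        System.out.println(process(records, byRegion, Set.of("orders-eu", "orders-us")));

        try {
            // "orders-us" is missing here, so processing stops at the second record.
            process(records, byRegion, Set.of("orders-eu"));
        } catch (IllegalStateException e) {
            System.out.println("Failed fast: " + e.getMessage());
        }
    }
}
```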
Compatibility, Deprecation, and Migration Plan
- This KIP should not require changes to existing applications unless a user's function calls become ambiguous among the overloaded functions (think: KStream.to(null)).
We also considered adding functionality to automatically create destination topics that do not exist at the time records are sent to them. We did not include this in the KIP for the following reasons:
- There are security/operational risks where a rogue KStreams app might create unneeded/wrong/... topics. One potential way to address this risk is to introduce a new quota feature, e.g. a limit on how many topics an application may create, in order to control the collateral damage of a rogue app. Note that current ACLs may not be sufficient to secure the destination Kafka cluster of the Streams application, since they only allow either a single wildcard or a full topic name.
- Even if the topics can be auto-created, downstream applications would still need to become aware of those topics, because, by definition, (some of) the topics would not exist at the time the downstream applications were started.
We have also considered adding a `DynamicStreamPartitioner` that accepts topic names, alongside `StreamPartitioner`, so as not to break compatibility. I tried out the implementation and realized that, if DynamicStreamPartitioner extends StreamPartitioner, it needs to implement two interface functions, one with and one without the topic name. We could also let it not extend StreamPartitioner so that it has one function only, but then Produced would need two functions, one accepting StreamPartitioner and one accepting DynamicStreamPartitioner. Weighing the pros and cons, I think it is better to just change the interface of StreamPartitioner itself: even without dynamic routing, passing the topic name lets users provide one partitioner implementation that branches on the topic name rather than having one partitioner per topic.