Current state: released in 2.0
Discussion thread: here
JIRA: JIRA 6657
Kafka Streams allows to pass in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
However, Kafka Streams internally uses multiple consumers, (1) the main consumer (2) the restore consumer and (3) the global consumer
For some use cases, it's required to set different configs for different consumers. Thus, we should add three new prefixes for main, restore and global consumer.
We might also consider to extend `KafkaClientSupplier` and add a `getGlobalConsumer()` method.
The changes affected classes including StreamsConfig and KafkaClientSupplier. Existing APIs look like below:
Currently, user could use function consumerPrefix() to add specific config for stream consumers. There are no way to differentiate configuration for main consumer, restore consumer and global consumer, which may have different behavior from base settings.
We first add APIs in the StreamsConfig class to generate prefix for main consumer, restore consumer and global consumer, and functions to retrieve them as override configs:
And add a new public API in the KafkaClientSupplier class to get global consumer during init.
By deprecating the getConsumerConfigs function (from now user should use getMainConsumerConfigs instead), rewriting the getRestoreConsumerConfigs function and adding the getGlobalConsumerConfigs function, if one user uses restoreConsumerPrefix or globalConsumerPrefix when adding new configurations, the configs shall overwrite base consumer config. If one just wants to change main consumer behavior without actually affecting other consumers, using mainConsumerPrefix would make sure the change only apply to main consumer. If not specified, main consumer, restore consumer and global consumer shall share the same config with base consumer.
if user writes:
During initialization, consumers would get:
|main consumer||100||Target assignment with "main.consumer" prefix|
|restore consumer||50||Get override config 50 by prefixing "restore-consumer"|
|global consumer||5||Since no "global.consumer" prefix is used, default config will be applied.|
Compatibility, Deprecation, and Migration Plan
There is no backward compatibility issue as we are not deprecating any public API, and the underlying change should be transparent to the user.