Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread.html/a5ee2d569448dd57647dca93f288405c13785dec9cb4a0cbddfedd35@%3Cdev.kafka.apache.org%3E
JIRA:
Motivation
Users who cannot read their source topic as a changelog stream from the beginning need a way to translate an event stream into a changelog stream. As Guozhang Wang pointed out, this should not be confused with KStream.reduce(). The new functions should:
- Completely change the semantics of the input stream from an event stream to a changelog stream. Null values will still be serialized, and if the resulting bytes are also null they will be interpreted as "deletes" to the materialized KTable (i.e. tombstones in the changelog stream).
- Materialize the KTable following the usual process:
  - If optimization is turned off, the KTable will always be materialized (but the store will not be queryable).
  - If optimization is turned on and .toTable() is used, the KTable may or may not be materialized. The store still cannot be queried.
  - If .toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) is used, the KTable will be materialized and queryable irrespective of the optimization strategy.
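The change from event-stream to changelog semantics can be illustrated with a minimal sketch in plain Java, without any Kafka dependency. The class and method names (ChangelogSemanticsSketch, applyAsChangelog) are hypothetical and exist only for illustration. The sketch also assumes that a null value serializes to null bytes, so it treats a null value directly as a tombstone.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of changelog semantics; not part of the Kafka Streams API.
final class ChangelogSemanticsSketch {

    // Applies records in arrival order: a non-null value upserts the key,
    // a null value (assumed to serialize to null bytes) deletes the key.
    static Map<String, String> applyAsChangelog(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() == null) {
                table.remove(record.getKey());   // tombstone
            } else {
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = Arrays.asList(
            new SimpleEntry<String, String>("a", "1"),
            new SimpleEntry<String, String>("b", "2"),
            new SimpleEntry<String, String>("b", "3"),   // overwrites b
            new SimpleEntry<String, String>("a", null)   // deletes a
        );
        System.out.println(applyAsChangelog(records));
    }
}
```

Read as an event stream, the same input would be four independent records. Read as a changelog, only the latest value per key survives, and a key whose latest record is a tombstone is absent from the table.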
Public Interfaces
The following toTable() overloads will need to be added to the KStream interface:
```java
// Java
KTable<K, V> toTable();

KTable<K, V> toTable(final Named named);

KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> toTable(final Named named,
                     final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```

```scala
// Scala
def toTable: KTable[K, V]

def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V]
```
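As a rough usage sketch, assuming the overloads are implemented as proposed and kafka-streams is on the classpath, the snippet below converts one stream with the no-argument variant and once with a named processor and a named, queryable store. The topic name, processor name, store name, and the ToTableExample class are hypothetical placeholders chosen for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical example class; only builds and prints the topology, it does not start a client.
class ToTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // "user-events" is a placeholder topic name.
        KStream<String, String> events =
            builder.stream("user-events", Consumed.with(Serdes.String(), Serdes.String()));

        // No Materialized argument: the resulting store is not queryable.
        KTable<String, String> latest = events.toTable();

        // Named processor plus an explicitly named store: materialized and queryable.
        KTable<String, String> queryable = events.toTable(
            Named.as("user-events-to-table"),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("latest-user-event-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```

Printing the topology description is a convenient way to check which processor and store names were actually assigned.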
Proposed Changes
Add the new toTable() functions to the Kafka Streams DSL.
- When naming the processors and state stores, follow the naming convention specified in KIP-307: Allow to define custom processor names with KStreams DSL.
Compatibility, Deprecation, and Migration Plan
...