Status
Current state: Under Discussion
Discussion thread: here
Old Discussion thread: here
Voting thread: here
JIRA: KAFKA-10369
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams applications usually consume from topics that are written with at-least-once semantics. In companies where Kafka is the backbone platform, an application maintained by team A may read topics that are written by team B. While team A may expect the topic to be written with exactly-once semantics, team B may not always fulfill that requirement. Team A is then forced to add a custom processor that deduplicates the data. This can mean writing a custom deduplication processor for every external topic and for every Kafka Streams application.
Public Interfaces
To save applications from having to write a deduplication processor each time, we introduce a new deduplication API that does the job for them.
public interface KStream<K, V> {

    /**
     * Filter out duplicates from this stream based on record's key, within the provided time interval.
     * After receiving a non-duplicate record, any record with the same key that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same key is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.
     * Records with a {@code null} key are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     * <p>
     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
     * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * <name> is an internally generated name, and "-repartition" is a fixed suffix.
     *
     * @param deduplicationInterval the duration within which subsequent duplicates of a record will be discarded
     * @return a KStream that contains the same records of this KStream without duplicates
     */
    KStream<K, V> deduplicateByKey(final Duration deduplicationInterval);

    KStream<K, V> deduplicateByKey(final Duration deduplicationInterval,
                                   final Deduplicated<K, V> deduplicated);

    /**
     * Filter out duplicates from this stream based on record's key and the provided {@link KeyValueMapper}, within the provided time interval.
     * The provided {@link KeyValueMapper} maps a record to an id.
     * For two records to be duplicate, they must have the same key and the same id.
     * <p>
     * Example of usage.
     * <pre>{@code
     * KStream<String, Object> inputStream = builder.stream("topic");
     *
     * KStream<String, Object> outputStream = inputStream.deduplicateByKeyValue(new KeyValueMapper<String, Object, String>() {
     *     public String apply(String key, Object value) {
     *         return value.id;
     *     }
     * }, Duration.ofSeconds(60));
     * }</pre>
     * </p>
     * After receiving a non-duplicate record, any duplicate to it that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same (key, id) is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.
     * Records with a {@code null} key OR a {@code null} id are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     * <p>
     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
     * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * <name> is an internally generated name, and "-repartition" is a fixed suffix.
     *
     * @param idSelector a {@link KeyValueMapper} that returns the unique id of the record
     * @param deduplicationInterval the duration within which subsequent duplicates of a record will be discarded
     * @param <KR> the type of the deduplication id
     * @return a KStream that contains the same records of this KStream without duplicates
     */
    <KR> KStream<K, V> deduplicateByKeyValue(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                             final Duration deduplicationInterval);

    <KR> KStream<K, V> deduplicateByKeyValue(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                             final Duration deduplicationInterval,
                                             final Deduplicated<KR, V> deduplicated);

    /**
     * Filter out duplicates from this stream based on the provided {@link KeyValueMapper}, within the provided time interval.
     * The provided {@link KeyValueMapper} maps a record to its deduplication id.
     * This processor causes repartitioning in order to get records of the same deduplication id in the same partition.
     * In case you only need to deduplicate records within a partition, consider using {@link #deduplicateByKeyValue(KeyValueMapper, Duration)}.
     * For deduplication over all partitions, an alternative to this processor is to manually repartition the
     * stream such that the new key corresponds to the deduplication key and then use {@link #deduplicateByKey(Duration)}. This
     * latter alternative is better when it is fine to keep the deduplication key in the resulting stream's key.
     * <p>
     * In the example below, we deduplicate on an id field within the record's value
     * <pre>{@code
     * KStream<String, Object> inputStream = builder.stream("topic");
     * KStream<String, Object> outputStream = inputStream.deduplicate(new KeyValueMapper<String, Object, String>() {
     *     public String apply(String key, Object value) {
     *         return value.id;
     *     }
     * }, Duration.ofSeconds(60));
     * }</pre>
     * </p>
     * After receiving a non-duplicate record, any duplicate to it that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same id is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.
     * Records with a {@code null} id are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     *
     * @param idSelector a {@link KeyValueMapper} that returns the unique id of the record
     * @param deduplicationInterval the duration within which subsequent duplicates of a record will be discarded
     * @param <KR> the type of deduplication id
     * @return a KStream that contains the same records of this KStream without duplicates
     */
    <KR> KStream<K, V> deduplicate(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                   final Duration deduplicationInterval);

    <KR> KStream<K, V> deduplicate(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                   final Duration deduplicationInterval,
                                   final Deduplicated<KR, V> deduplicated);
}
public class Deduplicated<K, V> implements NamedOperation<Deduplicated<K, V>> {

    protected final String name;
    protected final String storeName;
    protected final Serde<K> keySerde;
    protected final Serde<V> valueSerde;

    public static <K, V> Deduplicated<K, V> as(final String name);

    public static <K, V> Deduplicated<K, V> keySerde(final Serde<K> keySerde);

    public static <K, V> Deduplicated<K, V> valueSerde(final Serde<V> valueSerde);

    public static <K, V> Deduplicated<K, V> with(final String storeName,
                                                 final Serde<K> keySerde,
                                                 final Serde<V> valueSerde);

    public static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde,
                                                 final Serde<V> valueSerde);

    public Deduplicated<K, V> withName(final String name);

    public Deduplicated<K, V> withKeySerde(final Serde<K> keySerde);

    public Deduplicated<K, V> withValueSerde(final Serde<V> valueSerde);
}
The deduplicationInterval parameter makes it possible to:
- control the time interval within which duplicates of a record are discarded
- limit storage by preventing the state store from growing indefinitely
Proposed changes
As mentioned in the Javadoc, we propose deduplicating both locally within a task and globally over all partitions.
Internally, to decide whether a record is a duplicate, the deduplication processor maintains a key-value store whose key is the deduplication id.
To purge old records efficiently, an index is maintained alongside this base store. Each index key consists of the record's timestamp as a prefix followed by the record's key, so that when eviction is triggered, a range scan over the expired timestamps finds the entries to remove from the base store. The scanned index entries are removed as well.
The value in the base store holds the timestamp of the record and its offset within the partition. Under at-least-once semantics (ALOS), a crash may happen after the record is saved in the store but before its offset is committed. When the record is reprocessed, if the value in the store has the same offset, the record is forwarded; otherwise deduplication is performed.
For punctuated records, whose offset is null, deduplication is always performed.
Finally, given two duplicate records, the first record received (i.e. the one with the lowest offset) is the one forwarded.
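The store layout and the offset check described above can be sketched in memory as follows. This is only an illustration under stated assumptions, not the proposed implementation: plain Java maps stand in for the Kafka Streams state stores, deduplication ids are Strings, and the names DedupStoreSketch, Seen, process and evictExpired are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the base store plus its timestamp index.
final class DedupStoreSketch {

    /** Base-store value: timestamp and partition offset of the record that was forwarded. */
    static final class Seen {
        final long timestampMs;
        final Long offset; // null for punctuated records

        Seen(long timestampMs, Long offset) {
            this.timestampMs = timestampMs;
            this.offset = offset;
        }
    }

    private final long intervalMs;
    private final Map<String, Seen> baseStore = new HashMap<>();
    // Index ordered by timestamp, so that expired entries can be found with a range scan.
    private final TreeMap<Long, Set<String>> timeIndex = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    DedupStoreSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the record is forwarded, false if it is dropped as a duplicate. */
    boolean process(String id, long timestampMs, Long offset) {
        if (id == null) {
            return true; // records with a null id are always forwarded
        }
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        evictExpired();

        Seen seen = baseStore.get(id);
        if (seen != null && Math.abs(timestampMs - seen.timestampMs) <= intervalMs) {
            // Same offset: the same record is being reprocessed after a crash, forward it again.
            // Different or null offset: a genuine duplicate, drop it.
            return offset != null && offset.equals(seen.offset);
        }
        // Records older than the retained range are forwarded but cannot be saved.
        if (timestampMs >= streamTimeMs - intervalMs) {
            baseStore.put(id, new Seen(timestampMs, offset));
            timeIndex.computeIfAbsent(timestampMs, t -> new HashSet<>()).add(id);
        }
        return true;
    }

    /** Range scan over index entries strictly older than streamTime - interval. */
    private void evictExpired() {
        SortedMap<Long, Set<String>> expired = timeIndex.headMap(streamTimeMs - intervalMs);
        for (Set<String> ids : expired.values()) {
            baseStore.keySet().removeAll(ids);
        }
        expired.clear(); // removes the scanned index entries as well
    }

    int storeSize() {
        return baseStore.size();
    }

    public static void main(String[] args) {
        DedupStoreSketch dedup = new DedupStoreSketch(10_000L);
        System.out.println(dedup.process("k", 20_000L, 7L));   // true: first record, saved
        System.out.println(dedup.process("k", 20_000L, 7L));   // true: same offset, replay after a crash
        System.out.println(dedup.process("k", 22_000L, 8L));   // false: duplicate at a later offset
        System.out.println(dedup.process("k", 23_000L, null)); // false: punctuated record
    }
}
```

A TreeMap keyed by timestamp is used here only because its headMap view gives the same kind of ordered range scan that a timestamp-prefixed index key would give in a sorted state store.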
Semantics
Null deduplication key
For `deduplicateByKey()` and `deduplicate()`, records whose key (respectively id) is null are always forwarded to the output stream.
For `deduplicateByKeyValue()`, where both the key and the id are used for deduplication, if either of them is null, we forward the record to the output stream. This is because:
- if the id is null: we don't want to consider only the key for deduplication (because other records with the same key were not considered duplicates). Users can still map the null id to a constant value if they want this behavior.
- if the key is null: we don't want non-deterministic behavior. Two records with a null key may or may not be in the same partition. In the first case they would be deduplicated, in the second case they would not be. Users can map the key to a constant value beforehand if they want to deduplicate such records.
Duplicate sequence
Example: deduplicationInterval = 10s. The following events a1, a2 and a3 are duplicates of one another.
- event a1 at t → Forwarded and saved in the store
- event a2 at t+8s → Not forwarded, we don't update the store
- event a3 at t+11s → Forwarded. It is within deduplicationInterval of a2, but a2 was neither forwarded nor saved, so a3 is treated as if a2 had never arrived
Similarly, for out-of-order records:
- event a1 at t → Forwarded and saved in the store
- event a2 at t-8s → Not forwarded, we don't update the store
- event a3 at t-11s → Forwarded (see section Late events)
Deduplication interval boundaries
Both ends of the deduplication interval are inclusive.
Example: deduplicationInterval = 10s. The following events a1, a2 and a3 are duplicates of one another.
- event a1 @t=5s → Forwarded
- event a2 @t=15s → Dropped
- event a3 @t=16s → Forwarded
The same thing applies for deduplicating out-of-order records:
- event a1 @t=15s → Forwarded
- event a2 @t=5s → Dropped
- event a3 @t=4s → Forwarded
Thus, at the lower bound deduplicationInterval = 0, only records with the same timestamp are deduplicated:
- event a1 @t=5s → Forwarded
- event a2 @t=5s → Dropped
- event a3 @t=6s → Forwarded
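The inclusive boundaries in the examples above amount to a single comparison between the timestamp of the stored record and the timestamp of the incoming one. A minimal sketch, where the class and method names are hypothetical and timestamps are in milliseconds:

```java
// Hypothetical helper illustrating the inclusive interval check.
final class DedupBoundary {

    /** True if a record at timestampMs falls within the interval around a stored record at storedMs. */
    static boolean withinInterval(long storedMs, long timestampMs, long intervalMs) {
        // Absolute distance, so the same rule covers in-order and out-of-order records.
        return Math.abs(timestampMs - storedMs) <= intervalMs;
    }

    public static void main(String[] args) {
        long interval = 10_000L;
        System.out.println(withinInterval(5_000L, 15_000L, interval));  // true:  a2 @t=15s is dropped
        System.out.println(withinInterval(5_000L, 16_000L, interval));  // false: a3 @t=16s is forwarded
        System.out.println(withinInterval(15_000L, 5_000L, interval));  // true:  out-of-order a2 @t=5s is dropped
        System.out.println(withinInterval(15_000L, 4_000L, interval));  // false: out-of-order a3 @t=4s is forwarded
        System.out.println(withinInterval(5_000L, 5_000L, 0L));         // true:  interval 0, same timestamp
        System.out.println(withinInterval(5_000L, 6_000L, 0L));         // false: interval 0, later timestamp
    }
}
```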
Late events by more than deduplicationInterval
Late events that are more than deduplicationInterval behind stream time cannot be kept in the store (they would be purged immediately), so duplicates of them that arrive afterwards cannot be detected. There are two options:
- Drop late records
  - Advantage: The late record may be a duplicate of a previously forwarded record that has since been purged; in this case we won't forward it twice
  - Drawback: The record could be a non-duplicate; in this case we should have forwarded it
- Forward late records *if no duplicate is found in the store*
  - Advantage: Non-duplicate records are forwarded
  - Drawback: The record may be a duplicate of a previously forwarded record that has since been purged
We choose the forward strategy. In case it's needed, the user may add a processor to purge those late records according to the first strategy.
In the following examples, deduplicationInterval = 10s. Each event is labeled with its deduplication key and timestamp as "event <dedup-key> @t=<timestamp>"
Example1:
- event k @t=20s → Forwarded
- event k @t=25s → Dropped
- event k @t=11s → Dropped (streamTime=25s → This is a late event, but its duplicate @t=20s still exists in the store)
- event k @t=9s → Forwarded (late event with no duplicate within deduplicationInterval in the store)
Example2:
- event k1 @t=10s → Forwarded
- event k2 @t=20s → Forwarded
- event k1 @t=9s → Dropped (late event, but its duplicate @t=10s still exists in the store)
Example3:
- event k1 @t=10s → Forwarded
- event k2 @t=21s → Forwarded → event k1 @t=10s is now purged from the store
- event k1 @t=9s → Forwarded (late event with no duplicate in the store)
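The three examples can be replayed with a small simulation of the forward strategy. It is a sketch under assumptions rather than the proposed processor: a HashMap replaces the state store, purging is done by a full scan, offsets are ignored, and the names LateEventSketch and process are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the "forward late records" strategy.
final class LateEventSketch {

    private final long intervalMs;
    // Deduplication key -> timestamp of the record that was forwarded and saved.
    private final Map<String, Long> store = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    LateEventSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the record is forwarded, false if it is dropped. */
    boolean process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long oldestRetainedMs = streamTimeMs - intervalMs;
        // Purge entries that are more than deduplicationInterval behind stream time.
        store.values().removeIf(ts -> ts < oldestRetainedMs);

        Long seen = store.get(key);
        if (seen != null && Math.abs(timestampMs - seen) <= intervalMs) {
            return false; // a duplicate is still in the store
        }
        // A late record beyond the interval is forwarded but never saved.
        if (timestampMs >= oldestRetainedMs) {
            store.put(key, timestampMs);
        }
        return true;
    }

    public static void main(String[] args) {
        LateEventSketch example1 = new LateEventSketch(10_000L);
        System.out.println(example1.process("k", 20_000L)); // true:  forwarded
        System.out.println(example1.process("k", 25_000L)); // false: dropped
        System.out.println(example1.process("k", 11_000L)); // false: late, duplicate @t=20s still stored
        System.out.println(example1.process("k", 9_000L));  // true:  late, no duplicate within the interval

        LateEventSketch example3 = new LateEventSketch(10_000L);
        System.out.println(example3.process("k1", 10_000L)); // true: forwarded
        System.out.println(example3.process("k2", 21_000L)); // true: forwarded, k1 @t=10s is purged
        System.out.println(example3.process("k1", 9_000L));  // true: late, no duplicate in the store
    }
}
```

Dropping late records instead would only require returning false when the timestamp is older than streamTime minus deduplicationInterval, before the store lookup.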
Compatibility, Deprecation, and Migration Plan
The proposed change is backward compatible; no deprecation or migration is needed.
Rejected Alternatives
Nature of the underlying store
- We use a window store to save the ids "seen" by the processor. Setting the retention of this store to deduplicationInterval allows old records to be purged automatically.
Discarding late events
- Out-of-order events that are within deduplicationInterval from max observed streamTime are handled by the processor's logic above.
- However, the processor's logic above doesn't handle events that are more than deduplicationInterval in the past (i.e. with respect to maxObservedStreamTime). Such a record is not saved in the window store because it is considered expired, so any duplicate of it that arrives afterwards would not be detected as a duplicate.
It makes sense to ignore records that are more than deduplicationInterval late. If a user considers deduplicationInterval long enough that a repeat of a record arriving after it has elapsed may be accepted as new, then records older than this interval can reasonably be treated as expired.
Of course, users can set deduplicationInterval to whatever value they judge to be sufficiently large.
Processor logic
- check if the record is more than deduplicationInterval late
  - If so, ignore it.
- evaluate the record's deduplication id using the supplied KeyValueMapper
- fetch entries in the store having this key within the time interval (record.timestamp-deduplicationInterval, record.timestamp+deduplicationInterval)
  - If no entries found → forward the record + save the record in the store
  - If any entries found → do not forward + do not update the store
How to detect late records so as to drop them
The current WindowStore interface does not indicate whether a record is expired. Calling put() on a record does not return whether the record was actually stored or was rejected as expired. Nor does the interface expose the maximum observed stream time.
Therefore, to detect late records, the processor needs the value of maxObservedStreamTime, which is already available through the ProcessorContext API currentStreamTimeMs().
-----------------------------------
Initial motivation
One example: we might have multiple data sources, each reporting its state periodically at a relatively high frequency, whose current states should be stored in a database. If the state actually changes less often than it is reported, we might want to filter out duplicate messages using Kafka Streams in order to reduce the number of writes to the database.
The 'distinct' operation is common in data processing, e.g.
- SQL DISTINCT keyword,
- In standard libraries for programming languages:
  - .NET LINQ Distinct method,
  - Java Stream distinct(),
  - Scala Seq distinct(),
- In data processing frameworks:
  - Apache Spark's distinct(),
  - Apache Flink's distinct(),
  - Apache Beam's Distinct(),
  - Hazelcast Jet's distinct(), etc.
Hence it is natural to expect similar functionality from Kafka Streams.
Although Kafka Streams Tutorials contains an example of how distinct can be emulated, the example is complicated: it involves low-level coding with a local state store and a custom transformer. It would be much more convenient to have distinct as a first-class DSL operation.
Due to the 'infinite' nature of a KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.
Initial public interface & examples
In accordance with KStreams DSL Grammar, we introduce the following new elements:
distinct() parameterless DSLOperation on
- TimeWindowedKStream<K,V> DSLObject which returns KStream<Windowed<K>,V>
- SessionWindowedKStream<K,V> DSLObject which returns KStream<Windowed<K>,V>
The following methods are added to the corresponding interfaces:
KTable<Windowed<K>, V> distinct(final Named named);
KTable<Windowed<K>, V> distinct(final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> distinct(final Named named, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
The distinct operation returns only the first record that falls into a new window, and filters out all other records that fall into an already existing window.
The records are considered to be duplicates iff serialized forms of their keys are equal.
Examples
Consider the following example (record times are in seconds):
//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35
'Epoch-aligned deduplication' using tumbling windows
.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
produces
(key@[00000/10000], 4)
(key@[20000/30000], 23)
(key@[30000/40000], 34)
-- that is, one record per epoch-aligned window.
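The tumbling-window result above can be reproduced with a short simulation. It assumes a single key, records processed in arrival order, and no grace period or retention limits; the class and method names are hypothetical and no Kafka Streams API is involved.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of distinct() over epoch-aligned tumbling windows for one key.
final class TumblingDistinctSketch {

    /** Returns {windowStartSec, recordTimeSec} pairs for the first record seen in each window. */
    static List<long[]> distinct(long[] recordTimesSec, long windowSizeSec) {
        Set<Long> seenWindows = new HashSet<>();
        List<long[]> forwarded = new ArrayList<>();
        for (long ts : recordTimesSec) {
            long windowStart = ts - (ts % windowSizeSec); // epoch-aligned window start
            if (seenWindows.add(windowStart)) {           // add() is true only for a new window
                forwarded.add(new long[] {windowStart, ts});
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts of variously ordered records followed by the 'late arrivals'.
        long[] records = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        for (long[] r : distinct(records, 10)) {
            System.out.println("window [" + r[0] + "s, " + (r[0] + 10) + "s) -> record at " + r[1] + "s");
        }
    }
}
```

With the input from the example, the first records of the three windows are 4, 23 and 34, and the late arrivals 7, 22 and 35 are filtered out because their windows already exist.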
Note: hopping and sliding windows do not make much sense for distinct() because they produce multiple overlapping windows, so one record can be multiplied instead of deduplicated.
SessionWindows work for 'data-aligned deduplication'.
.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
produces only
([key@4000/4000], 4)
([key@23000/23000], 23)
because all the records with timestamps greater than 7 are 'stuck together' in one session. Setting the inactivity gap to 9 seconds will return three records:
([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)
Initial Rejected public Interfaces
In accordance with KStreams DSL Grammar, we introduce the following new elements:
- distinct DSLOperation on a KStream<K, V> DSLObject which returns another KStream<K, V> DSLObject,
- DistinctParameters<K, V, I> DSLParameter.
The type parameters are:
- K: key type
- V: value type
- I: the type of the record's unique identifier
With DistinctParameters<K, V, I> the following can be provided:
- KeyValueMapper<K, V, I> idExtractor: extracts a unique identifier from a record by which we de-duplicate input records. If it returns null, the record will not be considered for de-duping and is forwarded as-is. If not provided, defaults to (key, value) -> key, which means deduplication based on the key of the record. Important assumption: records from different partitions should have different IDs, otherwise the same IDs might not be co-partitioned.
- TimeWindows timeWindows: tumbling or hopping time-based window specification. Required parameter. Only the first message with a given id that falls into a window will be passed downstream.
- Serde<I> idSerde: serde for the unique identifier.
- boolean isPersistent: whether the WindowStore that stores the unique ids should be persistent or not. In many cases, a non-persistent store will be preferable because of better performance. Downstream consumers must be ready to accept occasional duplicates.
Initial rejected Proposed Changes
1. Add the following method to the KStream interface:
<I> KStream<K, V> distinct(DistinctParameters<K, V, I> params);
Given the parameters, this method returns a new KStream with only the first occurrence of each record in any of the time windows, deduplicated by unique id. Any subsequent occurrences in the time window are filtered out.
2. Add and implement the following DistinctParameters class:
class DistinctParameters<K, V, I> extends Named {

    /** Windowing parameters only. {@code (k,v)->k} id extractor is assumed, and a persistent store with key serde is used */
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows);

    /** Windowing parameters and a store persistency flag. {@code (k,v)->k} id extractor is assumed and a key serde is used */
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows,
                                                          final boolean isPersistent);

    /** Windowing parameters, ID extractor, and a serde for unique IDs. A persistent store will be used. */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde);

    /**
     * Windowing parameters, ID extractor, a serde for unique IDs, and a flag showing whether the {@code WindowStore} should be
     * persistent or not.
     */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde,
                                                             final boolean isPersistent);
}