Status

Current state: Under Discussion

Discussion thread: here

Old Discussion thread: here

Voting thread: here

JIRA: KAFKA-10369

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams applications usually consume from topics that are written with at-least-once semantics. In companies where Kafka is the backbone platform, an application maintained by team A may read topics that are written by team B. While team A may expect the topic to be written with exactly-once semantics, team B may not always fulfill that requirement. Team A is then forced to add a custom processor that deduplicates the data. This can mean writing a custom deduplication processor for every external topic, and for every Kafka Streams application.

Public Interfaces

To save applications from having to write a deduplication processor each time, we introduce a new deduplication API that does the job for them.

KStream.java
public interface KStream<K, V> {  

    /**
     * Filter out duplicates from this stream based on record's key, within the provided time interval.
     * After receiving a non-duplicate record, any record with the same key that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same key is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.
     * Records with a {@code null} key are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     * <p>
     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
     * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * <name> is an internally generated name, and "-repartition" is a fixed suffix.
     *
     * @param deduplicationInterval             the duration within which subsequent duplicates of a record will be discarded
     * @return                                  a KStream that contains the same records of this KStream without duplicates
     */
    KStream<K, V> deduplicateByKey(final Duration deduplicationInterval);


    KStream<K, V> deduplicateByKey(final Duration deduplicationInterval,
                                   final Deduplicated<K, V> deduplicated);


    /**
     * Filter out duplicates from this stream based on record's key and the provided {@link KeyValueMapper}, within the provided time interval.
     * The provided {@link KeyValueMapper} maps a record to an id.
     * For two records to be duplicate, they must have the same key and the same id. 
     * <p>
     * Example of usage.
     * <pre>{@code
     * KStream<String, Object> inputStream = builder.stream("topic");
     *
     * KStream<String, Object> outputStream = inputStream.deduplicateByKeyValue(new KeyValueMapper<String, Object, String>() {
     *     public String apply(String key, Object value) {
     *         return value.id;
     *     }
     * }, Duration.ofSeconds(60));
     * }</pre>
     * </p>
     * After receiving a non-duplicate record, any duplicate to it that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same (key, id) is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.           
     * Records with a {@code null} key OR a {@code null} id are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     * <p>
     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
     * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * <name> is an internally generated name, and "-repartition" is a fixed suffix.
     *
     * @param idSelector                        a {@link KeyValueMapper} that returns the unique id of the record
     * @param deduplicationInterval             the duration within which subsequent duplicates of a record will be discarded
     * @param <KR>                              the type of the deduplication id
     * @return                                  a KStream that contains the same records of this KStream without duplicates
     */
    <KR> KStream<K, V> deduplicateByKeyValue(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                             final Duration deduplicationInterval);


    <KR> KStream<K, V> deduplicateByKeyValue(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                             final Duration deduplicationInterval,
                                             final Deduplicated<KR, V> deduplicated);

    /**
     * Filter out duplicates from this stream based on the provided {@link KeyValueMapper}, within the provided time interval.
     * The provided {@link KeyValueMapper} maps a record to its deduplication id.
     * This processor causes repartitioning in order to get records of the same deduplication id in the same partition.
     * In case you only need to deduplicate records within a partition, consider using {@link #deduplicateByKeyValue(KeyValueMapper, Duration)}.
     * For deduplication over all partitions, an alternative way to this processor is to manually repartition the
     * stream such that the new key corresponds to the deduplication key and then use {@link #deduplicateByKey(Duration)}. This
     * latter alternative is better when it is fine to keep the deduplication key in the resulting stream's key.
     * <p>
     * In the example below, we deduplicate on an id field within the record's value
     * <pre>{@code
     * KStream<String, Object> inputStream = builder.stream("topic");
     * KStream<String, Object> outputStream = inputStream.deduplicate(new KeyValueMapper<String, Object, String>() {
     *     public String apply(String key, Object value) {
     *         return value.id;
     *     }
     * }, Duration.ofSeconds(60));
     * }</pre>
     * </p>
     * After receiving a non-duplicate record, any duplicate to it that is received within the provided deduplicationInterval
     * will be discarded. After deduplicationInterval has elapsed since a non-duplicate record is received, a new record having
     * the same id is considered a non-duplicate, and is forwarded to the resulting {@link KStream}.
     * Records with a {@code null} id are always forwarded to the resulting {@link KStream}, no deduplication is performed on them.
     *
     * @param idSelector                        a {@link KeyValueMapper} that returns the unique id of the record
     * @param deduplicationInterval             the duration within which subsequent duplicates of a record will be discarded
     * @param <KR>                              the type of deduplication id
     * @return                                  a KStream that contains the same records of this KStream without duplicates
     */
    <KR> KStream<K, V> deduplicate(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                   final Duration deduplicationInterval);


    <KR> KStream<K, V> deduplicate(final KeyValueMapper<? super K, ? super V, ? extends KR> idSelector,
                                   final Duration deduplicationInterval,
                                   final Deduplicated<KR, V> deduplicated);  
}
Deduplicated.java
public class Deduplicated<K, V> implements NamedOperation<Deduplicated<K, V>> {

    protected final String name;
    protected final String storeName;
    protected final Serde<K> keySerde;
    protected final Serde<V> valueSerde;


    public static <K, V> Deduplicated<K, V> as(final String name);


    public static <K, V> Deduplicated<K, V> keySerde(final Serde<K> keySerde);


    public static <K, V> Deduplicated<K, V> valueSerde(final Serde<V> valueSerde);


    public static <K, V> Deduplicated<K, V> with(final String storeName,
                                                 final Serde<K> keySerde,
                                                 final Serde<V> valueSerde);


    public static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde,
                                                 final Serde<V> valueSerde); 

    public Deduplicated<K, V> withName(final String name);

    public Deduplicated<K, V> withKeySerde(final Serde<K> keySerde);

    public Deduplicated<K, V> withValueSerde(final Serde<V> valueSerde);
}


The parameter deduplicationInterval makes it possible to:

  • control the time interval within which duplicates of a record are discarded
  • limit storage by preventing the state store from growing indefinitely


Proposed changes

As mentioned in the Javadoc above, we propose both deduplicating locally within a task and deduplicating globally over all partitions.

Internally, to decide whether a record is a duplicate, the deduplication processor maintains a key-value store whose key is the deduplication id.

To purge old records efficiently, an index is maintained alongside this base store. Each index key consists of the record's timestamp as a prefix, followed by the record's key. When eviction is triggered, a range scan over the expired timestamps is performed; the corresponding entries are removed from the base store, and the scanned index entries are removed as well.
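The base store and its timestamp index can be pictured with a small in-memory sketch. This is only an illustration under stated assumptions: the class IndexedDedupStore and its methods are hypothetical names, a HashMap stands in for the base store, and a TreeMap stands in for the timestamp-prefixed index; it is not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the base store plus its timestamp index. */
final class IndexedDedupStore {
    // Base store: deduplication id -> timestamp of the stored record.
    private final Map<String, Long> base = new HashMap<>();
    // Index: timestamp -> ids stored with that timestamp. Sorted, so expired entries form a prefix.
    private final TreeMap<Long, List<String>> index = new TreeMap<>();

    void put(String id, long timestamp) {
        Long previous = base.put(id, timestamp);
        if (previous != null) {
            // Drop the stale index entry of the overwritten record.
            List<String> ids = index.get(previous);
            ids.remove(id);
            if (ids.isEmpty()) {
                index.remove(previous);
            }
        }
        index.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(id);
    }

    Long get(String id) {
        return base.get(id);
    }

    int size() {
        return base.size();
    }

    /** Removes every entry with a timestamp strictly lower than the bound; returns the number removed. */
    int evictOlderThan(long lowerBoundExclusive) {
        // Range scan over the expired prefix of the index.
        Map<Long, List<String>> expired = index.headMap(lowerBoundExclusive, false);
        int removed = 0;
        for (List<String> ids : expired.values()) {
            for (String id : ids) {
                base.remove(id);
                removed++;
            }
        }
        expired.clear(); // clearing the view also removes the scanned entries from the index
        return removed;
    }
}
```

Because the index is ordered by timestamp, eviction only touches expired entries instead of scanning the whole base store.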

The value in the base store holds the timestamp of the record and its offset within the partition. Under at-least-once semantics (ALOS), a crash may happen after the record has been saved in the store but before its offset has been committed. When the record is reprocessed, if the entry in the store has the same offset, the record is forwarded; otherwise deduplication is performed.

For punctuated records, whose offset is null, deduplication is always performed.

Finally, given two duplicate records, the first record received (i.e. the one with the lowest offset) is the one forwarded.
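The offset rule above could look roughly like the following sketch. The names OffsetAwareDedup, StoredEntry and shouldForward are hypothetical, and the sketch deliberately leaves out the deduplicationInterval check to isolate the offset comparison.

```java
/** Hypothetical sketch of the offset comparison used to survive ALOS reprocessing. */
final class OffsetAwareDedup {

    /** Value kept in the base store; offset is null for punctuation-generated records. */
    static final class StoredEntry {
        final long timestamp;
        final Long offset;

        StoredEntry(long timestamp, Long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /** Returns true when the incoming record should be forwarded downstream. */
    static boolean shouldForward(StoredEntry stored, Long recordOffset) {
        if (stored == null) {
            return true; // nothing stored for this id: first occurrence
        }
        if (recordOffset == null) {
            return false; // punctuated record: deduplication is always performed
        }
        // Same offset means the very same record is being reprocessed after a crash.
        return recordOffset.equals(stored.offset);
    }
}
```

For instance, with an entry stored at offset 42, a reprocessed record at offset 42 is forwarded again, while a record at offset 43 is treated as a duplicate.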


Semantics

Null deduplication key

For `deduplicateByKey()` and `deduplicate()`, records whose key (respectively id) is null are always forwarded to the output stream.

For `deduplicateByKeyValue()`, where both the key and the id are used for deduplication, if one of them is null, we forward the record to the output stream. This is because:

  • if the id is null: we don't want to consider only the key for deduplication (because other records with the same key were not considered duplicates). Users can still map the null id to a constant value if they want this behavior.
  • if the key is null: we don't want non-deterministic behavior. Two records with a null key may or may not end up in the same partition. In the first case they would be deduplicated, in the second case they would not be. Users can map the key to a constant value beforehand if they want to deduplicate such records.
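As a rough illustration of the null rule for `deduplicateByKeyValue()`, the sketch below builds the composite (key, id) deduplication key and returns an empty Optional when either part is null, meaning "forward without deduplication". CompositeDedupKey is a hypothetical helper, not part of the proposed API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper that builds the (key, id) pair used for deduplication. */
final class CompositeDedupKey {

    /** Empty result: the record bypasses deduplication and is always forwarded. */
    static <K, I> Optional<Map.Entry<K, I>> of(K key, I id) {
        if (key == null || id == null) {
            return Optional.empty();
        }
        return Optional.of(new SimpleImmutableEntry<>(key, id));
    }
}
```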


Duplicate sequence

Example: deduplicationInterval = 10s. The following events a1, a2 and a3 are duplicates of each other.

  • event a1 at t → Forwarded and saved in the store
  • event a2 at t+8s → Not forwarded, we don't update the store
  • event a3 at t+11s → Forwarded. Although a3 is within deduplicationInterval of a2, a2 was neither forwarded nor saved; a3 is only compared with a1, so it is forwarded as if a2 had never existed

Similarly, for out-of-order records:

  • event a1 at t → Forwarded and saved in the store
  • event a2 at t-8s → Not forwarded, we don't update the store
  • event a3 at t-11s → Forwarded (see section Late events)


Deduplication interval boundaries

Both ends of the deduplication interval are inclusive.

Example: deduplicationInterval = 10s. The following events a1, a2 and a3 are duplicates of each other.

  • event a1 @t=5s  → Forwarded
  • event a2 @t=15s → Dropped
  • event a3 @t=16s → Forwarded

The same applies to deduplicating out-of-order records:

  • event a1 @t=15s  → Forwarded
  • event a2 @t=5s → Dropped
  • event a3 @t=4s → Forwarded

Thus, for the lower bound of deduplicationInterval = 0:

  • event a1 @t=5s → Forwarded
  • event a2 @t=5s → Dropped
  • event a3 @t=6s → Forwarded
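The inclusive boundaries can be expressed as a single comparison. The sketch below assumes timestamps in milliseconds; DedupInterval.isDuplicate is a hypothetical helper that compares an incoming record against the timestamp of the stored (forwarded) record.

```java
import java.time.Duration;

/** Hypothetical helper expressing the inclusive interval check. */
final class DedupInterval {

    /** True when the record lies within [stored - interval, stored + interval], both ends included. */
    static boolean isDuplicate(long storedTimestampMs, long recordTimestampMs, Duration interval) {
        return Math.abs(recordTimestampMs - storedTimestampMs) <= interval.toMillis();
    }
}
```

With a 10s interval and a stored record at t=5s, a record at t=15s is a duplicate while one at t=16s is not; with an interval of 0, only records with exactly the same timestamp are duplicates.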

Late events by more than deduplicationInterval

Late events that are more than deduplicationInterval in the past (with respect to stream time) cannot be saved in the store, because they would be purged immediately. Hence, duplicates of such events received afterwards won't be detected as duplicates. We have two choices:

  • Drop late records
    • Advantage: The late record may be a duplicate of a previously forwarded purged record, in this case we won't forward it twice
    • Drawback: The record could be a non-duplicate, in this case we should have forwarded it
  • Forward late records *if no duplicate found in the store*
    • Advantage: Non-duplicate records are forwarded
    • Drawback: The record may be a duplicate of a previously forwarded purged record.

We choose the forward strategy. In case it's needed, the user may add a processor to purge those late records according to the first strategy.

In the following examples, deduplicationInterval = 10s. Each event is labeled with its deduplication key and timestamp as "event <dedup-key> @t=<timestamp>"

Example1:

  • event k @t=20s → Forwarded
  • event k @t=25s → Dropped           
  • event k @t=11s → Dropped            (streamTime=25s → this is a late event, but its duplicate @t=20s still exists in the store)
  • event k @t=9s → Forwarded         (late event with no duplicate within deduplicationInterval in the store)

Example2:

  • event k1 @t=10s → Forwarded    
  • event k2 @t=20s → Forwarded    
  • event k1 @t=9s → Dropped           (late event, but its duplicate @t=10s still exists in the store)

Example3:

  • event k1 @t=10s → Forwarded    
  • event k2 @t=21s → Forwarded       → event k1 @t=10s is now purged from the store
  • event k1 @t=9s → Forwarded      (late event with no duplicate in the store)
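The three examples above can be replayed with a small simulator of the forward strategy. This is a sketch under assumptions, not the proposed processor: ForwardLateDedupSimulator is a hypothetical name, a HashMap replaces the state store, timestamps are in milliseconds, and an entry is treated as expired once its timestamp is lower than streamTime minus deduplicationInterval.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simulator of the "forward late records" strategy. */
final class ForwardLateDedupSimulator {
    private final long intervalMs;
    // Deduplication key -> timestamp of the forwarded record still retained.
    private final Map<String, Long> store = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    ForwardLateDedupSimulator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the record is forwarded, false if it is dropped as a duplicate. */
    boolean process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long oldestRetained = streamTimeMs - intervalMs;
        // Purge entries that fell out of the retained range.
        store.values().removeIf(ts -> ts < oldestRetained);

        Long stored = store.get(key);
        if (stored != null && Math.abs(timestampMs - stored) <= intervalMs) {
            return false; // duplicate found in the store
        }
        if (timestampMs >= oldestRetained) {
            store.put(key, timestampMs); // late records are forwarded but cannot be saved
        }
        return true;
    }

    public static void main(String[] args) {
        // Replays Example1 with deduplicationInterval = 10s.
        ForwardLateDedupSimulator sim = new ForwardLateDedupSimulator(10_000L);
        long[] timestamps = {20_000L, 25_000L, 11_000L, 9_000L};
        for (long ts : timestamps) {
            System.out.println("k @t=" + ts + "ms forwarded=" + sim.process("k", ts));
        }
    }
}
```

Dropping late records instead (the first strategy) would amount to returning false whenever the timestamp is lower than oldestRetained, before the store lookup.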






Compatibility, Deprecation, and Migration Plan

The proposed change is backwards compatible; no deprecation or migration is needed.


Rejected Alternatives

Nature of the underlying store

  • We use a window store to save the ids "seen" by the processor. Setting the retention of this store to deduplicationInterval makes it possible to purge old records automatically.

Discarding late events

  • Out-of-order events that are within deduplicationInterval from max observed streamTime are handled by the processor's logic above.
  • However, the processor's logic above doesn't handle events that are more than deduplicationInterval in the past (i.e. with respect to maxObservedStreamTime). After such a record is received, it won't be saved in the window store because it is considered expired. Any duplicate of it that arrives afterwards would therefore not be detected as a duplicate.

          Actually, it makes sense to ignore late records beyond deduplicationInterval in the past. If users consider deduplicationInterval long enough to accept new duplicates of a record once this interval has elapsed, then they also consider records older than this interval to be expired.

          Of course, users can set deduplicationInterval to whatever value they judge to be sufficiently large.

Processor logic

  • check if the record is more than deduplicationInterval late
    • If so, ignore it.
  • evaluate the record's deduplication id using the supplied KeyValueMapper
  • fetch entries in the store having this key within the time interval (record.timestamp-deduplicationInterval, record.timestamp+deduplicationInterval)
    • If no entries found → forward the record + save the record in the store 
    • If any entries found → do not forward + do not update the store
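For comparison, the rejected processor logic listed above might be sketched as follows. Everything here is an assumption made for illustration: DropLateDedupSketch is a hypothetical name, a map of sorted timestamp sets replaces the window store, retention-based purging is not modelled, and the fetch bounds are treated as inclusive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch of the rejected logic: ignore late records, then do a windowed lookup. */
final class DropLateDedupSketch {
    private final long intervalMs;
    // Deduplication id -> timestamps of the records already forwarded for that id.
    private final Map<String, TreeSet<Long>> windowStore = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    DropLateDedupSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if the record is forwarded. */
    boolean process(String id, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        if (timestampMs < streamTimeMs - intervalMs) {
            return false; // more than deduplicationInterval late: ignored
        }
        TreeSet<Long> seen = windowStore.computeIfAbsent(id, k -> new TreeSet<>());
        // Fetch entries for this id around the record's timestamp.
        boolean duplicate = !seen
            .subSet(timestampMs - intervalMs, true, timestampMs + intervalMs, true)
            .isEmpty();
        if (duplicate) {
            return false; // do not forward, do not update the store
        }
        seen.add(timestampMs); // forward and remember the record
        return true;
    }
}
```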

How to detect late records so as to drop them

The current WindowStore interface does not indicate whether a record is expired. Calling put() on a record does not return whether the record was actually stored or was dropped as expired. Nor does the interface expose the maximum observed stream time.

Therefore, to detect late records, maxObservedStreamTime has to be tracked within the processor; this information is already available through the ProcessorContext API currentStreamTimeMs().


-----------------------------------

Initial motivation

One example: we might have multiple data sources, each reporting its state periodically with a relatively high frequency, and their current states should be stored in a database. If the state actually changes less frequently than it is reported, we might want to filter out duplicated messages using Kafka Streams in order to reduce the number of writes to the database.

'Distinct' operation is common in data processing, e. g.

Hence it is natural to expect similar functionality from Kafka Streams.

Kafka Streams Tutorials contains an example of how distinct can be emulated, but this example is complicated: it involves low-level coding with a local state store and a custom transformer. It would be much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of a KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.


Initial public interface & examples

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • distinct() parameterless DSLOperation on

    • TimeWindowedKStream<K,V>  DSLObject which returns KStream<Windowed<K>,V> 
    • SessionWindowedKStream<K,V> DSLObject which returns KStream<Windowed<K>,V>

The following methods are added to the corresponding interfaces:



KTable<Windowed<K>, V> distinct(final Named named);
KTable<Windowed<K>, V> distinct(final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> distinct(final Named named,
                                    final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);


The distinct operation returns only the first record that falls into a new window, and filters out all other records that fall into an already existing window.

The records are considered to be duplicates iff serialized forms of their keys are equal.


Examples

Consider the following example (record times are in seconds):

//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35

'Epoch-aligned deduplication' using tumbling windows

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[00000/10000], 4)
(key@[20000/30000], 23)
(key@[30000/40000], 34)

-- that is, one record per epoch-aligned window.
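The tumbling-window result can be reproduced with a few lines of plain Java. This is a simplified sketch: TumblingDistinctSketch is a hypothetical name, times are in seconds, a single key is assumed, and grace periods and store retention are ignored.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of epoch-aligned deduplication for a single key. */
final class TumblingDistinctSketch {

    /** Returns {windowStart, timestamp} pairs: the first record seen in each tumbling window. */
    static List<long[]> distinct(long[] timestampsSec, long windowSizeSec) {
        Set<Long> seenWindows = new HashSet<>();
        List<long[]> out = new ArrayList<>();
        for (long ts : timestampsSec) {
            long windowStart = ts - (ts % windowSizeSec); // epoch-aligned window start
            if (seenWindows.add(windowStart)) {
                out.add(new long[] {windowStart, ts}); // first record of a new window
            }
        }
        return out;
    }
}
```

Feeding it the sequence 4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35 with a 10-second window yields the records 4, 23 and 34 for the windows starting at 0, 20 and 30 seconds; the late arrivals fall into windows that already have a record.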

Note: hopping and sliding windows do not make much sense for distinct() because they produce multiple overlapping windows, so a single record can be emitted several times instead of being deduplicated.

SessionWindows work for 'data-aligned deduplication'.

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 are 'stuck together' in one session. Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

Initial Rejected public Interfaces

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • distinct DSLOperation on a KStream<K, V> DSLObject which returns another KStream<K, V> DSLObject,

  • DistinctParameters<K, V, I> DSLParameter.

The type parameters are:

  • K — key type
  • V — value type
  • I — the type of the record's unique identifier

With DistinctParameters<K, V, I> the following can be provided:

  1. KeyValueMapper<K, V, I> idExtractor — extracts a unique identifier from a record by which we de-duplicate input records. If it returns null, the record will not be considered for de-duping and is forwarded as-is. If not provided, defaults to (key, value) -> key, which means deduplication based on the key of the record. Important assumption: records from different partitions should have different IDs, otherwise records with the same ID might not be co-partitioned.
  2. TimeWindows timeWindows — tumbling or hopping time-based window specification. Required parameter. Only the first message with a given id that falls into a window will be passed downstream.
  3. Serde<I> idSerde — serde for unique identifier.
  4. boolean isPersistent — whether the WindowStore that stores the unique ids should be persistent or not. In many cases, a non-persistent store will be preferable because of better performance. Downstream consumers must be ready to accept occasional duplicates.

Initial rejected Proposed Changes

  1. Add the following method to KStream interface:
<I> KStream<K, V> distinct(DistinctParameters<K, V, I> params);

Given the parameters, this method returns a new KStream with only the first occurrence of each record in any of the time windows, deduplicated by unique id. Any subsequent occurrences in the time window are filtered out.

  2. Add and implement the following DistinctParameters class:

class DistinctParameters<K, V, I> extends Named {
    /** Windowing parameters only. {@code (k,v)->k} id extractor is assumed, and a persistent store with key serde is used*/
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows);

   /** Windowing parameters and a store persistency flag. {@code (k,v)->k} id extractor is assumed and a key serde is used*/
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows, final boolean isPersistent);
    
    /** Windowing parameters, ID extractor, and a serde for unique IDs. A persistent store will be used.*/
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde);
    /** Windowing parameters, ID extractor, a serde for unique IDs, and a flag showing whether the {@code WindowStore} should be 
     * persistent or not.*/ 
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde,
                                                             final boolean isPersistent);
}

