KIP-244: Add Record Header support to Kafka Streams

Status

Current state: Under Discussion

Discussion thread: here

JIRA:

  • KAFKA-6395: "KIP: Add Record Header support to Kafka Streams" (Improvement, Major). Created Dec 21, 2017; updated Mar 29, 2018. Assignee/Reporter: Jorge Quilcate. Status: Resolved (Duplicate).
  • KAFKA-5632: "Message headers not supported by Kafka Streams" (Bug, Minor). Created Jul 24, 2017; updated Mar 29, 2018. Assignee: Unassigned; Reporter: CJ Woolard. Status: Open (Unresolved).


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to add Record Header support to the Streams API for routing, filtering, and mapping. Stateful operations (tables, joins) are out of scope.

Public Interfaces

1. Add headers as part of processing:

org.apache.kafka.streams.processor.Processor:
  • void process(K key, V value, Headers headers);
org.apache.kafka.streams.processor.ProcessorContext:
  • <K, V> void forward(K key, V value);
  • <K, V> void forward(K key, V value, Headers headers);
  • <K, V> void forward(K key, V value, String childName);
  • <K, V> void forward(K key, V value, Headers headers, String childName); 
  • <K, V> void forward(K key, V value, int childIndex);
  • <K, V> void forward(K key, V value, Headers headers, int childIndex);
org.apache.kafka.streams.processor.internals.RecordContext:
  • Headers headers();
org.apache.kafka.streams.processor.internals.RecordCollector:
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner); 
  • <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers); 
  • <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);
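To illustrate, a custom processor written against the proposed `process(key, value, headers)` signature could forward the headers along with the transformed record. The sketch below uses minimal stand-ins for `Processor`, `ProcessorContext`, and `Headers` (not the real Kafka Streams interfaces) so it is self-contained, and `UppercaseProcessor` is a hypothetical example:

```java
public class ProcessorSketch {
    // Minimal stand-in for org.apache.kafka.common.header.Headers.
    interface Headers { }

    // Stand-in for the relevant part of ProcessorContext, showing the
    // proposed header-aware forward overload.
    interface ProcessorContext {
        <K, V> void forward(K key, V value, Headers headers);
    }

    // Stand-in for Processor with the proposed header-aware process method.
    interface Processor<K, V> {
        void process(K key, V value, Headers headers);
    }

    // Hypothetical processor: transforms the value and forwards the record
    // downstream with its headers untouched.
    static class UppercaseProcessor implements Processor<String, String> {
        private final ProcessorContext context;

        UppercaseProcessor(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value, final Headers headers) {
            context.forward(key, value.toUpperCase(), headers);
        }
    }
}
```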

2. Add methods to use Headers as part of processing:

org.apache.kafka.streams.kstream.KStream:
  • KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);
  • KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);
  • <KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
  • <KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
  • void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);
  • KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);
  • KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);

Proposed Changes

1. Enable Kafka Streams to filter, map and process records using headers. This is based on API change #2 above.

It will require the following functional interfaces:

  • PredicateWithHeaders
  • KeyValueAndHeadersMapper
  • ForeachActionWithHeaders
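These interfaces do not exist in the codebase yet; as a rough sketch, they would likely mirror the existing `Predicate`, `KeyValueMapper`, and `ForeachAction` interfaces with an extra `Headers` parameter. The shapes below are assumptions, and `Headers` is an empty stand-in for `org.apache.kafka.common.header.Headers` so the snippet compiles on its own:

```java
public class HeaderInterfacesSketch {
    // Stand-in for org.apache.kafka.common.header.Headers.
    interface Headers { }

    // Header-aware counterpart of Predicate.
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Headers headers);
    }

    // Header-aware counterpart of KeyValueMapper.
    @FunctionalInterface
    interface KeyValueAndHeadersMapper<K, V, VR> {
        VR apply(K key, V value, Headers headers);
    }

    // Header-aware counterpart of ForeachAction.
    @FunctionalInterface
    interface ForeachActionWithHeaders<K, V> {
        void apply(K key, V value, Headers headers);
    }
}
```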

Then, clients will be able to use headers from Lambda expressions or in their anonymous classes:

```java
stream.filter(new PredicateWithHeaders<String, WikiFeed>() {
    @Override
    public boolean test(final String dummy, final WikiFeed value, final Headers headers) {
        ...
    }
});
```
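Because `PredicateWithHeaders` would be a functional interface, the same predicate can also be written as a lambda. A self-contained sketch of that usage follows; the interface shape, the simplified `Headers` stand-in with a `has` method, and the "skip" header key are all illustrative assumptions (the real `Headers` interface exposes `lastHeader(String)` instead):

```java
public class LambdaPredicateSketch {
    // Simplified stand-in for org.apache.kafka.common.header.Headers.
    interface Headers {
        boolean has(String key);
    }

    // Sketch of the proposed functional interface.
    @FunctionalInterface
    interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Headers headers);
    }

    public static void main(String[] args) {
        // Lambda equivalent of the anonymous class above:
        // keep only records without a "skip" header.
        PredicateWithHeaders<String, String> keep =
            (key, value, headers) -> !headers.has("skip");

        System.out.println(keep.test("k", "v", "skip"::equals));   // false
        System.out.println(keep.test("k", "v", "other"::equals));  // true
    }
}
```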

`RecordContext` will hold the headers as a member, wrapping them and passing them to the internals.

 

Compatibility, Deprecation, and Migration Plan

  • Clients using the High-Level DSL should not be affected by the proposed changes.
  • Clients using the Processor API will need to implement `void process(K key, V value, Headers headers);` to bypass or handle headers.

Rejected Alternatives

1. Adding Headers to the KTable API would mean propagating headers to stores that are key-value specific, such as RocksDB. If headers are required in stateful operations, clients will need to map the header values into the key or value first and then do the processing.
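For illustration, that workaround could look like re-keying the stream by a header value before grouping. The sketch below isolates just that mapping step so it is self-contained; the `Headers` stand-in, its simplified `lastHeaderValue` method, and the "source" header key are all assumptions for the example (the real interface returns a `Header` object from `lastHeader(String)`):

```java
public class HeaderToKeySketch {
    // Simplified stand-in for org.apache.kafka.common.header.Headers.
    interface Headers {
        String lastHeaderValue(String key);
    }

    // The mapping step a client could run (e.g. via the proposed header-aware
    // map) to copy a header value into the record key, so that a downstream
    // stateful operation such as groupByKey().count() can use it.
    static String rekeyBySource(final String key, final Headers headers) {
        final String source = headers.lastHeaderValue("source");  // hypothetical header key
        return source != null ? source : key;
    }

    public static void main(String[] args) {
        Headers mobile = name -> "source".equals(name) ? "mobile" : null;
        Headers none = name -> null;
        System.out.println(rekeyBySource("orig", mobile));  // mobile
        System.out.println(rekeyBySource("orig", none));    // orig
    }
}
```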

 

References

Draft/WIP: https://github.com/jeqo/kafka/tree/streams-headers

Changes: https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
