Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. KStream#mapValueToRecord(Named) operation exposes the new Record<K, V> type, including headers and topic metadata. The overloaded parameterless alternative mapValueToRecord() is also available.

  2. RecordSerde<K, V> is a public API, and it is implicitly set as the valueSerde when the KStream#mapValueToRecord operation is called.

  3. KStream#setRecordHeaders(RecordHeaderMapper, Named) operation will “flush” headers into the actual record's headers crossing the stream operation, to be used by consumers downstream. This mapper function receives K key and V value, and returns an o.a.k.s.header.Headers. Users can create new Headers using the Streams' implementation o.a.k.s.header.StreamHeaders, or reuse existing ones obtained by previously calling KStream#mapValueToRecord(). The overloaded alternative without the Named parameter, setRecordHeaders(RecordHeaderMapper), is also available.

...

  • KStream:
Code Block
KStream<K, Record<K, RecordValue<V>>V>> mapValueToRecord(final Named named);
 
KStream<K, Record<K, RecordValue<V>>V>> mapValueToRecord();
 
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action, final Named named);
 
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action);

...

Code Block
package org.apache.kafka.streams.processor.api.header;

import java.util.Iterator;

/**
 * Streams-level collection of record headers, iterable over its {@link Header} entries.
 *
 * <p>Only signatures are specified here. The per-method notes describe what the names
 * suggest and are marked as assumptions wherever the contract is not visible.
 *
 * <p>NOTE(review): every mutator returns {@code Headers}; it is not visible here whether
 * the receiver is modified in place or a new instance is returned — confirm against the
 * implementation.
 */
public interface Headers extends Iterable<Header> {

    // State validation
    /** Presumably the number of header entries held — confirm. */
    int size();
    /** Presumably {@code true} when no header entries are held — confirm. */
    boolean isEmpty();
    
    // Access headers
    /** Returns an iterator over headers selected by {@code key}; presumably all entries with that key. */
    Iterator<Header> allWithName(final String key);
    /** Returns a single header selected by {@code key}; presumably the most recently added one — confirm. */
    Header lastWithName(final String key);
    /** Presumably reports whether at least one header with {@code key} exists — confirm. */
    boolean hasWithName(final String key);
    
    // Add/delete/clean
    /** Adds the given header. */
    Headers add(final Header header);
    /** Adds a header built from a key and a raw byte value. */
    Headers add(final String key, final byte[] value);
    /** Adds a header built from a key and a string value; presumably UTF-8 encoded, per the method name. */
    Headers addUtf8(final String key, final String value);
    /** Removes headers selected by {@code key}; presumably all entries with that key — confirm. */
    Headers remove(final String key);
    /** Presumably keeps only the latest entry for {@code key} and drops older duplicates — confirm. */
    Headers retainLatest(final String key);
    /** Presumably applies {@link #retainLatest(String)} to every key — confirm. */
    Headers retainLatest();
    /** Presumably removes every header — confirm. */
    Headers clear();
    /** Presumably returns a copy of these headers — confirm whether the copy is deep or shallow. */
    Headers duplicate();
    
    // Transformations
    /** Applies {@code transform} to headers; presumably to every entry — confirm. */
    Headers apply(final Headers.HeaderTransform transform);
    /** Applies {@code transform} to headers selected by {@code key}. */
    Headers apply(final String key, final Headers.HeaderTransform transform);
    
    /** Single-method callback mapping one {@link Header} to another. */
    interface HeaderTransform {
        /** Returns the header that results from transforming {@code header}. */
        Header apply(final Header header);
    }

    // to core Headers
    /** Exposes these headers as the Kafka clients' {@code org.apache.kafka.common.header.Headers} type. */
    org.apache.kafka.common.header.Headers unwrap();
}


4. Add `RecordValue<V>`:

Code Block
package org.apache.kafka.streams.kstream;

import java.util.Objects;
import org.apache.kafka.streams.header.Headers;
import org.apache.kafka.streams.header.StreamHeaders;

/**
 * Value holder that carries a value of type {@code V} together with its topic,
 * partition, offset, timestamp and headers.
 *
 * <p>NOTE(review): the body is elided in this excerpt ({@code //...}), so constructors and
 * accessors are not visible. The surrounding text describes {@code Record<K, V>} as the
 * type exposed by {@code mapValueToRecord}; confirm whether this class is still part of
 * the proposal.
 */
public class RecordValue<V> {

    // Source coordinates of the record — presumably as read from the input topic; confirm.
    final String topic;
    final int partition;
    final long offset;
    // The wrapped value.
    final V value;
    // Timestamp associated with the value.
    final long timestamp;
    // Headers associated with the value.
    final Headers headers;
    //...
}

Modify Record<K, V>

Code Block
package org.apache.kafka.streams.processor.api;

import java.util.Optional;
import org.apache.kafka.streams.errors.StreamsException;

import java.util.Objects;
import org.apache.kafka.streams.processor.api.header.Header;
import org.apache.kafka.streams.processor.api.header.Headers;
import org.apache.kafka.streams.processor.api.header.StreamHeaders;

/**
 * A record flowing through a Streams topology: key, value, timestamp and headers, plus
 * the source coordinates (topic, partition, offset) when they are known.
 *
 * <p>All fields are final; the {@code with*} methods return a new instance instead of
 * modifying this one. When no {@link RecordMetadata} is supplied, {@link #topic()} is
 * {@code null} and {@link #partition()} / {@link #offset()} are {@code -1}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Record<K, V> implements RecordMetadata {
    /** Shared empty header array. */
    public static final Header[] EMPTY_HEADERS = new Header[0];

    private final K key;
    private final V value;
    private final long timestamp;
    private final Headers headers;

    private final String topic;
    private final int partition;
    private final long offset;

    /**
     * Canonical constructor; every other constructor delegates here.
     *
     * @param key       record key, may be {@code null}
     * @param value     record value, may be {@code null}
     * @param timestamp record timestamp, must not be negative
     * @param headers   record headers, stored as given
     * @param topic     source topic, or {@code null} when unknown
     * @param partition source partition, or {@code -1} when unknown
     * @param offset    source offset, or {@code -1} when unknown
     * @throws StreamsException if {@code timestamp} is negative
     */
    public Record(final K key, final V value,
        final long timestamp,
        final Headers headers,
        final String topic, final int partition, final long offset) {

        // Reject malformed records up front, mirroring the upstream Kafka Streams Record contract.
        if (timestamp < 0) {
            throw new StreamsException(
                "Malformed Record",
                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp));
        }

        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;

        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** Same as the canonical constructor, wrapping Kafka client headers via {@code StreamHeaders.wrap}. */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Headers headers,
        final String topic, final int partition, final long offset) {

        this(key, value, timestamp, StreamHeaders.wrap(headers), topic, partition, offset);
    }

    /** Same as the canonical constructor, wrapping a Kafka client header array via {@code StreamHeaders.wrap}. */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Header[] headers,
        final String topic, final int partition, final long offset) {

        this(key, value, timestamp, StreamHeaders.wrap(headers), topic, partition, offset);
    }

    /**
     * Builds a record from a Kafka client header array and optional metadata.
     * An empty {@code recordMetadata} yields a {@code null} topic and {@code -1} partition/offset.
     */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Header[] headers,
        final Optional<RecordMetadata> recordMetadata) {

        this(key, value, timestamp, StreamHeaders.wrap(headers),
            recordMetadata.map(RecordMetadata::topic).orElse(null),
            recordMetadata.map(RecordMetadata::partition).orElse(-1),
            recordMetadata.map(RecordMetadata::offset).orElse(-1L));
    }

    /**
     * Builds a record from Kafka client headers and optional metadata.
     * An empty {@code recordMetadata} yields a {@code null} topic and {@code -1} partition/offset.
     */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Headers headers,
        final Optional<RecordMetadata> recordMetadata) {

        this(key, value, timestamp, StreamHeaders.wrap(headers),
            recordMetadata.map(RecordMetadata::topic).orElse(null),
            recordMetadata.map(RecordMetadata::partition).orElse(-1),
            recordMetadata.map(RecordMetadata::offset).orElse(-1L));
    }

    /**
     * Builds a record from Streams headers and optional metadata.
     * An empty {@code recordMetadata} yields a {@code null} topic and {@code -1} partition/offset.
     */
    public Record(final K key, final V value,
        final long timestamp,
        final Headers headers,
        final Optional<RecordMetadata> recordMetadata) {

        this(key, value, timestamp, headers,
            recordMetadata.map(RecordMetadata::topic).orElse(null),
            recordMetadata.map(RecordMetadata::partition).orElse(-1),
            recordMetadata.map(RecordMetadata::offset).orElse(-1L));
    }

    /** Builds a record without source metadata from a Kafka client header array. */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Header[] headers) {

        this(key, value, timestamp, StreamHeaders.wrap(headers), Optional.empty());
    }

    /** Builds a record without source metadata from Kafka client headers. */
    public Record(final K key, final V value,
        final long timestamp,
        final org.apache.kafka.common.header.Headers headers) {

        this(key, value, timestamp, StreamHeaders.wrap(headers), Optional.empty());
    }

    /** Builds a record without source metadata from Streams headers. */
    public Record(final K key, final V value,
        final long timestamp,
        final Headers headers) {

        this(key, value, timestamp, headers, Optional.empty());
    }

    /** Builds a record without source metadata, passing the Kafka client empty header array. */
    public Record(final K key, final V value, final long timestamp) {
        this(key, value, timestamp, org.apache.kafka.common.record.Record.EMPTY_HEADERS);
    }

    /** @return the record key */
    public K key() {
        return key;
    }

    /** @return the record value */
    public V value() {
        return value;
    }

    /** @return the record timestamp */
    public long timestamp() {
        return timestamp;
    }

    /** @return the record headers */
    public Headers headers() {
        return headers;
    }

    /** Returns a copy of this record with the given key; all other attributes are carried over. */
    public <NewK> Record<NewK, V> withKey(final NewK key) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    /** Returns a copy of this record with the given value; all other attributes are carried over. */
    public <NewV> Record<K, NewV> withValue(final NewV value) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    /** Returns a copy of this record with the given timestamp; all other attributes are carried over. */
    public Record<K, V> withTimestamp(final long timestamp) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    /** Returns a copy of this record with the given Streams headers; all other attributes are carried over. */
    public Record<K, V> withHeaders(final Headers headers) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    /** Returns a copy of this record with the given Kafka client headers, wrapped by the constructor. */
    public Record<K, V> withHeaders(final org.apache.kafka.common.header.Headers headers) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    /** @return the source topic, or {@code null} when the record carries no metadata */
    @Override
    public String topic() {
        return topic;
    }

    /** @return the source partition, or {@code -1} when the record carries no metadata */
    @Override
    public int partition() {
        return partition;
    }

    /** @return the source offset, or {@code -1} when the record carries no metadata */
    @Override
    public long offset() {
        return offset;
    }

    /**
     * Repackages this record's source coordinates as an optional {@link RecordMetadata},
     * so the {@code with*} methods can hand them to the metadata-taking constructors.
     *
     * @return the metadata view, or empty when {@code topic} is {@code null}
     */
    Optional<RecordMetadata> recordMetadata() {
        if (topic != null) {
            return Optional.of(new RecordMetadata() {
                @Override
                public String topic() {
                    return topic;
                }

                @Override
                public int partition() {
                    return partition;
                }

                @Override
                public long offset() {
                    return offset;
                }
            });
        } else  {
            return Optional.empty();
        }
    }
}

...