...
The `KStream#mapValueToRecord(Named)` operation exposes the new `Record<K, V>` type, including headers and topic metadata. The overloaded parameterless alternative `mapValueToRecord()` is also available. `RecordSerde<K, V>` is a public API, and it is implicitly defined as the `valueSerde` when the `KStream#mapValueToRecord` operation is called. The `KStream#setRecordHeaders(RecordHeadersMapper, Named)`
operation will “flush” headers into the actual record's headers, crossing the stream-operation boundary, so that they can be used by consumers downstream. This mapper function receives the `K` key and `V` value, and returns an `o.a.k.s.header.Headers`. Users can create new headers using the Streams implementation `o.a.k.s.header.StreamHeaders`, or reuse existing ones obtained by previously calling `KStream#mapValueToRecord()`. The overloaded parameterless alternative `setRecordHeaders(RecordHeadersMapper)`
is also available.
...
3. Add the following methods to `KStream`:
Code Block |
---|
// Wraps each value into a RecordValue<V> carrying headers and topic metadata.
KStream<K, RecordValue<V>> mapValueToRecord(final Named named);
KStream<K, RecordValue<V>> mapValueToRecord();

// Computes headers from (key, value) and "flushes" them onto the record for
// downstream consumers. PECS: the mapper may accept supertypes of K and V.
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action, final Named named);
KStream<K, V> setRecordHeaders(final RecordHeadersMapper<? super K, ? super V> action);
|
...
Code Block |
---|
package org.apache.kafka.streams.processor.api.header;

import java.util.Iterator;

/**
 * A mutable, ordered collection of record headers exposed to Kafka Streams
 * operations. Wraps the core client-side headers; the underlying
 * {@code org.apache.kafka.common.header.Headers} can be recovered via
 * {@link #unwrap()}.
 */
public interface Headers extends Iterable<Header> {

    // State validation
    int size();
    boolean isEmpty();

    // Access headers
    /** All headers with the given key, in order. */
    Iterator<Header> allWithName(final String key);
    /** The most recently added header with the given key; presumably null when absent — confirm in implementations. */
    Header lastWithName(final String key);
    boolean hasWithName(final String key);

    // Add/delete/clean — mutators return this Headers to allow chaining.
    Headers add(final Header header);
    Headers add(final String key, final byte[] value);
    /** Adds a header whose value is the UTF-8 encoding of the given string. */
    Headers addUtf8(final String key, final String value);
    Headers remove(final String key);
    /** Keeps only the latest header for the given key, dropping older duplicates. */
    Headers retainLatest(final String key);
    /** Keeps only the latest header for every key. */
    Headers retainLatest();
    Headers clear();
    /** A deep copy, so mutations do not affect the original. */
    Headers duplicate();

    // Transformations
    Headers apply(final Headers.HeaderTransform transform);
    Headers apply(final String key, final Headers.HeaderTransform transform);

    /** A function applied to each (matching) header, producing its replacement. */
    interface HeaderTransform {
        Header apply(final Header header);
    }

    // to core Headers
    org.apache.kafka.common.header.Headers unwrap();
}
|
4. Add `RecordValue<V>`:
Code Block |
---|
package org.apache.kafka.streams.kstream;
import java.util.Objects;
import org.apache.kafka.streams.header.Headers;
import org.apache.kafka.streams.header.StreamHeaders;
/**
 * A value wrapper exposing per-record metadata (topic, partition, offset,
 * timestamp, headers) in the DSL, produced by {@code KStream#mapValueToRecord}.
 */
public class RecordValue<V> {
  // Source-topic coordinates of the record this value came from.
  final String topic;
  final int partition;
  final long offset;
  // The original stream value being wrapped.
  final V value;
  final long timestamp;
  // Streams-level headers view (org.apache.kafka.streams.header.Headers).
  final Headers headers;
  //...
} |
5. Modify `Record<K, V>`:
Code Block |
---|
package org.apache.kafka.streams.processor.api;

import java.util.Objects;
import java.util.Optional;

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.header.Header;
import org.apache.kafka.streams.processor.api.header.Headers;
import org.apache.kafka.streams.processor.api.header.StreamHeaders;

/**
 * A key/value pair flowing through a topology, carrying a timestamp, headers,
 * and — when the record originates from a topic — topic/partition/offset
 * metadata. Absent metadata is represented as topic = null, partition = -1,
 * offset = -1.
 */
public class Record<K, V> implements RecordMetadata {

    // NOTE(review): declared but not referenced by any constructor below; the
    // no-headers constructor uses the core client's EMPTY_HEADERS instead —
    // confirm whether this constant is still needed.
    public static final Header[] EMPTY_HEADERS = new Header[0];

    private final K key;
    private final V value;
    private final long timestamp;
    private final Headers headers;
    private final String topic;   // null when no source-topic metadata
    private final int partition;  // -1 when no source-topic metadata
    private final long offset;    // -1 when no source-topic metadata

    /**
     * Primary constructor; every other constructor delegates here.
     *
     * @throws StreamsException if the timestamp is negative
     */
    public Record(final K key,
                  final V value,
                  final long timestamp,
                  final Headers headers,
                  final String topic,
                  final int partition,
                  final long offset) {
        // Fix: the original body was empty, leaving every final field
        // unassigned (a compile error) — assign them here, and validate the
        // timestamp as imported-but-unused StreamsException suggests was intended.
        if (timestamp < 0) {
            throw new StreamsException(
                "Malformed record. Timestamp may not be negative. Got: " + timestamp
            );
        }
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** As the primary constructor, wrapping core-client {@code Headers} into the Streams view. */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Headers headers,
                  final String topic, final int partition, final long offset) {
        this(key, value, timestamp, StreamHeaders.wrap(headers), topic, partition, offset);
    }

    /** As the primary constructor, wrapping a core-client {@code Header[]}. */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Header[] headers,
                  final String topic, final int partition, final long offset) {
        this(key, value, timestamp, StreamHeaders.wrap(headers), topic, partition, offset);
    }

    /** Metadata-optional variant; absent metadata maps to (null, -1, -1L). */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Header[] headers,
                  final Optional<RecordMetadata> recordMetadata) {
        // Delegate through the Streams-Headers variant instead of duplicating
        // the Optional-unwrapping logic (behavior unchanged).
        this(key, value, timestamp, StreamHeaders.wrap(headers), recordMetadata);
    }

    /** Metadata-optional variant taking core-client {@code Headers}. */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Headers headers,
                  final Optional<RecordMetadata> recordMetadata) {
        this(key, value, timestamp, StreamHeaders.wrap(headers), recordMetadata);
    }

    /** Metadata-optional variant taking Streams {@code Headers}. */
    public Record(final K key, final V value, final long timestamp,
                  final Headers headers,
                  final Optional<RecordMetadata> recordMetadata) {
        this(key, value, timestamp, headers,
             recordMetadata.map(RecordMetadata::topic).orElse(null),
             recordMetadata.map(RecordMetadata::partition).orElse(-1),
             recordMetadata.map(RecordMetadata::offset).orElse(-1L));
    }

    /** No source-topic metadata. */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Header[] headers) {
        this(key, value, timestamp, StreamHeaders.wrap(headers), Optional.empty());
    }

    /** No source-topic metadata. */
    public Record(final K key, final V value, final long timestamp,
                  final org.apache.kafka.common.header.Headers headers) {
        this(key, value, timestamp, StreamHeaders.wrap(headers), Optional.empty());
    }

    /** No source-topic metadata. */
    public Record(final K key, final V value, final long timestamp, final Headers headers) {
        this(key, value, timestamp, headers, Optional.empty());
    }

    /** No headers and no metadata; uses the core client's empty-header constant. */
    public Record(final K key, final V value, final long timestamp) {
        this(key, value, timestamp, org.apache.kafka.common.record.Record.EMPTY_HEADERS);
    }

    public K key() {
        return key;
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

    public Headers headers() {
        return headers;
    }

    // Copy-with-change helpers; all preserve the source-topic metadata.

    public <NewK> Record<NewK, V> withKey(final NewK key) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    public <NewV> Record<K, NewV> withValue(final NewV value) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    public Record<K, V> withTimestamp(final long timestamp) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    public Record<K, V> withHeaders(final Headers headers) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    public Record<K, V> withHeaders(final org.apache.kafka.common.header.Headers headers) {
        return new Record<>(key, value, timestamp, headers, recordMetadata());
    }

    @Override
    public String topic() {
        return topic;
    }

    @Override
    public int partition() {
        return partition;
    }

    @Override
    public long offset() {
        return offset;
    }

    /** Present only when the record carries source-topic metadata (topic != null). */
    Optional<RecordMetadata> recordMetadata() {
        if (topic == null) {
            return Optional.empty();
        }
        return Optional.of(new RecordMetadata() {
            @Override
            public String topic() {
                return topic;
            }

            @Override
            public int partition() {
                return partition;
            }

            @Override
            public long offset() {
                return offset;
            }
        });
    }
}
|
...