This document is an attempt to sketch out a Kafka metadata architecture. It would provide an API for attaching metadata to Kafka messages, and a way for plugins and interceptors to add, remove, modify and otherwise act on this metadata.
Goals
- implement a per-message metadata architecture
- allow the creation of independent Kafka middleware libraries
- allow the composition of metadata stacks/sets for producing, consuming and processing
- different middleware libraries (potentially from different vendors) should be able to co-exist without needing to know of each other's existence - they all interact with a common header structure that is part of the core Kafka API
- end users (people who code against the KafkaProducer / KafkaConsumer API) don't have to be aware of any middleware. This enables infrastructure teams in large organizations to transparently provide a common platform for a much larger number of internal developers.
- we want to enable middleware on the producer, on the consumer, and also on the broker itself.
What We Mean by Headers
Generally, headers are a map of header identifiers to header values. In order for different middlewares to co-exist they must all agree on what the identifiers are, but they need not agree on the format of the values stored under those identifiers (so Map<[Identifier], byte[]>).
Header Identifier and Value Format
Common header schemes use Map<String, String> for headers (think HTTP), but that can result in a large size overhead - which is why, for example, HTTP/2 is a binary protocol. Forcing the use of Strings as header values is also too restrictive - the contents of a header should be the owning middleware's concern, and its concern alone. To minimize the size of messages on the wire, we propose that header identifiers be 32-bit values and header values be arbitrary-length blobs. Also, to easily enable middleware on the broker, we propose that the wire format for headers be fixed and not up to the configuration of individual producers/consumers.
Binary Identifiers and Interoperability
It is easy for different middleware libraries to avoid interfering with each other if header identifiers are strings - for example by using reverse domain names, as Java code has been doing for over 20 years. We claim it is also possible for different middleware to avoid interfering with each other even when identifiers are (much more compact) integers - by partitioning the identifier space, similar to how IPv4 ranges are partitioned. The Apache Kafka project could act as the "registrar" for new int identifiers, with a certain range dedicated to "internal" middleware not meant for distribution (similar in concept to 192.168.*.* IPv4 addresses).
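As a purely illustrative sketch (the boundaries and names below are made up and not part of any proposal - the actual ranges and registration process would be up to the Apache Kafka project), a partitioned identifier space could look like:

//purely illustrative - none of these constants or boundaries exist in Kafka
public final class HeaderIds {
    //identifiers reserved for the Kafka project itself
    public static final int KAFKA_RESERVED_START = 0x00000000;
    public static final int KAFKA_RESERVED_END   = 0x0000FFFF;

    //identifiers handed out by the "registrar" to publicly distributed middleware
    public static final int REGISTERED_START = 0x00010000;
    public static final int REGISTERED_END   = 0x7FFFFFFF;

    //identifiers with the high bit set are "internal" and never registered -
    //the 192.168.*.* of this scheme
    public static boolean isInternal(int id) {
        return id < 0;
    }
}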
Binary Values for flexibility
As opposed to header identifiers, which form a "global namespace" where middlewares need to be careful not to conflict, header values would be arbitrary blobs to enable maximum flexibility. Some middlewares might use UTF-8 encoded Strings for values, but others may choose binary values to achieve a more compact message size (things like security data, checksums, tokens etc.).
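For example (CONTENT_TYPE, PAYLOAD_CHECKSUM and checksumOf() below are hypothetical names used only for illustration):

Map<Integer, byte[]> headers = new HashMap<>();
//a middleware that favors readability might use a UTF-8 string value
headers.put(CONTENT_TYPE, "application/json".getBytes());
//a middleware that favors compactness might pack 8 raw bytes instead of a decimal string
headers.put(PAYLOAD_CHECKSUM, ByteBuffer.allocate(Long.BYTES).putLong(checksumOf(payload)).array());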
Agreed-Upon wire encoding for broker-side middleware
We would like to enable middleware that resides on the broker. Such a component would still need to be able to decode header information (but not necessarily the message key and value), so the wire format needs to be fixed. TBD: wire format.
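Purely as an illustration of what a fixed encoding buys broker-side middleware (the layout below is an assumption for the sketch, not the proposed format), decoding headers without ever touching the opaque key/value could look like:

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

//assumed layout for illustration only: [int32 count] followed by count entries of
//[int32 identifier][int32 length][length bytes of value]
public class HeaderWireFormat {
    public static Map<Integer, byte[]> decode(ByteBuffer buf) {
        int count = buf.getInt();
        Map<Integer, byte[]> headers = new HashMap<>(count);
        for (int i = 0; i < count; i++) {
            int id = buf.getInt();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(id, value);
        }
        return headers;
    }
}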
Proposed API
Add headers to ProducerRecord and ConsumerRecord:
class ProducerRecord<K, V> {
    K key;
    V value;
    Map<Integer, byte[]> headers;
}

class ConsumerRecord<K, V> {
    K key;
    V value;
    Map<Integer, byte[]> headers;
}
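End-user code would then look roughly like this (a sketch against the proposed API; SOME_HEADER_ID and the exact constructor shape are placeholders, not part of today's client API):

ProducerRecord<String, String> record = new ProducerRecord<>("some-topic", "key", "value");
record.headers.put(SOME_HEADER_ID, "some value".getBytes()); //any middleware sees the same map
producer.send(record);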
Expose the headers to serializers and deserializers:
interface Serializer<T> {
    byte[] serialize(String topic, T data, Map<Integer, byte[]> headers);
}

interface Deserializer<T> {
    T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers);
}
Also add the ability to intercept producer records post-serialization:
interface ProducerPostSerializationInterceptor {
    //key and value have already been serialized at this point, but headers have not
    ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> serializedRecord);
}
Another proposed pluggability point is broker-side interceptors, both inline when records are ingressed and at compaction time; a rough sketch of such an extension point follows.
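No such broker-side extension point exists today; the interface name and method shapes below are assumptions, sketched here so the broker-side examples further down have something concrete to refer to:

//hypothetical broker-side extension point - not part of Kafka
interface BrokerRecordInterceptor {
    //called inline on ingress, before a record is appended to the log;
    //only the headers are decoded - key and value stay opaque byte arrays
    boolean onAppend(String topic, int partition, Map<Integer, byte[]> headers);

    //called when the log cleaner evaluates a record during compaction/retention
    boolean shouldRetain(String topic, int partition, Map<Integer, byte[]> headers);
}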
Example Middleware Applications Utilizing Headers
Schema-Registry-aware Avro serialization
class AvroSerializer<T extends GenericRecord> implements Serializer<T>, Deserializer<T> {

    @Override
    public byte[] serialize(String topic, T data, Map<Integer, byte[]> headers) {
        //register the schema if the registry has not seen it yet, get back its fingerprint
        byte[] schemaMd5 = schemaRegistryClient.depositSchemaIfNew(...);
        headers.put(CONTENT_TYPE, "application/avro-binary".getBytes());
        headers.put(AVRO_SCHEMA_MD5, schemaMd5);
        return doSerialize(data);
    }

    @Override
    public T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers) {
        byte[] schemaMd5 = headers.get(AVRO_SCHEMA_MD5);
        return doDeserialize(schemaRegistryClient.get(schemaMd5), data);
    }
}
Alternatively, this could be split more elegantly into a producer interceptor that deposits the schema and sets the header, plus a generic Avro serializer that is not aware of the schema registry - see the sketch below.
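A sketch of that split (the class name is made up; only the header contract matters):

//producer interceptor half: deposits the schema and pins its fingerprint to the record
class SchemaDepositInterceptor<K> implements ProducerInterceptor<K, GenericRecord> {
    @Override
    public ProducerRecord<K, GenericRecord> onSend(ProducerRecord<K, GenericRecord> record) {
        byte[] schemaMd5 = schemaRegistryClient.depositSchemaIfNew(record.value.getSchema());
        record.headers.put(AVRO_SCHEMA_MD5, schemaMd5);
        return record;
    }
}
//the serializer half is then a plain binary Avro serializer that never talks to the registry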
Polyglot Kafka topics
This could enable a consumer to read from a topic (or set of topics) which contains messages serialized in a variety of ways.
class PolyglotDeserializer<T> implements Deserializer<T> {

    Map<String, Deserializer<T>> byContentType = ...

    @Override
    public T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers) {
        //pick the right deserializer based on the CONTENT_TYPE header
        String type = new String(headers.get(CONTENT_TYPE));
        return byContentType.get(type).deserialize(...);
    }
}
End-to-End routing trail including mirroring
class Entry implements Serializable {
    String byWhom;
    String producedIntoCluster;
    String producedIntoTopic;
    long when;
    String readFromCluster;
    String readFromTopic;
}

class PaperTrailInterceptor<K, V> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        //if we're on a mirrormaker the consumer side has already added the latest entry
        List<Entry> trail = getOrCreateTrail(record.headers, false);
        Entry latest = trail.get(trail.size() - 1);
        latest.setProducedIntoCluster(...);
        latest.setProducedIntoTopic(record.topic);
        record.headers.put(PAPER_TRAIL, serialize(trail));
        return record;
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            List<Entry> trail = getOrCreateTrail(record.headers, true);
            Entry latest = trail.get(trail.size() - 1);
            latest.setReadFromCluster(...);
            latest.setReadFromTopic(record.topic);
            record.headers.put(PAPER_TRAIL, serialize(trail));
        }
        return records;
    }

    private List<Entry> getOrCreateTrail(Map<Integer, byte[]> headers, boolean newEntryRegardless) {
        List<Entry> trail = deserialize(headers.get(PAPER_TRAIL));
        if (trail == null) {
            trail = new ArrayList<>(1);
        }
        if (trail.isEmpty() || newEntryRegardless) {
            //append a new entry for this hop, keeping any pre-existing trail entries
            trail.add(new Entry(hostName, now));
        }
        return trail;
    }
}
Cross-service process tracing
class TraceInterceptor<K, V> implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        Object traceCookie = SomeThreadLocalContext.getContext();
        if (traceCookie != null) {
            //this produce call is part of some larger transaction, pass the cookie along
            record.headers.put(TRACE_COOKIE, serialize(traceCookie));
        }
        return record;
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        //this would really benefit from a per-record onConsume()...
        return records;
    }
}
Signing
This example assumes the existence of the ProducerPostSerializationInterceptor proposed above, whose onSend() method is called after key and value serialization (and before header serialization).
class Signer implements ProducerPostSerializationInterceptor {
    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        byte[] signature = sign(record.value);
        record.headers.put(SIGNATURE, signature);
        return record;
    }
}
An equivalent pre-deserialization interceptor on the consumer end could verify the signature, as sketched below.
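A minimal sketch of that verifier, assuming a hypothetical ConsumerPreDeserializationInterceptor that sees raw byte[] keys/values plus the decoded headers:

class Verifier implements ConsumerPreDeserializationInterceptor {
    @Override
    public ConsumerRecord<byte[], byte[]> onConsume(ConsumerRecord<byte[], byte[]> record) {
        byte[] signature = record.headers.get(SIGNATURE);
        if (signature == null || !verify(record.value, signature)) {
            //policy decision: throw, drop, or route to a dead-letter topic
            throw new SecurityException("signature verification failed for " + record.topic);
        }
        return record;
    }
}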
Per-record custom compression
class SmartCompressor implements ProducerPostSerializationInterceptor {
    @Override
    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        //could be made much smarter, by CONTENT_TYPE or even by sniffing for known payload types
        byte[] compressed = compress(record.value);
        if (compressed.length < record.value.length) {
            //only replace the payload if the content is actually compressible
            record.value = compressed;
            record.headers.put(COMPRESSION, "gzip".getBytes());
        }
        return record;
    }
}
Large Message Support
This is a fairly complicated use case, where a producer interceptor slices a large record into a sequence of smaller records and a consumer interceptor assembles them back.
class Slicer implements ProducerPostSerializationInterceptor {

    @Override
    public List<ProducerRecord<byte[], byte[]>> onSend(ProducerRecord<byte[], byte[]> serializedRecord) {
        //the API would need to support returning List<Record> for this to work
        if (sizeOf(serializedRecord) < maxSingleRecordSize) {
            return Collections.singletonList(serializedRecord); //in other words, don't touch it
        }
        byte[][] chunks = slice(serializedRecord.value, maxSingleRecordSize - sizeOfOurExtraHeader);
        int numChunks = chunks.length;
        List<ProducerRecord<byte[], byte[]>> chunkRecords = new ArrayList<>(numChunks);
        for (int i = 0; i < numChunks; i++) {
            byte[] chunk = chunks[i];
            ProducerRecord<byte[], byte[]> chunkRecord;
            if (i == 0) {
                //1st chunk reuses the original record (and so retains all of its headers) ...
                chunkRecord = serializedRecord;
                chunkRecord.value = chunk; //... but carries only the 1st slice of the payload
            } else {
                chunkRecord = new ProducerRecord<>(serializedRecord.key, chunk, new HashMap<>(1)); //empty header map for chunks 2+
            }
            chunkRecord.headers.put(CHUNK, (new ChunkDescriptor(i, numChunks)).serialize());
            chunkRecords.add(chunkRecord);
        }
        return chunkRecords;
    }
}

class Glue<K, V> implements ConsumerInterceptor<K, V> {

    List<ConsumerRecord<K, V>> chunksSoFar = new ArrayList<>(); //making assumptions about consumer interceptor threading guarantees
    ...

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> reassembled = new ...;
        for (ConsumerRecord<K, V> record : records) {
            ChunkDescriptor desc = getOrNull(record.headers);
            if (desc == null) {
                //this record is not part of a bigger record, so pass it through as-is
                reassembled.add(record);
                continue;
            }
            chunksSoFar.add(record);
            if (desc.getNum() == desc.getCount() - 1) { //last chunk seen (over-simplified)
                reassembled.add(pieceBack(chunksSoFar));
                chunksSoFar.clear();
            }
        }
        return reassembled;
    }
}
Broker-side custom retention and compaction
It would be possible to write broker-side compaction and retention plugins that honor per-message headers for things like retention - see the sketch below.
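For example, building on the hypothetical BrokerRecordInterceptor sketched earlier (the EXPIRES_AT header id is made up for illustration), per-record TTL retention could look like:

class PerRecordTtlPlugin implements BrokerRecordInterceptor {
    @Override
    public boolean onAppend(String topic, int partition, Map<Integer, byte[]> headers) {
        return true; //accept everything on ingress
    }

    @Override
    public boolean shouldRetain(String topic, int partition, Map<Integer, byte[]> headers) {
        byte[] expiresAt = headers.get(EXPIRES_AT);
        if (expiresAt == null) {
            return true; //no per-record TTL, fall back to topic-level retention
        }
        return System.currentTimeMillis() < ByteBuffer.wrap(expiresAt).getLong();
    }
}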
Broker-side chargeback
Using the routing-trail data produced above, we could write a broker-side plugin that examines headers on incoming/outgoing records for usage-tracking and/or chargeback purposes.
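A sketch of such a plugin, again against the hypothetical BrokerRecordInterceptor, attributing records to the producing host recorded in the PAPER_TRAIL header (usageCounter and deserialize() are placeholders):

class ChargebackPlugin implements BrokerRecordInterceptor {
    @Override
    public boolean onAppend(String topic, int partition, Map<Integer, byte[]> headers) {
        List<Entry> trail = deserialize(headers.get(PAPER_TRAIL));
        if (trail != null && !trail.isEmpty()) {
            String owner = trail.get(trail.size() - 1).byWhom;
            usageCounter.increment(owner, topic); //attribute this record to its producer
        }
        return true; //chargeback never rejects records
    }

    @Override
    public boolean shouldRetain(String topic, int partition, Map<Integer, byte[]> headers) {
        return true;
    }
}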
Pluggable Authentication and Authorization
Another use case that would require broker-side access to headers is implementing authentication/authorization in a pluggable way - a producer interceptor sets the headers, and a broker-side interceptor accepts or rejects the messages.
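A sketch of both halves (AUTH_TOKEN and tokenService are placeholders, and the broker half again assumes the hypothetical BrokerRecordInterceptor from above):

class TokenAttacher<K, V> implements ProducerInterceptor<K, V> {
    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        record.headers.put(AUTH_TOKEN, tokenService.currentToken());
        return record;
    }
}

class TokenChecker implements BrokerRecordInterceptor {
    @Override
    public boolean onAppend(String topic, int partition, Map<Integer, byte[]> headers) {
        byte[] token = headers.get(AUTH_TOKEN);
        return token != null && tokenService.isAllowedToProduce(token, topic); //reject otherwise
    }

    @Override
    public boolean shouldRetain(String topic, int partition, Map<Integer, byte[]> headers) {
        return true;
    }
}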