This document is an attempt to sketch out a Kafka metadata architecture. It would provide an API for attaching metadata to Kafka messages, and a way for plugins and interceptors to add, remove, modify, and otherwise act on this metadata.
- implement a per-message metadata architecture
- allow the creation of independent Kafka middleware libraries
- allow the composition of metadata stacks/sets for producing, consuming and processing
- different middleware libraries (from potentially different vendors) should be able to co-exist without needing to know of each other's existence - they all interact with a common header structure that is part of the core Kafka API
- end users (people who code against the KafkaProducer / KafkaConsumer API) do not have to be aware of any middleware. This enables infrastructure teams in large organizations to transparently provide a common platform for a much larger number of internal developers.
- we want to enable middleware on the producer, on the consumer, and also on the broker itself.
What We Mean by Headers
Generally, headers are a map from header identifiers to header values. For different middlewares to co-exist they must all agree on what identifiers are, but they need not agree on what the values behind those identifiers are (so, conceptually, Map<[Identifier], byte[]>).
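The shape described above can be sketched as a thin wrapper over a map of identifiers to opaque byte arrays. The class name `HeaderMap` and the identifier values are illustrative only, not part of any Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the proposed header structure: identifiers map to opaque
// byte[] values. Each middleware reads/writes only identifiers it owns, so
// independent libraries can share one map without knowing about each other.
public class HeaderMap {
    private final Map<Integer, byte[]> entries = new HashMap<>();

    public void put(int id, byte[] value) {
        entries.put(id, value);
    }

    public byte[] get(int id) {
        return entries.get(id); // null if no middleware set this identifier
    }
}
```

Two middlewares that agree on the identifier scheme but nothing else can each stash arbitrary bytes under their own identifiers in the same record.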
Header Identifier and Value Format
Common header schemes use Map<String, String> for headers (think HTTP), but that can result in a large size overhead - which is why, for example, HTTP/2 is a binary protocol. Forcing the use of Strings as header values is also too restrictive - the contents of a header should be the owning middleware's concern, and its alone. To minimize the size of messages on the wire we propose that header identifiers be 32-bit values and header values be arbitrary-length blobs. Also, to easily enable middleware on the broker, we propose that the wire format for headers be fixed and not left up to the configuration of individual producers/consumers.
Binary Identifiers and Interoperability
It is easy for different middleware libraries to not interfere with each other if header identifiers are strings - for example by using reverse domain names, as Java code has been doing for over 20 years. We claim it is also possible for different middleware to avoid interfering with each other even when identifiers are (much more compact) integers - by partitioning the identifier space, similar to how IPv4 address ranges are partitioned. The Apache Kafka project could act as the "registrar" for new int identifiers, with a certain range dedicated to "internal" middleware not meant for distribution (similar in concept to the 192.168.*.* IPv4 addresses).
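One way the partitioning could work, assuming (purely for illustration) that the top bit of the 32-bit identifier separates registered identifiers from "internal" ones:

```java
// Hypothetical split of the 32-bit identifier space. The actual ranges would
// be assigned by the Kafka project acting as registrar; here we simply assume
// that identifiers with the top bit set are "internal" (never distributed),
// analogous to the private 192.168.*.* IPv4 range.
public class HeaderIdRanges {

    public static boolean isInternal(int id) {
        // top bit set -> internal-use identifier
        return (id & 0x8000_0000) != 0;
    }

    public static boolean isRegistered(int id) {
        return !isInternal(id);
    }
}
```

Middleware meant for public distribution would request an identifier in the registered range; in-house middleware could pick freely from the internal range without consulting anyone.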
Binary Values for flexibility
As opposed to header identifiers, which form a "global namespace" where middlewares need to be careful not to conflict, header values would be arbitrary blobs to enable maximum flexibility. Some middlewares might use UTF-8 encoded Strings for values, but some may choose binary values to achieve a more compact message size (things like security data, checksums, tokens, etc.).
Agreed-Upon wire encoding for broker-side middleware
We would like to enable middleware that resides on the broker. Such a component would still need to be able to decode header information (but not necessarily the message key and value), and so the wire format needs to be fixed. TBD: wire format
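The wire format itself is still TBD; purely as an illustration of what a fixed, configuration-independent encoding could look like, here is a naive length-prefixed layout (`[int32 count]` then, per header, `[int32 id][int32 length][bytes]`). None of this is the proposed format:

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// One *possible* fixed header encoding, for illustration only:
// [int32 count] then per header [int32 id][int32 length][value bytes].
// A broker-side component could decode this without touching key/value.
public class HeaderCodec {

    public static byte[] encode(Map<Integer, byte[]> headers) {
        int size = 4;
        for (byte[] v : headers.values()) size += 8 + v.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(headers.size());
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        return buf.array();
    }

    public static Map<Integer, byte[]> decode(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        int count = buf.getInt();
        Map<Integer, byte[]> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            int id = buf.getInt();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(id, value);
        }
        return headers;
    }
}
```

A real format would likely use variable-length encodings to save space; the point is only that a single agreed-upon layout lets every party (including the broker) decode headers.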
add headers to ProducerRecord and ConsumerRecord
expose the headers to serializers and deserializers
also add the ability to intercept producer records post-serialization
another proposed pluggability point is broker-side interceptors, both inline when ingesting records and at compaction time
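The pluggability points above could be exposed through interfaces roughly like the following. The interface names and signatures here are sketches of what the proposal implies, not existing Kafka interfaces:

```java
import java.util.Map;

// Hypothetical interceptor surfaces implied by the proposal.
public class InterceptorSketches {

    // Runs after key/value serialization but before header serialization,
    // so the serialized bytes can be inspected (signed, compressed, ...)
    // and the result recorded in a header.
    public interface PostSerializationProducerInterceptor {
        byte[] onSend(Map<Integer, byte[]> headers, byte[] serializedValue);
    }

    // Consumer-side mirror image: runs before key/value deserialization.
    public interface PreDeserializationConsumerInterceptor {
        byte[] onConsume(Map<Integer, byte[]> headers, byte[] serializedValue);
    }

    // Broker-side: inline on record ingestion, or invoked at compaction time;
    // returning false would reject/drop the record.
    public interface BrokerRecordInterceptor {
        boolean onAppend(Map<Integer, byte[]> headers, byte[] serializedValue);
    }
}
```

Note that all three see headers plus opaque serialized bytes, which is exactly why the header wire format must be fixed while key/value formats need not be.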
Example Middleware Applications Utilizing Headers
Schema-Registry-aware avro serialization
Alternatively, this could be split more elegantly into a producer interceptor that deposits the schema and sets the header, and a generic Avro serializer that is not aware of the schema registry.
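The interceptor half of that split could look like the sketch below. The in-memory "registry", the header identifier `SCHEMA_ID_HEADER`, and the method name are all assumptions for illustration; a real implementation would call out to an actual schema registry:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Sketch: a producer interceptor deposits the record's schema (here, into a
// stand-in in-memory registry) and stamps the returned schema id onto the
// record's headers, leaving the serializer itself registry-unaware.
public class SchemaIdInterceptor {
    public static final int SCHEMA_ID_HEADER = 0x0005; // hypothetical identifier

    private final Map<String, Integer> registry = new HashMap<>(); // registry stand-in
    private int nextId = 1;

    public void onSend(Map<Integer, byte[]> headers, String schemaJson) {
        int id = registry.computeIfAbsent(schemaJson, s -> nextId++);
        headers.put(SCHEMA_ID_HEADER, ByteBuffer.allocate(4).putInt(id).array());
    }
}
```

The consumer side would read `SCHEMA_ID_HEADER`, fetch the schema, and hand both to a generic Avro deserializer.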
Polyglot kafka topics
This could enable a consumer to read from a topic (or set of topics) whose messages are serialized in a variety of ways.
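A dispatching deserializer for such a polyglot topic might switch on a format header. The header identifier and the format codes below are assumed for illustration:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch: a consumer-side deserializer for a "polyglot" topic reads a
// (hypothetical) format header and dispatches to the right decoding.
public class PolyglotDeserializer {
    public static final int FORMAT_HEADER = 0x0006; // assumed identifier

    public static Object deserialize(Map<Integer, byte[]> headers, byte[] value) {
        byte format = headers.get(FORMAT_HEADER)[0];
        switch (format) {
            case 1: return new String(value, StandardCharsets.UTF_8); // UTF-8 string records
            case 2: return ByteBuffer.wrap(value).getInt();           // int32 records
            default: throw new IllegalArgumentException("unknown format " + format);
        }
    }
}
```

Producers of each format would set `FORMAT_HEADER` via their own interceptor/serializer, so consumers never need out-of-band knowledge of which topic carries which encoding.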
End-to-End routing trail including mirroring
Cross-service process tracing
This example assumes the existence of a PostSerializationProducerInterceptor, whose onSend() method is called after key and value serialization (and before header serialization).
An equivalent pre-deserialization interceptor on the consumer end could verify the signature.
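Sketched out, the two ends could share a class like the one below: the producer side signs the serialized value and stores the MAC in a header, and the consumer side recomputes and compares it. The header identifier, HMAC choice, and shared-key setup are assumptions, not part of the proposal:

```java
import java.util.Arrays;
import java.util.Map;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the signing flow: sign() would be called from a
// post-serialization producer interceptor, verify() from a
// pre-deserialization consumer interceptor.
public class SignatureMiddleware {
    public static final int SIGNATURE_HEADER = 0x0007; // hypothetical identifier

    private final SecretKeySpec key;

    public SignatureMiddleware(byte[] sharedKey) {
        this.key = new SecretKeySpec(sharedKey, "HmacSHA256");
    }

    private byte[] mac(byte[] payload) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(key);
            return mac.doFinal(payload);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Producer side (post-serialization): record the MAC in a header.
    public void sign(Map<Integer, byte[]> headers, byte[] serializedValue) {
        headers.put(SIGNATURE_HEADER, mac(serializedValue));
    }

    // Consumer side (pre-deserialization): recompute and compare.
    public boolean verify(Map<Integer, byte[]> headers, byte[] serializedValue) {
        return Arrays.equals(headers.get(SIGNATURE_HEADER), mac(serializedValue));
    }
}
```

Because the interceptor sees the already-serialized bytes, the signature covers exactly what travels on the wire, independent of the serialization format in use.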
Per-record custom compression
Large Message Support
This is a fairly complicated use case, where a producer interceptor slices a large record into a sequence of smaller records and a consumer interceptor assembles them back together.
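The core slicing/reassembly logic can be sketched as below; the interceptor plumbing, and the chunk-index/chunk-count headers that would let the consumer group chunks back into one record, are omitted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of large-message support: the producer interceptor would emit each
// chunk as its own record (tagged with hypothetical chunk-index/chunk-count
// headers); the consumer interceptor collects the chunks and reassembles.
public class LargeMessageChunker {

    public static List<byte[]> split(byte[] value, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < value.length; off += chunkSize) {
            chunks.add(Arrays.copyOfRange(value, off, Math.min(off + chunkSize, value.length)));
        }
        return chunks;
    }

    public static byte[] reassemble(List<byte[]> chunks) {
        int total = chunks.stream().mapToInt(c -> c.length).sum();
        byte[] out = new byte[total];
        int off = 0;
        for (byte[] c : chunks) {
            System.arraycopy(c, 0, out, off, c.length);
            off += c.length;
        }
        return out;
    }
}
```

Much of the real complexity lies elsewhere (buffering partial sequences on the consumer, keeping chunks in order and on one partition), which is why this is called out as a complicated use case.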
Broker-side custom retention and compaction
It would be possible to write broker-side compaction and retention plugins that honor per-message headers for things like retention.
Using the routing-trail data produced above, we could write a broker-side plugin that examines headers on incoming/outgoing records for purposes of usage tracking and/or chargeback.
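For example, a retention plugin could honor a per-record expiry header. The header identifier and the expires-at semantics below are assumptions made for illustration:

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Sketch of a broker-side retention check honoring a (hypothetical)
// per-record expiry header: a record carrying an expiry timestamp in the
// past becomes eligible for deletion, regardless of topic-level retention.
public class HeaderRetentionPolicy {
    public static final int EXPIRES_AT_HEADER = 0x0008; // assumed identifier

    public static boolean shouldRetain(Map<Integer, byte[]> headers, long nowMillis) {
        byte[] expiry = headers.get(EXPIRES_AT_HEADER);
        if (expiry == null) return true; // no header: fall back to topic-level retention
        return ByteBuffer.wrap(expiry).getLong() > nowMillis;
    }
}
```

Because the broker can decode headers without deserializing keys or values, such a plugin needs no knowledge of the message payload format.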
Pluggable Authentication and Authorization
Another use case that would require broker-side access to headers is using them to implement authentication/authorization in a pluggable way - a producer interceptor sets the headers, and a broker-side interceptor accepts or rejects the messages.
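The broker-side half of that flow might look like the sketch below. The header identifier and the in-memory credential set are stand-ins; a realistic implementation would verify a signed token rather than match plaintext credentials:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

// Sketch: the producer interceptor places a credential in a header, and this
// broker-side check admits only records whose credential is recognized.
public class HeaderAuthorizer {
    public static final int CREDENTIAL_HEADER = 0x0009; // assumed identifier

    private final Set<String> acceptedCredentials;

    public HeaderAuthorizer(Set<String> acceptedCredentials) {
        this.acceptedCredentials = acceptedCredentials;
    }

    public boolean accept(Map<Integer, byte[]> headers) {
        byte[] cred = headers.get(CREDENTIAL_HEADER);
        return cred != null
            && acceptedCredentials.contains(new String(cred, StandardCharsets.UTF_8));
    }
}
```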