This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • A Case for Kafka Headers
Skip to end of metadata
Go to start of metadata

This document is an attempt to sketch out a Kafka metadata architecture. It would provide an API for metadata information to be added to kafka messages and a way for plugins and interceptors to add, remove, modify and otherwise act based on this metadata.

Goals

  1. implement a per-message metadata architecture
  2. allow the creation independent kafka middleware libraries
  3. allow the composition of metadata stacks/sets for producing, consuming and processing
  4. different middleware libraries (from potentially different vendors) should be able to co-exist and do not need to know of each other's existence- they all interact with a common header structure thats part of the core kafka API
  5. end users (people who code against KafkaProducer / KafkaConsumer API) dont have to be aware of any middleware. this enables infrastructure teams in large organizations to provide a common platform for a much larger number of internal developers in a transparent way.
  6. we want to enable middleware on the producer, on the consumer, and also on the broker itself.

What We Mean by Headers

generally, headers are a map of header identifiers to header values. in order for different middlewares to co-exist the they must all agree on what identifiers are, but they need not agree on what values are those identifiers are (so Map<[Identifier], byte[]>)

Header Identifier and Value Format

common header schemes use Map<String, String> for headers (think http), but that could result in a large size overheads - which is why for example http/2 is a binary protocol. forcing the use of Strings as header values is also too restrictive - the contents of a header should be the owning middleware's issue, and its alone. to minimize the size of messages on the wire we propose header identifiers be 32-bit values and header values be arbitrary-length blobs. also, to easily enable middleware on the broker we propose that the wire format for headers be fixed and not up to the configuration of individual producers/consumers.

Binary Identifiers and Interoperability

its easy for different middleware libraries to not interfere with each other if header identifiers are strings - for example by using reverse domain names in audit fields like java code has been doing for over 20 years. we claim its also possible for different middleware to not interfere with each other even with identifiers are (much more compact) integers - by partitioning the identifier space similar to how IPv4 ranges are. the kafka apache project could act as the "registrar" for new int identifiers, with a certain range dedicated to "internal" middleware not meant for distribution (similar in concept to 192.168.*.* IPv4 addresses).

Binary Values for flexibility

as opposed to header identifiers, which are a "global namespace" where middlewares need to be careful not to conflict, header values would be arbitrary blobs to enable maximum flexibility. some middlewares might use UTF-8 encoded Strings for values, but some may choose to use binary values to achieve a more compact message size (things like security data, checksums, tokens etc)

Agreed-Upon wire encoding for broker-side middleware

we would like to enable middleware that resides on the broker. such a component would still need to be able to decode header information (but not necessarily the message key and value) and so the wire format needs to fixed. TBD: wire format

Proposed API

add headers to ProducerRecord and ConsumerRecord

ProducerRecord.java
class ProducerRecord<K, V> {
   K key;
   V value;
   Map<Integer, byte[]> headers;
}
ConsumerRecord.java
class ConsumerRecord<K, V> {
	K key;
	V value;
	Map<Integer, byte[]> headers;
}

 

expose the headers to serializers and deserializers

Serializer.java
interface Serializer<T> {
	byte[] serialize(String topic, T data, Map<Integer, byte[]> headers);
}
Deserializer.java
interface Deserializer<T> {
	T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers);
}

also add the ability to intercept producer records post-serialization

post-serialization interceptor
interface ProducerPostSerializationInterceptor {
	//key and value have already been serialized, but headers not
	ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> serializedRecord);
}

another proposed pluggability-point is broker-side interceptors both inline when ingressing records and at compaction time

Example Middleware Applications Utilizing Headers

Schema-Registry-aware avro serialization

AvroCodec.java
AvroSerializer<T extends GenericRecord> implements Serializer<T> Deserializer<T>{
	@Override
	byte[] serialize(String topic, T data, Map<Integer, byte[]> headers) {
		byte[] schemaMd5 = schemaRegistryClient.depositSchemaIfNew(...)
		headers.put(CONTENT_TYPE, "application/avro-binary".getBytes());
		headers.put(AVRO_SCHEMA_MD5, schemaMd5);
        return doSerialize(data);
	}

	@Override
	T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers) {
		byte[] schemaMd5 = headers.get(AVRO_SCHEMA_MD5);
        return deDeserialize(schemaRegistryClient.get(schemaMd5), data);
	}
 }

alternatively this could be more elegantly split into a producer interceptor to deposit the schema and set the header and a generic avro serializer that is not aware of schema registry

Polyglot kafka topics

this could enable a consumer to read from a topic (or set of topics) which contains messages serialized in a veriety of ways

PolyglotDeserializer
PolyglotDeserializer implements Deserializer<T> {
	Map<String, Deserializer> byContentType = ...

	@Override
	T deserialize(String topic, byte[] data, Map<Integer, byte[]> headers) {
		String type = new String(headers.get(CONTENT_TYPE));
        return byContentType.get(type).deserialize(...);
	}
}

 

End-to-End routing trail including mirroring

RoutingTrailInterceptor
class Entry implements Serializable {
   String byWhom;
   String producedIntoCluster;
   String producedIntoTopic;
   long when;
   String readFromCluster;
   String readFromTopic;
}

PaperTrailInterceptor implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, V> {
    @Override
	ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		List<Entry> trail = getOrCreateTrail(record.headers, false); //if we're on a mirrormaker the consumer has already set the latest entry
		Entry latest = trail.get(trail.size()-1);
		latest.setProducedIntoCluster(...);
        latest.setProducedIntoTopic(record.topic);
		record.headers.put(PAPER_TRAIL, serialize(trail));
	}

	@Override
	ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
		for each record {
			List<Entry> trail = getOrCreateTrail(record.headers, true);
			Entry latest = trail.get(trail.size()-1);
    	    latest.setReadFromCluster(...);
    	    latest.setReadFromTopic(record.topic);
			record.headers.put(PAPER_TRAIL, serialize(trail));
		}
	}

	private Entry getOrCreateTrail(Map<Integer, byte[]> headers, boolean createRegardless) {
		List<Entry> trail = deserialize(headers.get(PAPER_TRAIL));
		Entry latest;
		if (trail == null || createRegardless) {
			trail = new ArrayList<>(1);            
            latest = new Entry(hostName, now);
            trail.add(latest);
		}
		return trail;
	}
}

 

Cross-service process tracing

TracingInterceptor
TraceInterceptor implements ProducerInterceptor<K, V>, ConsumerInterceptor<K, V> {

	@Override
	ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
		Object traceCookie = SomeThreadLocalContext.getContext();
		if (traceCookie != null) { //this produce call is part of some larger tx, pass it on
			record.headers.put(TRACE_COOKIE, serialize(traceCookie));
		}
	}

	@Override
	ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
		//this would really benefit from a per-record onConsume()...
	}
}

 

Signing

this examples assumes the existence of a PostSerializationProducerInterceptor, whose onSend() method is called after key and value serialization (and before header serialization)

Digital Signing
Signer implements ProducerPostSerializationInterceptor {
	@Override
	void onSend(ProducerRecord<byte[], byte[]> record) {
		byte[] signature = sign(record.value);
		record.headers.put(SIGNATURE, signature);
	}
}

an equivalent pre-deserialization interceptor on the consumer end could verify the signature.

Per-record custom compression

per-record compression
SmartCompressor implements ProducerPostSerializationInterceptor {
	@Override
	void onSend(ProducerRecord<byte[], byte[]> record) {
		//could be made much smarter, by CONTENT_TYPE of even sniffing for known payload types
		byte[] compressed = compress(record.value);
		if (compressed.length < record.value.length) {
			//only if content is actually compressible
			record.value = compressed;
			record.headers.put(COMPRESSION, "gzip".getBytes());
		}
	}
}

 

Large Message Support

this is a fairly complicated use case where a producer interceptor slices a large record into a sequence of smaller records and a consumer interceptor assembles them back

large message support
Slicer implements ProducerPostSerializationInterceptor {
	@override
	List<ProducerRecord<byte[], byte[]>> onSend(ProducerRecord<byte[], byte[]> serializedRecord) { //API would need to support returning List<Record>
		if (sizeOf(serializedRecord) < maxSingleRecordSize) {
			return Collections.singletonList(serializedRecord); //in other words, dont touch it
		}
		byte[][] chunks = slice(serializedRecord.value, maxSingleRecordSize - sizeOfOurExtraHeader);
		int numChunks = chunks.length;
		List<ProducerRecord<byte[], byte[]>> chunkRecords = new ArrayList<>(numChunks);
		for (int i=0; i<numChunks; i++) {
			byte[] chunk = chunks[i];
			ProducerRecord<byte[], byte[]> chunkRecord;
			if (i==0) {
				//1st chunk retains all headers
				chunkRecord = serializedRecord;
			} else {
				chunkRecord = new ProducerRecord(serializedRecord.key, chunk, new HashMap<>(1)); //empty header map for chunks 2+
			}
			chunkRecord.headers.put(CHUNK, (new ChunkDescriptor(i, numChunks)).serialize());
			chunkRecords.add(chunkRecord);
		}
		return chunkRecords;
	}
}

Glue implements ConsumerInterceptor<K, V> {
	List<ConsumerRecord<K, V>> chunksSoFar = new ArrayList<>(); //making assumptions about consumer interceptor threading guarantees ...

	@Override
	ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
		ConsumerRecords<K, V> reassembled = new ...;

		for (ConsumerRecord<K, V> record : records) {
			ChunkDescriptor desc = getOrNull(record.headers);
			if (desc == null) {
				//this record is not part of a bigger record, so "pass" as-is
				reassembled.add(record);
				continue;
			}
			chunksSoFar.add(record);
			if (desc.getNum() == desc.getCount()) { //over simplified
				reassembled.add(pieceBack(chunksSoFar));
				chunksSoFar.clear();
			}
		}
		
		return reassembled;
	}
}

 

Broker-side custom retention and compaction

it would be possible to write a broker-side compaction and retention plugins that could honor per-message headers for things like retention

 

Broker-side chargeback

using the routing-trail data produced above we could write a broker-side plugin that will examine headers on incoming/outgoing records for purposes of usage-tracking and/or chargeback.

 

Pluggable Authentication and Authorization

another use case that would require broker-side access to headers is using them to implement authentication / authorization in a pluggable way - producer interceptor sets the headers, broker-side interceptor accepts/rejects the messages.

  • No labels