Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note: This KIP is a result of working with members of the Apache Cassandra community on CEP-44: Kafka integration for Cassandra CDC using Sidecar; one of the limitations they have faced is Kafka's maximum message size.

Motivation

Kafka enforces a limit on message size (message.max.bytes), which blocks use cases where messages remain larger than that limit even after enabling compression on the producer side or applying compact serialization formats such as Apache Avro or Protocol Buffers. Increasing the message size limit indefinitely is not a viable solution, as it can lead to performance degradation, memory pressure, and broker instability. Looking at enterprise/cloud Kafka offerings, the maximum message size they allow is typically in the range of 8MB to 10MB.

Popular Patterns to solve this

At the time of writing this KIP, there are two well-known patterns for handling large messages without increasing message.max.bytes.

1. Message Chunking (Splitting and Reassembling Messages)

Break large messages into smaller chunks, send them sequentially, and reassemble on the consumer side.

  • How it works
    • Split the payload into chunks smaller than message.max.bytes.
    • Assign the same key to all chunks to ensure they land in the same partition, preserving order.
    • Reassemble chunks on the consumer using metadata (e.g., sequence IDs, total chunk count).
  • Considerations
    • Requires custom logic for splitting/reassembling
    • Consumers must handle out-of-order or missing chunks
  • Available open-source implementations:
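As an illustration, the splitting step can be sketched as follows (a minimal, hypothetical helper; the reassembly metadata the consumer needs, such as sequence ids and total chunk count, is omitted here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the splitting step of the chunking pattern.
// A real implementation would also attach metadata (message id, sequence number,
// total chunk count) so the consumer can reassemble and detect missing chunks.
public class Chunker {

    /** Split a payload into chunks no larger than maxChunkBytes. */
    public static List<byte[]> split(byte[] payload, int maxChunkBytes) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += maxChunkBytes) {
            int end = Math.min(payload.length, offset + maxChunkBytes);
            chunks.add(Arrays.copyOfRange(payload, offset, end));
        }
        return chunks;
    }
}
```

All chunks would then be produced with the same record key so they land in the same partition, preserving order for reassembly.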

2. Reference-Based Messaging

Store the large payload externally and send only a reference (e.g., a URI or database key) in the Kafka message.

  • How it works
    • Upload the payload to external storage (e.g., S3, HDFS, or a database).
    • Send a reference (e.g., s3://bucket/key) via Kafka.
    • Consumers fetch the payload using the reference
  • Considerations
    • Reduces Kafka network/storage load.
    • Introduces dependency on external systems.
  • Available open-source implementation
    • There isn't a well-known open-source implementation of Reference-Based Messaging; most adopters build their own custom solution.
| Pattern | Pros | Cons |
| --- | --- | --- |
| Chunking | No external storage required | Complex client logic to split and reassemble messages |
| Reference-Based | Minimizes Kafka load | External system dependency |

This KIP proposes:

  1. A serializer in Apache Kafka that implements Reference-Based Messaging, as this is the simpler of the two patterns.
  2. A notion of a composable serializer, where the client can apply a list of serializers before applying the large-message serializer.


Note: 

This KIP will benefit CEP-44: Kafka integration for Cassandra CDC using Sidecar 

Public Interfaces

1. New Producer/Consumer configs to support Composable Serializer/Deserializer

In some use cases we need a way to chain Kafka serializers/deserializers before applying the large-message serializer, for example to apply a schema or a format like Avro or Protobuf. We could create a single org.apache.kafka.common.serialization.largemessage.Serializer that implements the Kafka Serializer interface, but we would then need to create the different schema-aware variants (AvroSerializer, ProtobufSerializer) ourselves.
Instead, the KIP proposes a composable serializer that allows chaining several serializers to achieve this.

This will be enabled by new configs on the producer and consumer side.

2. org.apache.kafka.common.serialization.largemessage.Serializer

A configurable Kafka serializer that:

  • Checks whether the size of the data to serialize (in bytes) is larger than the configured threshold.
    • If it is larger than the provided threshold (`large.message.threshold.bytes`):
      • Uses the provided PayloadStore implementation to publish the large message to the payload store, generating an id that serves as the reference to access it later.
      • Passes the payload id down to Kafka as the new record value.
      • Adds a large-message: true header.
    • If it is not larger than the provided threshold:
      • Does nothing; passes the data through as is.
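The steps above can be sketched as follows. This is a minimal illustration of the decision logic only; the PayloadStore here is a stub, and the real class would implement Kafka's Serializer interface and read its threshold from configuration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal sketch of the proposed serializer's decision logic (not the real class).
public class LargeMessageSerializerSketch {

    /** Stub of the PayloadStore proposed by this KIP. */
    interface PayloadStore {
        String publish(String topic, byte[] data);
    }

    private final PayloadStore store;
    private final long thresholdBytes; // large.message.threshold.bytes

    public LargeMessageSerializerSketch(PayloadStore store, long thresholdBytes) {
        this.store = store;
        this.thresholdBytes = thresholdBytes;
    }

    /** Returns the bytes that go into the Kafka record value. */
    public byte[] serialize(String topic, byte[] data, Map<String, String> headers) {
        if (data == null || data.length <= thresholdBytes) {
            return data; // not large: pass the data through unchanged
        }
        String reference = store.publish(topic, data); // upload, keep only the reference
        headers.put("large-message", "true");          // mark the record as a large message
        return reference.getBytes(StandardCharsets.UTF_8);
    }
}
```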

3. org.apache.kafka.common.serialization.largemessage.Deserializer

A configurable Kafka deserializer that:

  • Checks whether the event is a large message by looking for the large-message: true header.
    • If it does have the `large-message` header:
      • Parses the event and retrieves the id and any other needed information from the event.
      • Uses the provided PayloadStore implementation to download the original data from the payload store.
      • Returns the payload as the final value.
    • If it doesn't have the large-message header:
      • Does nothing; returns the Kafka message as is.
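Mirroring the serializer, the steps above can be sketched as follows (a minimal illustration with a stubbed PayloadStore; the real class would implement Kafka's Deserializer interface):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal sketch of the proposed deserializer's decision logic (not the real class).
public class LargeMessageDeserializerSketch {

    /** Stub of the PayloadStore proposed by this KIP. */
    interface PayloadStore {
        byte[] download(String fullPath);
    }

    private final PayloadStore store;

    public LargeMessageDeserializerSketch(PayloadStore store) {
        this.store = store;
    }

    /** Returns the original payload bytes for the consumer. */
    public byte[] deserialize(byte[] value, Map<String, String> headers) {
        if (!"true".equals(headers.get("large-message"))) {
            return value; // not a large message: return the Kafka value as is
        }
        String reference = new String(value, StandardCharsets.UTF_8);
        return store.download(reference); // fetch the original payload from the store
    }
}
```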

Proposed Changes

1.  New ProducerConfig to support Composable Serializer

Configuration

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| value.serializers | List of serializer classes that implement the `org.apache.kafka.common.serialization.Serializer` interface, applied in order. The first serializer is `Serializer<T>`; the remaining serializers must be `Serializer<byte[]>`. Can't be configured together with value.serializer. | List<Serializer> | None | No; if absent, the code falls back to value.serializer. |
| key.serializers | List of serializer classes that implement the `org.apache.kafka.common.serialization.Serializer` interface, applied in order. The first serializer is `Serializer<T>`; the remaining serializers must be `Serializer<byte[]>`. Can't be configured together with key.serializer. | List<Serializer> | None | No; if absent, the code falls back to key.serializer. |
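The chaining these configs describe can be sketched as follows (hypothetical interfaces standing in for Kafka's Serializer: the first stage maps T to bytes, each later stage maps bytes to bytes):

```java
import java.util.List;

// Minimal sketch of a composable serializer chain (hypothetical interfaces).
public class ComposableSerializerSketch<T> {

    /** First stage: Serializer<T>, turning the typed value into bytes. */
    interface FirstStage<T> {
        byte[] apply(String topic, T data);
    }

    /** Later stages: Serializer<byte[]>, transforming bytes to bytes. */
    interface ByteStage {
        byte[] apply(String topic, byte[] data);
    }

    private final FirstStage<T> first;
    private final List<ByteStage> rest;

    public ComposableSerializerSketch(FirstStage<T> first, List<ByteStage> rest) {
        this.first = first;
        this.rest = rest;
    }

    public byte[] serialize(String topic, T data) {
        byte[] out = first.apply(topic, data);
        for (ByteStage stage : rest) {
            out = stage.apply(topic, out); // each stage consumes the previous stage's output
        }
        return out;
    }
}
```

The deserializer chain would simply run in the reverse direction, with the last configured deserializer producing the typed value.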


2. New ConsumerConfig to support Composable Deserializer

Configuration

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| value.deserializers | List of deserializer classes that implement the `org.apache.kafka.common.serialization.Deserializer` interface, applied in order. The last deserializer must be `Deserializer<T>`; the rest must be `Deserializer<byte[]>`. Can't be configured together with value.deserializer. | List<Deserializer> | None | No; if absent, the code falls back to value.deserializer. |
| key.deserializers | List of deserializer classes that implement the `org.apache.kafka.common.serialization.Deserializer` interface, applied in order. The last deserializer must be `Deserializer<T>`; the rest must be `Deserializer<byte[]>`. Can't be configured together with key.deserializer. | List<Deserializer> | None | No; if absent, the code falls back to key.deserializer. |

3. org.apache.kafka.common.serialization.largemessage.Serializer<byte[]>

Configuration

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| large.message.payload.store.class | Implementation of org.apache.kafka.common.serialization.largemessage.PayloadStore. | Class | None | Yes |
| large.message.threshold.bytes | The size in bytes above which a message is considered large. | Long | 1MB (default of max.message.bytes) | No |

4. org.apache.kafka.common.serialization.largemessage.Deserializer<byte[]>

Configuration

| Key | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| large.message.payload.store.class | Implementation of org.apache.kafka.common.serialization.largemessage.PayloadStore. | Class | None | Yes |

5. org.apache.kafka.common.serialization.largemessage.PayloadStore

/**
 * An interface for publishing and downloading serialized data to/from a payload store.
 * The store config is passed down from the original config of the Kafka producer/consumer client.
 */
public interface PayloadStore extends Configurable, Closeable {
    /**
     * Publish data into the store.
     *
     * @param topic topic the data belongs to.
     * @param data data that will be published to the store.
     * @return full path to the object in the store.
     * @throws PayloadStoreException in case publishing to the store failed.
     */
    String publish(String topic, byte[] data) throws PayloadStoreException;

    /**
     * Download full data from the store.
     *
     * @param fullPath the data's reference in the store, for example `remote_store/topic_name/<record_random_uuid>`.
     * @return content of the object as bytes.
     * @throws PayloadStoreException in case downloading from the store failed.
     */
    byte[] download(String fullPath) throws PayloadStoreException;

    /**
     * Generate an id for the data's reference in the store (not the full path in the store).
     * By default the id is a random UUID; however, some stores might need a smarter way to calculate
     * the reference id based on the data itself. In such a case, override this method.
     *
     * @param data data that will be published to the store.
     * @return object id as a string; the default implementation supports UTF-8 encoding. If any other encoding is
     *         required, the implementation will need to address this.
     */
    default String id(byte[] data) {
        return java.util.UUID.randomUUID().toString(); // Java UUIDs consist entirely of ASCII characters, which are valid UTF-8
    }
}
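As an illustration of the contract, here is a hypothetical in-memory implementation (useful for tests; the Configurable/Closeable wiring is omitted and the exception type is stubbed with RuntimeException):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory payload store illustrating the PayloadStore contract.
// A real implementation (e.g., S3-backed) would implement the KIP's interface
// and throw PayloadStoreException on failure.
public class InMemoryPayloadStore {

    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    /** Publish data under topic/<id> and return the full path. */
    public String publish(String topic, byte[] data) {
        String fullPath = topic + "/" + id(data);
        objects.put(fullPath, data);
        return fullPath;
    }

    /** Download previously published data by its full path. */
    public byte[] download(String fullPath) {
        byte[] data = objects.get(fullPath);
        if (data == null) {
            throw new RuntimeException("payload not found: " + fullPath); // stands in for PayloadStoreException
        }
        return data;
    }

    /** Default id: a random UUID, as in the interface's default method. */
    String id(byte[] data) {
        return java.util.UUID.randomUUID().toString();
    }
}
```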

6. org.apache.kafka.common.serialization.largemessage.PayloadStoreException

/**
 * Exception class that represents exceptions during interaction with the store.
 * This helps the payload store decide whether to retry or to fail.
 * The final Serializer and Deserializer will propagate this as a SerializationException to the client.
 */
public class PayloadStoreException extends RuntimeException {
    /**
     * Constructs a PayloadStoreException with a message and a throwable.
     */
    public PayloadStoreException(String message, Throwable t) {
        super(message, t);
    }

    /**
     * Constructs a PayloadStoreException with a message.
     */
    public PayloadStoreException(String message) {
        super(message);
    }

    /**
     * Constructs a PayloadStoreException with a throwable.
     */
    public PayloadStoreException(Throwable t) {
        super(t);
    }
}


Example

Producer

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("value.serializers", "kafka.serializers.KafkaAvroSerializer, org.apache.kafka.common.serialization.largemessage.Serializer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("large.message.payload.store.class", "myclient.payloadstore.CustomS3Store");
producerConfig.put("large.message.threshold.bytes", 1048576);
producerConfig.put("s3.bucket", "my-bucket");
producerConfig.put("s3.retry.attempts", "3");
producerConfig.put("s3.connection.timeout.ms", "5000");
producerConfig.put("bootstrap.servers", "localhost:9092");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerConfig);

Consumer

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("value.deserializers", "org.apache.kafka.common.serialization.largemessage.Deserializer, kafka.serializers.KafkaAvroDeserializer");
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("large.message.payload.store.class", "myclient.payloadstore.CustomS3Store");
consumerConfig.put("s3.bucket", "my-bucket");
consumerConfig.put("s3.retry.attempts", "3");
consumerConfig.put("s3.connection.timeout.ms", "5000");
consumerConfig.put("bootstrap.servers", "localhost:9092");

KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfig);

Considerations

  • TTL Configuration Risk: If the payload store owner doesn't configure an appropriate TTL that aligns with Kafka topic retention, the payload store may grow indefinitely. This occurs because objects remain in storage even after Kafka no longer references them, leading to unnecessary storage costs.

  • TTL Too Short Risk: If the TTL is set too aggressively (shorter than needed), Kafka references may point to objects that no longer exist in the payload store. When this happens:

    • Consumers will encounter NOT_FOUND errors

    • To prevent blocking behavior, consumers should inspect SerializationException::getCause and decide what to do if the cause is a PayloadStoreException (e.g., a not-found error).

    • This allows graceful handling of missing payload references

  • TTL and usage of Compacted Topic: for applications that do use compacted topics with large payloads, the PayloadStore implementation should handle this by:
    • Consistent ID Generation: PayloadStore implementations should use deterministic IDs based on message content or metadata (rather than random UUIDs) so that identical payloads can reuse the same storage object, reducing storage costs.

    • TTL Strategy:

      • Since compacted topics can retain data indefinitely, users must accept the indefinite storage costs that come with this case. If cost is an issue, they will need to set up a cleanup job that observes Kafka keys with null values (tombstones) and deletes the payloads associated with those keys.

      • Topics with cleanup.policy=compact,delete  will eventually remove old data, so standard TTL approaches work normally.

Recommendation: Set TTL duration to exceed your Kafka topic retention period by a reasonable buffer (e.g., 10-20%) to ensure payload availability throughout the message lifecycle while preventing indefinite storage growth.
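The recommendation above amounts to a simple calculation (a hypothetical helper; the 20% buffer is the upper end of the suggested range):

```java
// Hypothetical helper: derive a payload-store TTL from the topic's retention.ms
// plus a percentage buffer, using integer math to avoid floating-point rounding.
public class TtlBuffer {

    public static long recommendedTtlMs(long retentionMs, int bufferPercent) {
        return retentionMs * (100 + bufferPercent) / 100;
    }
}
```

For example, a topic with 7 days of retention (604,800,000 ms) and a 20% buffer yields a TTL of about 8.4 days.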

  • Partial Failure Handling: If payload storage succeeds but the Kafka produce fails, the payload will remain in storage until the TTL expires. This is acceptable, as it only affects storage costs, not correctness. If the Kafka message is consumed but the payload download fails, the client can inspect SerializationException::getCause and determine the behavior.
  • Critical Timing Constraints: The total timeout for payload store operations (including all retries) cannot exceed `max.block.ms` for Kafka producers or `max.poll.interval.ms` for Kafka consumers. This is a fundamental requirement for any implementation of org.apache.kafka.common.serialization.largemessage.PayloadStore.

    • Exceeding these limits will cause producer blocking or consumer rebalancing issues.

  • Memory Constraints: Large messages will consume memory during serialization, so ensure the heap size can accommodate your largest expected message. Consider PayloadStore implementations that support compression to reduce the storage footprint.

Compatibility, Deprecation, and Migration Plan

  • Existing clients are unaffected by default; to use this feature, a client only needs to set the new configurations.

Rejected Alternatives

  • We have rejected implementing the chunking pattern due to its many potential edge cases and the complexity it adds to the consumer side. A glimpse of those complexities can be found in the LinkedIn presentation.
  • Implement this as a separate project outside of Apache Kafka: this pattern keeps recurring across use cases, and it is better to have a native implementation in Apache Kafka than a separate project.
  • Implement an AsyncPayloadStore: This doesn't provide any benefit, since Kafka's Serializer interface is synchronous/blocking; any async operation would need to immediately call .get(), negating the async benefits. In reality, most use cases that need messages larger than 1MB:
    • Are dealing with bulk data (collections, arrays)

    • Can afford the memory cost during brief serialization

    • Run on appropriately sized JVMs

    Use cases in truly memory-constrained environments probably shouldn't be sending GB-sized messages through Kafka anyway.
