...

Code Block
languagejava
/**
* An interface for publishing and downloading serialised data to/from the payload store.
* The store config will be passed down from the original config of the Kafka producer/consumer client.
*/
public interface PayloadStore extends Configurable, Closeable, Monitorable {
    /**
     * Publish data into the store.
     *
     * @param topic topic that the record is being produced to.
     * @param data data that will be published to the store.
     * @return full path to the object in the store.
     * @throws PayloadStoreException in case publishing to the store failed.
     */
    String publish(String topic, byte[] data) throws PayloadStoreException;

    /**
     * Download full data from the store.
     *
     * @param fullPath the data's reference in the store, for example `remote_store/topic_name/<record_random_uuid>`.
     * @return content of the object as bytes.
     * @throws PayloadStoreException in case downloading from the store failed.
     */
    byte[] download(String fullPath) throws PayloadStoreException;

    /**
     * Generate an id for the data's reference in the store (not the full path in the store).
     * By default the id is a random UUID; however, some stores might need a smarter way to calculate
     * the reference id based on the data itself. In such a case please override this method.
     *
     * @param data data that will be published to the store.
     * @return object id, for example `record_random_uuid`, as a string; the default implementation
     *         supports UTF-8 encoding. If any other encoding is required the
     *         implementation will need to address this.
     */
    default String id(byte[] data) {
        return UUID.randomUUID().toString(); // Java UUIDs are composed entirely of ASCII characters, which are valid UTF-8
    }
}
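To make the contract concrete, a minimal in-memory sketch of an implementation could look like the following. This is illustrative only: `InMemoryPayloadStore` is a hypothetical class, and the `Configurable`/`Closeable`/`Monitorable` super-interfaces and the checked `PayloadStoreException` are omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory store illustrating the PayloadStore contract.
// For brevity the Configurable/Closeable/Monitorable super-interfaces and
// the checked PayloadStoreException are omitted.
public class InMemoryPayloadStore {
    private final Map<String, byte[]> objects = new HashMap<>();

    // Mirrors PayloadStore#publish: stores the data and returns the full path.
    public String publish(String topic, byte[] data) {
        String fullPath = "in_memory_store/" + topic + "/" + id(data);
        objects.put(fullPath, data);
        return fullPath;
    }

    // Mirrors PayloadStore#download: resolves a path previously returned by publish.
    public byte[] download(String fullPath) {
        byte[] data = objects.get(fullPath);
        if (data == null) {
            // A real implementation would throw PayloadStoreException here.
            throw new RuntimeException("no object at " + fullPath);
        }
        return data;
    }

    // Mirrors the default PayloadStore#id: a random UUID per payload.
    public String id(byte[] data) {
        return UUID.randomUUID().toString();
    }
}
```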

6. org.apache.kafka.common.serialization.largemessage.PayloadStoreException

Code Block
/**
* Exception class that represents exceptions during interaction with the store.
* This helps the payload store decide whether to retry or to fail.
* The final Serializer and Deserializer will propagate this as a SerializationException to the client.
**/
public class PayloadStoreException extends RuntimeException {
    /**
     * Constructs a PayloadStoreException with a message and a throwable.
     */
    public PayloadStoreException(String message, Throwable t) {
        super(message, t);
    }

    /**
     * Constructs a PayloadStoreException with a message.
     */
    public PayloadStoreException(String message) {
        super(message);
    }

    /**
     * Constructs a PayloadStoreException with a throwable.
     */
    public PayloadStoreException(Throwable t) {
        super(t);
    }
}
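Since the Serializer/Deserializer propagate this as a SerializationException, a consumer can unwrap the cause to decide whether to skip the record or fail. The sketch below uses stand-in exception classes so it is self-contained; in practice the check would be against Kafka's `org.apache.kafka.common.errors.SerializationException` and the `PayloadStoreException` above.

```java
// Stand-in exception classes so this sketch is self-contained; in practice
// these are org.apache.kafka.common.errors.SerializationException and the
// PayloadStoreException defined above.
class SerializationException extends RuntimeException {
    SerializationException(String message, Throwable cause) { super(message, cause); }
}

class PayloadStoreException extends RuntimeException {
    PayloadStoreException(String message) { super(message); }
}

public class ConsumerErrorHandling {
    // Returns true when the deserialization failure was caused by the payload
    // store (e.g. a missing object), so the application may choose to skip the
    // record rather than crash the consumer.
    public static boolean causedByPayloadStore(RuntimeException e) {
        return e instanceof SerializationException
                && e.getCause() instanceof PayloadStoreException;
    }
}
```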

...


Example

Producer

Code Block
languagejava
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("value.serializers", "kafka.serializers.KafkaAvroSerializer, org.apache.kafka.common.serialization.largemessage.Serializer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("large.message.payload.store.class", "myclient.payloadstore.CustomS3Store");
producerConfig.put("large.message.threshold.bytes", 1048576);
producerConfig.put("s3.bucket", "my-bucket");
producerConfig.put("s3.retry.attempts", "3");
producerConfig.put("s3.connection.timeout.ms", "5000");
producerConfig.put("bootstrap.servers", "localhost:9092");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerConfig);

Consumer

Code Block
languagejava
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("value.deserializers", "org.apache.kafka.common.serialization.largemessage.Deserializer, kafka.serializers.KafkaAvroDeserializer");
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("large.message.payload.store.class", "myclient.payloadstore.CustomS3Store");
consumerConfig.put("large.message.threshold.bytes", 1048576);
consumerConfig.put("s3.bucket", "my-bucket");
consumerConfig.put("s3.retry.attempts", "3");
consumerConfig.put("s3.connection.timeout.ms", "5000");
consumerConfig.put("bootstrap.servers", "localhost:9092");

KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfig);

Considerations

  • TTL Configuration Risk: If the payload store owner doesn't configure an appropriate TTL that aligns with Kafka topic retention, the payload store may grow indefinitely. This occurs because objects remain in storage even after Kafka no longer references them, leading to unnecessary storage costs.

  • TTL Too Short Risk: If the TTL is set too aggressively (shorter than needed), Kafka references may point to objects that no longer exist in the payload store. When this happens:

    • Consumers will encounter NOT_FOUND errors

    • To prevent blocking behavior, consumers should inspect SerializationException::getCause and decide what to do if the cause is a PayloadStoreException/PayloadNotFoundException.

    • This allows graceful handling of missing payload references

  • TTL and usage of Compacted Topics: for applications that use compacted topics with large payloads, the PayloadStore implementation should handle this by:
    • Consistent ID Generation: PayloadStore implementations should use deterministic IDs based on message content or metadata (rather than random UUIDs) so that identical payloads can reuse the same storage object, reducing storage costs.

    • TTL Strategy:

      • Since compacted topics can retain data indefinitely, users must choose between setting a business-appropriate TTL or accepting the indefinite storage costs that come with this case. If cost is an issue, they will need to set up a cleanup job that observes Kafka keys with null values (tombstones) and deletes the payloads associated with those keys.

      • Topics with cleanup.policy=compact,delete will eventually remove old data, so standard TTL approaches work normally.
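The deterministic-ID approach above can be sketched by overriding id(byte[]) with a content hash, so identical payloads map to the same stored object. `ContentAddressedIds` below is a hypothetical helper, not part of the proposal; SHA-256 is one reasonable choice of hash.

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a stable object id from the payload bytes so
// that identical payloads in a compacted topic reuse the same stored object.
public class ContentAddressedIds {
    public static String id(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
            // Hex-encode the 32-byte digest; the same bytes always yield the same id.
            return String.format("%064x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

A PayloadStore implementation would return this from its id(byte[]) override instead of a random UUID.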

...