Current state: Under Discussion

Discussion thread: None

JIRA: None

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


At Intuit, we read messages from Kafka and decrypt them using an in-house deserializer. This deserializer can throw an exception. The exception may be temporary (e.g. due to loss of network connection) or permanent (e.g. corrupted data). In either case, the KafkaConsumer throws the exception.

Recovery is not straightforward.

In the permanent case, we just want to skip the message (maybe writing it to a dead-letter queue).

In the temporary case, we might wish to retry the decryption before either giving up or re-throwing the exception.

We want to combine that with support for deserialization to CloudEvents. 

It would be good to establish a pattern for this, rather than each team or organization coming up with their own solutions.

  • The community benefits from getting it right once.
  • Everyone understands how they work.
  • Well thought out by the Kafka team.
  • No proprietary solutions.

Public Interfaces

  • org/apache/kafka/common/serialization/ExceptionHandlingDeserializer
  • org/apache/kafka/common/serialization/RetryDeserializer
  • org/apache/kafka/common/serialization/PipeDeserializer
  • org/apache/kafka/common/serialization/PipeSerializer

Proposed Changes

I propose to introduce three new deserializers and one new serializer.

They all follow the decorator pattern. 


The ExceptionHandlingDeserializer wraps a delegate deserializer specified by a property:


To deserialize a message, it calls the delegate. If the delegate throws an exception, it returns an object containing the exception. If there is no exception, it simply returns the result.
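The following is a minimal sketch of this behavior under a few assumptions. First, `Deserializer<T>` is a simplified stand-in for Kafka's `org.apache.kafka.common.serialization.Deserializer`. It keeps only `deserialize(String topic, byte[] data)` so that the sketch compiles without `kafka-clients`. Second, the delegate is passed to the constructor instead of being read from a property. Third, `DeserializationResult` is a hypothetical name for the object that contains the exception.

```java
import java.util.Objects;

// Simplified stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical result type: holds either the delegate's value or the exception it threw.
final class DeserializationResult<T> {
    private final T value;
    private final Exception exception;

    private DeserializationResult(T value, Exception exception) {
        this.value = value;
        this.exception = exception;
    }

    static <T> DeserializationResult<T> success(T value) {
        return new DeserializationResult<>(value, null);
    }

    static <T> DeserializationResult<T> failure(Exception exception) {
        return new DeserializationResult<>(null, Objects.requireNonNull(exception));
    }

    boolean isFailure() { return exception != null; }
    T value() { return value; }
    Exception exception() { return exception; }
}

// Decorator: calls the delegate and captures any exception instead of throwing it.
final class ExceptionHandlingDeserializer<T> implements Deserializer<DeserializationResult<T>> {
    private final Deserializer<T> delegate;

    ExceptionHandlingDeserializer(Deserializer<T> delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public DeserializationResult<T> deserialize(String topic, byte[] data) {
        try {
            return DeserializationResult.success(delegate.deserialize(topic, data));
        } catch (Exception e) {
            return DeserializationResult.failure(e);
        }
    }
}
```

Because the exception becomes part of the returned value, the consumer loop can check `isFailure()` and, for example, send the raw record to a dead-letter queue instead of failing. This version captures every exception. Restricting it to the permanent ones is the job of the matching strategy.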

We only want to handle some exceptions (the permanent ones), so we need a matching strategy that decides which exceptions are captured and which are re-thrown.


The matcher needs to implement a functional interface:

@FunctionalInterface
public interface Matcher {
    boolean matches(Exception e);
}

Proof of concept PR
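The sketch below shows how matchers could be built and used. It relies on three hypothetical helpers that are not part of the proposal. The first is an `instanceOf` factory on `Matcher`. The second is an `or` combinator on `Matcher`. The third is `MatcherSupport.captureIfMatching`, which illustrates the capture-or-rethrow decision that the exception-handling deserializer would make.

```java
import java.util.Arrays;
import java.util.List;

// The Matcher interface from the proposal, plus two hypothetical helpers.
@FunctionalInterface
interface Matcher {
    boolean matches(Exception e);

    // Hypothetical combinator: matches if either this matcher or `other` matches.
    default Matcher or(Matcher other) {
        return e -> matches(e) || other.matches(e);
    }

    // Hypothetical factory: matches exceptions that are instances of any of the given types.
    @SafeVarargs
    static Matcher instanceOf(Class<? extends Exception>... types) {
        List<Class<? extends Exception>> list = Arrays.asList(types);
        return e -> list.stream().anyMatch(t -> t.isInstance(e));
    }
}

final class MatcherSupport {
    private MatcherSupport() {}

    // Hypothetical helper: runs `call` and returns null if it succeeds.
    // If it throws, the exception is returned when the matcher accepts it
    // (a permanent failure) and re-thrown otherwise.
    static RuntimeException captureIfMatching(Matcher matcher, Runnable call) {
        try {
            call.run();
            return null;
        } catch (RuntimeException e) {
            if (matcher.matches(e)) {
                return e;
            }
            throw e;
        }
    }
}
```

Keeping `Matcher` a functional interface means simple cases can be written as lambdas, e.g. `e -> e instanceof SerializationException`.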


The RetryDeserializer also wraps a delegate deserializer specified by a property:


The back-off policy is configurable and pluggable. One possible policy is to make no retry attempts at all, which would be suitable for permanent exceptions.
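The following is a minimal sketch of a retrying decorator with a pluggable back-off policy. The following names and conventions are all hypothetical:

- `BackOffPolicy`
- its `none()` and `fixed(...)` factories
- the convention that a negative delay means "stop retrying"

`Deserializer<T>` is again a simplified stand-in for Kafka's interface.

```java
import java.util.Objects;

// Simplified stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical pluggable policy: delay in ms before the given retry (1-based),
// or a negative value to stop retrying.
@FunctionalInterface
interface BackOffPolicy {
    long delayMillisBeforeRetry(int retry);

    // No back-off attempts at all: suitable for permanent exceptions.
    static BackOffPolicy none() {
        return retry -> -1;
    }

    // Fixed delay between attempts, for at most maxRetries retries.
    static BackOffPolicy fixed(long delayMillis, int maxRetries) {
        return retry -> retry <= maxRetries ? delayMillis : -1;
    }
}

final class RetryDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> delegate;
    private final BackOffPolicy backOff;

    RetryDeserializer(Deserializer<T> delegate, BackOffPolicy backOff) {
        this.delegate = Objects.requireNonNull(delegate);
        this.backOff = Objects.requireNonNull(backOff);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        int retry = 0;
        while (true) {
            try {
                return delegate.deserialize(topic, data);
            } catch (RuntimeException e) {
                long delay = backOff.delayMillisBeforeRetry(++retry);
                if (delay < 0) {
                    throw e; // the policy gives up: re-throw the last exception
                }
                sleep(delay);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }
}
```

The sleep runs on the consumer thread, so long back-offs delay `poll()`. The total retry budget should therefore stay well within the consumer's `max.poll.interval.ms`.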


The deserializers above need to be composed to create more complex serialization and deserialization. For example:

  1. Decrypt message into bytes
  2. Wrap that decryption in retry.
  3. Wrap the retry in exception handling. 
  4. Convert the result into an object (e.g. a CloudEvent using CloudEventsDeserializer).
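The sketch below shows one possible reading of the pipe classes. The hypothetical `PipeDeserializer` and `PipeSerializer` each feed the output of one stage into the next. The sketch assumes that the first deserialization stage (e.g. decryption) produces `byte[]`, which the second stage then converts into an object. `Deserializer` and `Serializer` are simplified stand-ins for Kafka's interfaces.

```java
import java.util.Objects;

// Simplified stand-ins for Kafka's Deserializer<T> and Serializer<T>.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical pipe: the bytes produced by `first` (e.g. decrypted bytes)
// are passed to `second` (e.g. a CloudEvents deserializer).
final class PipeDeserializer<T> implements Deserializer<T> {
    private final Deserializer<byte[]> first;
    private final Deserializer<T> second;

    PipeDeserializer(Deserializer<byte[]> first, Deserializer<T> second) {
        this.first = Objects.requireNonNull(first);
        this.second = Objects.requireNonNull(second);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        return second.deserialize(topic, first.deserialize(topic, data));
    }
}

// Mirror image for producers: serialize the object, then transform the bytes
// (e.g. encrypt them).
final class PipeSerializer<T> implements Serializer<T> {
    private final Serializer<T> first;
    private final Serializer<byte[]> second;

    PipeSerializer(Serializer<T> first, Serializer<byte[]> second) {
        this.first = Objects.requireNonNull(first);
        this.second = Objects.requireNonNull(second);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return second.serialize(topic, first.serialize(topic, data));
    }
}
```

In this reading, the retry decorator (step 2) would wrap the decrypting deserializer (step 1) before it is used as the first stage of the pipe.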

Putting that all together:


Proof of concept PR

Compatibility, Deprecation, and Migration Plan

  • New deserializers that support many common patterns.
  • Well understood, since they all follow the decorator pattern.

Test Plan

These can all be unit tested. They also need to be battle-tested with users.

Rejected Alternatives

Anyone can build their own in-house monolithic deserializer, but that is inflexible. It also leads to an explosion of deserializers, because every combination of concerns needs its own class. Ultimately you end up with a CloudEventsDecryptingRetryExceptionHandlingDeserializer. I hope I don't need to explain further; let me know if it is unclear why that is undesirable.
