Current state: Under Discussion
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Most large enterprises with sizable Kafka deployments support a large number of polyglot clients. Today, enforcing platform-wide conventions and best practices in these decentralized environments (e.g. schema validation, serialization, privacy enforcement) forces platform teams either to distribute custom logic in client library extensions or to maintain special-purpose applications that enforce standards across the platform. Neither approach is without its challenges, as the examples below illustrate.
Managing a set of library extensions in multiple languages is a non-trivial task that poses a variety of challenges:
- Having to reimplement the same feature across multiple languages can create substantial development overhead for platform teams.
- Duplicate implementations also increase the risk of faulty releases, implementation inconsistencies and regressions.
- Moreover, any change in platform standards or best practices is very likely to impose migration costs on client teams, who will need to upgrade their applications to new library versions.
- Having to rely on clients to migrate can significantly delay the rollout of important new features. This is particularly true in the case of changes that break existing producer-consumer contracts.
- Even if a platform team succeeds in building robust library extensions, it is very hard to ensure that clients only ever use Kafka through these libraries.
Special-purpose platform apps
Another popular pattern for enforcing platform standards and distributing features to all clients is the deployment of what I refer to as (special-purpose) platform apps. Think of a stream processing application that performs some stateless transformation on every message and writes the result back to Kafka. In many instances, this architecture might be warranted (e.g. if the computations are expensive or the logic is complex). But in a lot of cases, these systems emerged as a way to work around the library extension pattern's limitations and allow platform owners to deploy features at scale without managing the lifecycle of library extensions. Unfortunately, using platform apps to enforce standards is an expensive strategy with rather poor scalability characteristics. For instance, if a company decides to solve privacy enforcement through a platform app that redacts every Kafka message in flight, the system will double disk usage and the number of partitions, and require a large amount of additional compute.
A third way: broker interceptors
This KIP proposes a third way for platform owners: the ability to extend Kafka's server-side behavior through broker interceptors. At their core, broker interceptors can be thought of as very lightweight, stateless stream processors that can intercept, mutate and filter messages at either produce or consume time.
The interceptor (or module) pattern is very common in the OSS community. Nginx offers modules for extending the proxy functionality with custom logic. Kubernetes uses custom resources. In the streaming world, Redpanda supports server-side processing through WASM.
To date, Kafka hasn't offered a comparable feature to OSS users, and this KIP aims to change that through the addition of broker interceptors to the stack.
Broker interceptors would allow platform owners to either fully move the enforcement of certain messaging standards to the brokers, or to perform two-stage rollouts where new features are first deployed server-side, until the corresponding library extensions are ready and rolled out.
Of course, this design isn’t without its challenges and pitfalls (primarily concerning performance). When defining interceptors, operators would need to exercise good judgement to avoid causing cluster-wide performance degradation.
- Server-side schema validation is supported by Confluent Server, which presupposes a similar server-side interception mechanism.
The main new interface required for produce-time interceptors is the abstract ProduceRequestInterceptor class. User-defined implementations of this class would serve as containers for lightweight server-side record pre-processing. The class's processRecord method enables three basic types of operations:
- Map: Mutate the original payload, and return the updated key and value.
- Filter: Remove a record from the batch by throwing a dedicated skip exception.
- Side-effect: Call an external system and keep the original record batch intact.
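To make the three operation types concrete, here is a minimal sketch of what an implementation could look like. Everything in it is an assumption for illustration only: the names InterceptorRecord and SkipRecordException, and the exact processRecord signature, are placeholders rather than the final API proposed by this KIP.

```java
import java.nio.charset.StandardCharsets;

public class InterceptorSketch {

    // Hypothetical wrapper for a record's key and value (not part of Kafka's API).
    record InterceptorRecord(byte[] key, byte[] value) {}

    // Hypothetical exception an interceptor throws to drop a record (the "Filter" case).
    static class SkipRecordException extends RuntimeException {}

    // Hypothetical shape of the proposed abstract class.
    abstract static class ProduceRequestInterceptor {
        // Map: return a (possibly mutated) record.
        // Filter: throw SkipRecordException to remove the record from the batch.
        abstract InterceptorRecord processRecord(InterceptorRecord rec);
    }

    // Example implementation: redact e-mail-like substrings (Map) and
    // drop records whose value is flagged as internal (Filter).
    static class RedactingInterceptor extends ProduceRequestInterceptor {
        @Override
        InterceptorRecord processRecord(InterceptorRecord rec) {
            String value = new String(rec.value(), StandardCharsets.UTF_8);
            if (value.startsWith("internal:")) {
                throw new SkipRecordException(); // Filter: remove from batch
            }
            String redacted = value.replaceAll("\\S+@\\S+", "<redacted>"); // Map
            return new InterceptorRecord(rec.key(), redacted.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        ProduceRequestInterceptor interceptor = new RedactingInterceptor();
        InterceptorRecord out = interceptor.processRecord(new InterceptorRecord(
                "k".getBytes(StandardCharsets.UTF_8),
                "contact: alice@example.com".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(out.value(), StandardCharsets.UTF_8));
    }
}
```

A side-effect-only interceptor would simply call out to an external system inside processRecord and return the record unchanged.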
Once a user-defined implementation of ProduceRequestInterceptor is compiled and added to the classpath of the Kafka runtime, it could be registered as an interceptor with the help of three new broker-side config options:
- produce.request.interceptors: A list of class names that implement the ProduceRequestInterceptor interface and should be loaded from the classpath during startup.
- produce.request.interceptors.timeout.ms: The total amount of time in ms that produce request interceptors have to finish processing a request.
- produce.request.interceptors.max.timeout.retries: The number of times that interceptor processing can be retried in the face of timeouts. Once the retries have been exhausted, a timeout will result in an error response being returned to the client.
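Taken together, the three options might be wired up in the broker's server.properties along these lines. The interceptor class name is a hypothetical example, and the timeout and retry values are arbitrary illustrations:

```properties
# Interceptors to load from the classpath at startup (hypothetical class name)
produce.request.interceptors=com.example.kafka.RedactingProduceRequestInterceptor
# Total time budget, in ms, for interceptor processing per produce request
produce.request.interceptors.timeout.ms=50
# Retries on timeout before returning an error to the client
produce.request.interceptors.max.timeout.retries=2
```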
I'm proposing the definition of a new error response that the server returns if a produce request interceptor encounters an unexpected error.
See this draft PR for a good overview.
Compatibility, Deprecation, and Migration Plan
As a completely new opt-in feature, adding support for broker interceptors should be fully backward compatible with previous versions.
This PR already contains a good number of integration tests, though a few important ones are still missing (e.g. a produce request that contains data for multiple partitions). More tests could also be added to cover the behavior of ProduceRequestInterceptorManager's methods.