Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7596
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
A Kafka audit system is very useful for both users and developers of the streaming platform since it reveals system insights. For example, one question that audit information can answer is which application consumes or produces how much data to which topic(s) over what period of time. Insights derived from audit information open up opportunities. For example:
Traffic analysis. It allows detailed analysis of traffic on particular topics, for particular principals, or for specific request types. Useful information can be derived from the audit information; for example, users can easily figure out which topics have not been accessed for a certain period of time and use that knowledge to initiate a topic deletion process.
Correctness analysis and reporting. There could be various definitions of “correctness”. For example, “correctness” could be defined as whether the number of records produced to a Kafka cluster equals the number of records that are mirrored to another cluster.
SLO monitoring. The response time to a request could be broken down by topic, principal or any other arbitrary criteria.
Cost attribution. For example, the number of bytes produced or consumed could be broken down by topic, principal or any other arbitrary criteria.
Sources of an audit system are where the auditing information is initially generated. The Kafka server is a natural source since it can access every request and its response.
Public Interfaces
Define an ObservableStruct interface:
This interface defines the methods that the client's observer implementation can use to extract information from a request or response.
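As a rough sketch, the getters could look like the following (method names inferred from the example observer implementation later in this page; illustrative only, not a final signature):

// Illustrative sketch: a read-only view over the fields of a Struct
public interface ObservableStruct {
    // Read a named field from the struct, e.g. a topic name
    Object get(String fieldName);

    // Read a nested array of structs, e.g. the "topic_data" array of a Produce request
    ObservableStructImpl[] getObservableStructImplArray(String fieldName);

    // Read a record-set field, e.g. "record_set", as BaseRecords
    BaseRecords getRecords(String fieldName);
}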
Change the Struct class to implement the ObservableStruct interface. Only "implements ObservableStruct" is added and no other change is required.
public class Struct implements ObservableStruct { // ... }
Define an ObservableStructImpl class, which also implements the ObservableStruct interface:
Instances of this class are what get passed to observers.
In the resulting class hierarchy, the ObservableStruct interface defines a set of getters and is implemented by both the Struct class and the ObservableStructImpl class.
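A minimal sketch of ObservableStructImpl, assuming it simply wraps a Struct and exposes only the read path (illustrative only):

public class ObservableStructImpl implements ObservableStruct {
    private final Struct struct;

    public ObservableStructImpl(Struct struct) {
        this.struct = struct;
    }

    // Getters delegate to the wrapped Struct; no mutating methods are exposed,
    // so observers get a read-only view of the request/response data
    // ...
}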
Define an Observer interface:
/**
 * The Observer interface. This interface allows users to implement their own auditing solution.
 * Notice that the Observer may be used by multiple threads, so the implementation should be thread safe.
 */
public interface Observer extends Configurable {

    /**
     * Record the request.
     *
     * @param struct        Struct information in a request.
     * @param clientAddress Client host IP information.
     * @param principal     Client principal from the request.
     * @param apiKey        Request type information.
     * @param apiVersion    Version of the request.
     * @param correlationId An ID used to correspond a recordRequest method call to a recordResponse method call.
     */
    public void recordRequest(ObservableStructImpl struct, InetAddress clientAddress, KafkaPrincipal principal,
                              ApiKeys apiKey, short apiVersion, String correlationId);

    /**
     * Record the response.
     *
     * @param struct        Struct information in a response.
     * @param apiVersion    Version of the response.
     * @param correlationId An ID used to correspond a recordRequest method call to a recordResponse method call.
     */
    public void recordResponse(ObservableStructImpl struct, short apiVersion, String correlationId);

    /**
     * Close the observer with a timeout.
     *
     * @param timeout The maximum time to wait to close the observer.
     * @param unit    The time unit.
     */
    public void close(long timeout, TimeUnit unit);
}
Observers are instantiated by calling the no-arg constructor; then the configure() method is called. Finally, close() is called, after which no further recordRequest() or recordResponse() calls are expected.
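For illustration, the lifecycle from the broker's point of view (LoggingObserver is a hypothetical implementation):

Observer observer = new LoggingObserver();  // 1. instantiated via the no-arg constructor
observer.configure(configs);                // 2. configure() is called with the broker configs
// ... recordRequest()/recordResponse() calls while the broker is running ...
observer.close(5, TimeUnit.SECONDS);        // 3. close() is called; no further record calls are expected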
With this interface and its given arguments, the information we can extract from every produce or fetch request is:
Topic name
Client principal
Client IP address
Number of bytes produced or consumed
Records count (if the batch version is 2 or greater, otherwise a deep decompression is required to get the count), as sketched below
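For instance, the record count could be computed from a MemoryRecords field along the following lines (a sketch against Kafka's record API; countRecords is a hypothetical helper):

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical helper: count the records contained in a record set
static int countRecords(MemoryRecords records) {
    int count = 0;
    for (RecordBatch batch : records.batches()) {
        Integer batchCount = batch.countOrNull();
        if (batchCount != null) {
            // Batch version (magic) 2 or greater stores the record count in the batch header
            count += batchCount;
        } else {
            // Older batch versions require iterating, and hence decompressing, the records
            for (Record record : batch)
                count++;
        }
    }
    return count;
}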
Proposed Changes
Add interfaces and implementations for the classes described above, and a configuration property in the KafkaConfig class that allows users to specify implementations of the observer interface. The configuration property lets the user define a list of observer implementations which will be invoked.
/** ********* Broker-side Observer Configuration ****************/
val ObserverClassesProp = "observer.class.names"
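For example, a broker operator could then enable observers in server.properties (the class names below are hypothetical):

# Comma-separated list of Observer implementations to instantiate on the broker
observer.class.names=com.example.audit.AuditObserver,com.example.audit.LatencyObserver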
Add code to the broker (in SocketServer, RequestContext and AbstractResponse) to allow Kafka servers to invoke all defined observers on each request and each response. More specifically, initialize a list of observer instances in the SocketServer and pass them to RequestContext and AbstractResponse. For example, in SocketServer:
// Initialize a list of observer instances using the configuration and classes provided by the user
private val observers = config.getConfiguredInstances(KafkaConfig.ObserverClassesProp, classOf[Observer])
In RequestContext:
// This method is called on every RequestContext instance, and a RequestContext instance is
// constructed when the SocketServer receives a request
public RequestAndSize parseRequest(ByteBuffer buffer) {
    if (isUnsupportedApiVersionsRequest()) {
        // Unsupported ApiVersions requests are treated as v0 requests and are not parsed
        ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion());
        return new RequestAndSize(apiVersionsRequest, 0);
    } else {
        ApiKeys apiKey = header.apiKey();
        try {
            short apiVersion = header.apiVersion();
            Struct struct = apiKey.parseRequest(apiVersion, buffer);
            // Execute the observing logic only when at least one observer implementation is provided
            if (observers != null && !observers.isEmpty()) {
                // Convert the Struct to an ObservableStructImpl, which is recorded by each observer
                ObservableStructImpl observableStruct = new ObservableStructImpl(struct);
                for (Observer observer : observers) {
                    observer.recordRequest(observableStruct, clientAddress, principal, apiKey, apiVersion, connectionId);
                }
            }
            AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
            return new RequestAndSize(body, struct.sizeOf());
        } catch (Throwable ex) {
            throw new InvalidRequestException("Error getting request for apiKey: " + apiKey +
                ", apiVersion: " + header.apiVersion() +
                ", connectionId: " + connectionId +
                ", listenerName: " + listenerName +
                ", principal: " + principal, ex);
        }
    }
}
In AbstractResponse:
// This method is called on every response
protected Send toSend(String destination, ResponseHeader header, short apiVersion, List<Observer> observers) {
    return new NetworkSend(destination, serialize(destination, apiVersion, header, observers));
}

public ByteBuffer serialize(String connectionId, short version, ResponseHeader responseHeader, List<Observer> observers) {
    Struct responseStruct = toStruct(version);
    // Execute the observing logic only when at least one observer implementation is provided
    if (observers != null && !observers.isEmpty()) {
        // Convert the Struct to an ObservableStructImpl, which is recorded by each observer
        ObservableStructImpl observableStruct = new ObservableStructImpl(responseStruct);
        for (Observer observer : observers) {
            observer.recordResponse(observableStruct, version, connectionId);
        }
    }
    return serialize(responseHeader.toStruct(), responseStruct);
}
There are two main reasons why observers are invoked in these two classes:
- Each request/response instance passes through these functions, so an observer can truly intercept every request/response.
- It is in these functions (parseRequest() and serialize()) that the transformation from/to Struct happens. In other words, the natural lifecycle of a Struct instance exists only in these functions. Therefore, converting a Struct into the ObservableStructImpl that gets passed to observers can only happen here.
An example of how a user could implement the recordRequest and recordResponse methods of the observer interface to record Produce requests and Fetch responses:
public void recordRequest(ObservableStructImpl struct, InetAddress clientAddress, KafkaPrincipal principal,
                          ApiKeys apiKey, short apiVersion, String correlationId) {
    if (apiKey == ApiKeys.PRODUCE) {
        // The information extracted from the ObservableStructImpl
        Map<String, Integer> topicProduceBytesCount = new HashMap<>();
        for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("topic_data")) {
            String topic = topicDataStruct.get(TOPIC_NAME);
            for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("data")) {
                MemoryRecords records = (MemoryRecords) partitionResponseStruct.getRecords("record_set");
                if (topicProduceBytesCount.containsKey(topic)) {
                    topicProduceBytesCount.put(topic, topicProduceBytesCount.get(topic) + records.sizeInBytes());
                } else {
                    topicProduceBytesCount.put(topic, records.sizeInBytes());
                }
            }
        }
    } else if (apiKey == ApiKeys.FETCH) {
        // ...
    }
}

public void recordResponse(ObservableStructImpl struct, short apiVersion, String correlationId) {
    // ...
    // Use correlationId to figure out what request type this response corresponds to
    // ...
    // The information extracted from the ObservableStructImpl
    Map<String, Integer> topicFetchBytesCount = new HashMap<>();
    for (ObservableStructImpl topicDataStruct : struct.getObservableStructImplArray("responses")) {
        String topic = topicDataStruct.get(TOPIC_NAME);
        for (ObservableStructImpl partitionResponseStruct : topicDataStruct.getObservableStructImplArray("partition_responses")) {
            BaseRecords baseRecords = partitionResponseStruct.getRecords("record_set");
            if (!(baseRecords instanceof MemoryRecords))
                throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
            MemoryRecords records = (MemoryRecords) baseRecords;
            if (topicFetchBytesCount.containsKey(topic)) {
                topicFetchBytesCount.put(topic, topicFetchBytesCount.get(topic) + records.sizeInBytes());
            } else {
                topicFetchBytesCount.put(topic, records.sizeInBytes());
            }
        }
    }
}
With ObservableStructImpl, the observer implementation interacts with the Struct in a read-only way. In the above example, the way information is extracted from the ObservableStructImpl is very similar to the toStruct() method of each specific request and response.
Rejected Alternatives for Interfaces
There are two sets of rejected interfaces.
The first approach is to introduce one interface (with an implementation) for each kind of AbstractRequest and AbstractResponse. Each interface would have a set of specific getters to retrieve the commonly important information from that request/response. For example, for a ProduceRequest, a getter that returns a mapping from topic/partition to the number of bytes/records produced is commonly important. Users could then use those interfaces to retrieve request/response-specific information in their observer implementations. The issue with this approach is that the number of interfaces (with implementations) is too large, given that there are about 40 request types and 40 response types. It is not trivial to define exactly which getters each interface should have, and maintainability and extensibility are not ideal either. In contrast, the proposed approach does not introduce 80 interfaces plus implementations and has little need to be extended in the future.
The second approach is to introduce a generic observer interface:
public interface Observer<T extends AbstractRequest, R extends AbstractResponse> {
    // ...
    public void record(T request, R response);
    // ...
}
So that user can provide implementations such as:
class ProduceObserver implements Observer<ProduceRequest, ProduceResponse> {
    // ...
    public void record(ProduceRequest request, ProduceResponse response) { /* ... */ }
    // ...
}
In this approach, an Observer implementation is type-specific: one implementation of the observer interface corresponds to only one type of request/response. This approach is straightforward and the interface is simple and clean. However, the fatal issue is that it exposes the concrete request/response classes, which are considered internal classes that should not be exposed; freezing them into a public API would hinder the future evolution of the project. The proposed interfaces are instead based on the Struct class, which essentially represents the Kafka protocol ("wire") format and is public. With knowledge of the format of each request/response, users can extract information in a flexible way. Thus the proposed interfaces are unlikely to hinder the evolution of the project.
Compatibility, Deprecation, and Migration Plan
N/A. No observers are defined by default. Testing of any non-trivial implementation should be done by its implementers. Exceptions thrown by any observer are caught by the broker so that broker functionality is not affected.
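For example, each broker-side invocation could be guarded along the following lines (a sketch; the logging is illustrative):

for (Observer observer : observers) {
    try {
        observer.recordRequest(observableStruct, clientAddress, principal, apiKey, apiVersion, connectionId);
    } catch (Exception e) {
        // Swallow the exception: a misbehaving observer must never affect the broker's request handling
        log.error("Observer " + observer.getClass().getName() + " failed to record the request", e);
    }
}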
Rejected Alternatives for Kafka Audit
Besides the broker-side auditing approach, the alternative is client-side auditing, in which the auditor is embedded in the producer and consumer. From what we have learned in practice, the major issue with the client-side approach, which badly affects the usability of the auditing system, is the lack of central administration over users' choice of Kafka clients and client versions: there are many Kafka clients (including ones implemented in other programming languages such as Go, C++ and Python) and many versions of each. There are three direct consequences:
Lack of coverage, due to the sheer number of users, the slow deployment of diverse clients, and the difficulty of enforcing compliance.
In order to support diverse clients, multiple implementations of the auditor have to be provided, which causes engineering overhead.
A slow rate of development of the audit system, again due to the number of clients.
The existing Kafka server metrics provide lower-level insights into the system, such as byte-in/out rates. An audit system complements these operational server metrics by providing additional application-level information, such as the byte-in/out rate associated with specific applications.
Since the major motivation is to support an auditing system, the interface could have been named "Auditor". However, "Observer" is more general and flexible.
Another rejected alternative is to define an interceptor interface instead of an observer interface. First, "interceptor" implies the possibility of modifying data, which "observer" does not. Second, beyond that difference, the concept of an interceptor is more general than that of an observer in terms of what it can do. However, the extra generality complicates the KIP and is not necessary.