Current state: Under Discussion

Discussion thread: (Original Archive) (Markmail)


Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer by specifying the fully qualified class names of the corresponding classes in the broker config.

When a Kafka broker receives request bytes from a client over the socket channel, it reconstructs the request and hands it over to the request handler threads (a.k.a. API threads) for processing. While doing so, the broker constructs a Session object that holds a KafkaPrincipal and the client's socket IP address. The KafkaPrincipal consists of the type of the client principal ("User" as of now) and the name of the Principal generated by the PrincipalBuilder. The Authorizer interface includes an authorize(...) method that is invoked on every request the broker receives. If the broker has a custom Authorizer configured, it delegates the authorize(...) call to the custom implementation. The authorize(...) method takes the Session object, the requested Operation, and the Resource on which the operation is requested as parameters, and returns true or false depending on the configured ACLs:

def authorize(session: Session, operation: Operation, resource: Resource): Boolean

However, the principals generated by the plugged-in PrincipalBuilder may contain additional custom fields, and the user's Authorizer implementation may need to access those fields in order to enforce ACLs correctly for those principals. Unfortunately, Kafka currently extracts only the name of the Principal when constructing the Session object, as shown below, so the additional information is lost at runtime:

val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) // (custom fields in principal if any are not passed through)
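The loss can be illustrated with a minimal, self-contained sketch. The KafkaPrincipal and Session classes here are simplified stand-ins, not Kafka's real classes, and GroupAwarePrincipal is a hypothetical custom principal invented purely for illustration:

```scala
import java.net.InetAddress
import java.security.Principal

object PrincipalLossSketch {
  // Simplified stand-in for Kafka's KafkaPrincipal: only a type and a name.
  final case class KafkaPrincipal(principalType: String, name: String) extends Principal {
    override def getName: String = name
  }

  // Simplified stand-in for RequestChannel.Session as described above.
  final case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)

  // Hypothetical principal that a custom PrincipalBuilder might produce.
  final case class GroupAwarePrincipal(name: String, groups: Set[String]) extends Principal {
    override def getName: String = name
  }

  val UserType = "User"

  // Mirrors the current construction: only getName survives the conversion.
  def buildSession(channelPrincipal: Principal, address: InetAddress): Session =
    Session(KafkaPrincipal(UserType, channelPrincipal.getName), address)

  val original: GroupAwarePrincipal = GroupAwarePrincipal("alice", Set("admins"))
  val session: Session = buildSession(original, InetAddress.getLoopbackAddress)
}
```

In this sketch, session.principal carries only the type and the name "alice"; the groups field of the original principal is no longer reachable from the Session.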

It is important to note that Java's Principal API is opaque, and different Kafka service providers can have custom implementations of the Principal interface with additional features to suit their requirements. Since Kafka allows users to plug in both a custom PrincipalBuilder and a custom Authorizer, it does not make sense to extract only the name of the Principal and discard the other fields of the generated Principal, which the custom Authorizer may require.

This issue can be addressed if Kafka preserves the original Principal object when it processes the incoming request, before handing it over to the API threads. The Authorizer will then be able to access this Principal object and use it to verify the ACLs.

New or Changed Public Interfaces

This KIP changes the Session class to accept a parameter of Java's Principal type (java.security.Principal) instead of the KafkaPrincipal type.
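As a rough sketch of what this change could enable, assuming a simplified Session and hypothetical stand-ins for Operation, Resource and the custom principal (none of these are Kafka's real classes, and the ACL rule is invented for illustration), a custom Authorizer could pattern-match on the preserved Principal:

```scala
import java.net.InetAddress
import java.security.Principal

object PreservedPrincipalSketch {
  // Sketch of the proposed shape: Session holds the original Principal.
  final case class Session(principal: Principal, clientAddress: InetAddress)

  // Hypothetical custom principal carrying an extra field.
  final case class GroupAwarePrincipal(name: String, groups: Set[String]) extends Principal {
    override def getName: String = name
  }

  // Hypothetical stand-ins for Kafka's Operation and Resource types.
  sealed trait Operation
  case object Read extends Operation
  case object Write extends Operation
  final case class Resource(resourceType: String, name: String)

  // Illustrative rule only: "admins" may do anything, other group-aware
  // principals may only Read, and any other principal type is denied.
  def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
    session.principal match {
      case p: GroupAwarePrincipal => p.groups.contains("admins") || operation == Read
      case _                      => false
    }
}
```

Because the Session keeps the Principal instance as built, the custom fields remain available to the Authorizer without any additional lookup keyed on the principal name.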

Proposed Changes

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

There is no compatibility impact as there is no change in behavior.

Test Plan

- Unit tests to validate that new changes work as expected without affecting the existing behavior.

Rejected Alternatives

Alternative 1:

Alternative 2: