...
```java
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@InterfaceStability.Evolving
public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<AclBindingDeleteResult> aclBindingDeleteResults;

    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    public AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) {
        this.aclBindingDeleteResults = deleteResults;
        this.exception = exception;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }

    /**
     * Returns delete result for each matching ACL binding.
     */
    public Collection<AclBindingDeleteResult> aclBindingDeleteResults() {
        return aclBindingDeleteResults;
    }

    /**
     * Delete result for each ACL binding that matched a delete filter.
     */
    public static class AclBindingDeleteResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        public AclBindingDeleteResult(AclBinding aclBinding) {
            this(aclBinding, null);
        }

        public AclBindingDeleteResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. An empty {@link #exception()}
         * indicates that the binding was deleted.
         */
        public AclBinding aclBinding() {
            return aclBinding;
        }

        /**
         * Returns any exception that resulted in failure to delete ACL binding.
         */
        public Optional<ApiException> exception() {
            return exception == null ? Optional.empty() : Optional.of(exception);
        }
    }
}
```
Proposed Changes
Asynchronous update requests
kafka.server.KafkaApis will be updated to handle CreateAcls and DeleteAcls requests asynchronously using a purgatory. If Authorizer.createAcls or Authorizer.deleteAcls returns any CompletionStage that is not complete, the request will be added to a purgatory and completed when all the stages complete. Authorizer implementations with low-latency updates may continue to update synchronously and return a completed future. These requests will be completed in-line and will not be added to the purgatory.
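The in-line versus deferred completion described above can be sketched with plain java.util.concurrent types. This is an illustration under stated assumptions, not the KafkaApis implementation: the class AclUpdateSketch, the method completeWhenDone, and the sendResponse and onError callbacks are hypothetical names, and a simple callback stands in for the purgatory.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch: a callback stands in for the broker's purgatory. */
final class AclUpdateSketch {

    /**
     * Passes the results to sendResponse once every stage has completed.
     * Returns true if all stages were already complete and the response was
     * sent in-line, false if completion was deferred.
     */
    static <T> boolean completeWhenDone(List<? extends CompletionStage<T>> stages,
                                        Function<Throwable, T> onError,
                                        Consumer<List<T>> sendResponse) {
        // Convert a failed stage into a result so that one failure does not
        // prevent the response for the remaining stages.
        List<CompletableFuture<T>> futures = stages.stream()
                .map(stage -> stage.toCompletableFuture().exceptionally(onError))
                .collect(Collectors.toList());

        if (futures.stream().allMatch(CompletableFuture::isDone)) {
            // Synchronous authorizer: nothing to wait for, respond in-line.
            sendResponse.accept(joinAll(futures));
            return true;
        }

        // At least one stage is pending: respond when all of them complete.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenRun(() -> sendResponse.accept(joinAll(futures)));
        return false;
    }

    // Safe to call only after every future has completed; join() then returns immediately.
    private static <T> List<T> joinAll(List<CompletableFuture<T>> futures) {
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

With a list of already-completed futures, completeWhenDone invokes sendResponse before returning. With a pending future in the list, the callback runs on whichever thread completes the last stage, so a request thread is never blocked waiting for the authorizer.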
Asynchronous updates are useful for Authorizer implementations that store ACLs in an external system such as a database. Async handling of update requests will enable Kafka brokers to handle database outages without blocking request threads. As many databases now support async APIs (https://dev.mysql.com/doc/x-devapi-userguide/en/synchronous-vs-asynchronous-execution.html, https://blogs.oracle.com/java/jdbc-next:-a-new-asynchronous-api-for-connecting-to-a-database), the async update API enables authorizers to take advantage of them.
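As a rough illustration of the authorizer side, the sketch below moves writes to an external store onto a separate executor and returns one stage per binding. Everything here is an assumption made for the example: AsyncAclStoreSketch and AclStore are hypothetical, bindings are simplified to strings, and each result is reduced to an Optional exception rather than the result classes defined in this KIP.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/** Hypothetical authorizer fragment that writes ACLs to an external store. */
final class AsyncAclStoreSketch {

    /** Hypothetical stand-in for a blocking client of an external ACL store. */
    interface AclStore {
        void write(String aclBinding) throws Exception;
    }

    private final AclStore store;
    private final Executor storeExecutor;

    AsyncAclStoreSketch(AclStore store, Executor storeExecutor) {
        this.store = store;
        this.storeExecutor = storeExecutor;
    }

    /**
     * Returns one stage per binding. Each stage completes with an empty Optional
     * on success or with the exception that caused the write to fail.
     */
    List<CompletionStage<Optional<Exception>>> createAcls(List<String> aclBindings) {
        return aclBindings.stream()
                .<CompletionStage<Optional<Exception>>>map(binding ->
                        // The write runs on storeExecutor, not on the calling thread.
                        CompletableFuture.supplyAsync(() -> writeOne(binding), storeExecutor))
                .collect(Collectors.toList());
    }

    // Reports a store failure as part of the result instead of failing the stage.
    private Optional<Exception> writeOne(String binding) {
        try {
            store.write(binding);
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        }
    }
}
```

If the store is slow or unavailable, only the threads of storeExecutor wait on it; the caller receives incomplete stages immediately. A store client with a native async API could return its own stages directly instead of wrapping blocking calls.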
Purgatory metrics will be added for ACL updates, consistent with metrics from other purgatories. Two new metrics will be added:
kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=acl-update
kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=acl-update
In addition to these metrics, the existing request metrics for CreateAcls and DeleteAcls can be used to track the portion of time spent on async operations, since local time is updated before the async wait and remote time is updated when the async wait completes:
kafka.network:type=RequestMetrics,name=LocalTimeMs,request=CreateAcls
kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=CreateAcls
kafka.network:type=RequestMetrics,name=LocalTimeMs,request=DeleteAcls
kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=DeleteAcls
Deprecate existing Scala Authorizer Trait
...