Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleDelete Results
package org.apache.kafka.server.authorizer;

import java.util.Collections;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@InterfaceStability.Evolving
public class AclDeleteResult {
    private final ApiException exception;
    private final Collection<AclBindingDeleteResult> aclBindingDeleteResults;

    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    public AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) {
        this.aclBindingDeleteResults = deleteResults;
        this.exception = exception;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     */
    public Optional<ApiException> exception() {
        return exception == null ? Optional.empty() : Optional.of(exception);
    }

    /**
     * Returns delete result for each matching ACL binding.
     */
    public Collection<AclBindingDeleteResult> aclBindingDeleteResults() {
        return aclBindingDeleteResults;
    }


    /**
     * Delete result for each ACL binding that matched a delete filter.
     */
    public static class AclBindingDeleteResult {
        private final AclBinding aclBinding;
        private final ApiException exception;

        public AclBindingDeleteResult(AclBinding aclBinding) {
            this(aclBinding, null);
        }

        public AclBindingDeleteResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if
         * the binding was deleted.
         */
        public AclBinding aclBinding() {
            return aclBinding;
        }

        /**
         * Returns any exception that resulted in failure to delete ACL binding.
         */
        public Optional<ApiException> exception() {
            return exception == null ? Optional.empty() : Optional.of(exception);
        }
    }
}


Proposed Changes

Asynchronous update requests

kafka.server.KafkaApis will be updated to handle CreateAcls and DeleteAcls requests asynchronously using a purgatory. If Authorizer.createAcls or Authorizer.deleteAcls returns any ComplettionStage that is not complete, the request will be added to a purgatory and completed when all the stages complete. Authorizer implementations with low latency updates may continue to update synchronously and return a completed future. These requests will be completed in-line and will not be added to the purgatory.

Asynchronous updates are useful for Authorizer implementations that use external stores for ACLs, for example a database. Async handling of update requests will enable Kafka brokers to handle database outages without blocking request threads. As many databases now support async APIs (https://dev.mysql.com/doc/x-devapi-userguide/en/synchronous-vs-asynchronous-execution.html, https://blogs.oracle.com/java/jdbc-next:-a-new-asynchronous-api-for-connecting-to-a-database), async update API enables authorizers to take advantage of these APIs.

Purgatory metrics will be added for ACL updates, consistent with metrics from other purgatories. Two new metrics will be added:

  • kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=acl-update
  • kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=acl-update

In addition to these metrics, existing request metrics for CreateAcls and DeleteAcls can be used to track the portion of time spent on async operations since local time is updated before the async wait and remote time is updated when async wait completes:

  • kafka.network:type=RequestMetrics,name=LocalTimeMs,request=CreateAcls
  • kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=CreateAcls
  • kafka.network:type=RequestMetrics,name=LocalTimeMs,request=DeleteAcls
  • kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=DeleteAcls

Deprecate existing Scala Authorizer Trait

...