Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/1967vwhhy34lcjmcd0lfk6p5k6chj8rq

Vote thread: https://lists.apache.org/thread/7x2vl4rzo2zp6nwgmo3rw1m5c2qf3r9h

JIRA: KAFKA-18760 - Getting issue details... STATUS

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The org.apache.kafka.network.EndPoint and org.apache.kafka.common.Endpoint have same fields - host, port, listenerName, and securityProtocol. The org.apache.kafka.network.EndPoint was migrated from kafka.cluster.EndPoint. It was introduced in 0.9.0.0 (commit). The org.apache.kafka.common.Endpoint was introduced in 2.4.0 (commit). The only difference between these two classes is that org.apache.kafka.network.EndPoint#listenerName returns String and org.apache.kafka.common.Endpoint#listenerName returns Optional<String>. In previous design, org.apache.kafka.common.Endpoint could be reused in server and client sides, so the listenerName may be null. This KIP would like to deprecate redundant Optional return and just return String object cause of two reasons:

  • The only usage of common Endpoint in client module is Authorizer. However, the listenerName is not null in Authorizer, because the Endpoint data is from server side.
  • The common Endpoint is not reused in all places. For example, there are RaftVoterEndpoint and StreamsGroupMemberDescription#Endpoint. These customized Endpoint use different fields in different cases.

This KIP introduces three changes:

  • The org.apache.kafka.common.Endpoint#listenerName will be deprecated and a new function listener will be added to return String.
  • Replaces org.apache.kafka.network.EndPoint with org.apache.kafka.common.Endpoint, because we don't need redundant class to represent same data.
  • Change RaftVoterEndpoint#name to RaftVoterEndpoint#listener to unify function name for listener.

Public Interfaces

org.apache.kafka.common.Endpoint

Deprecate the function listenerName.

Add a new function listener to replace listenerName and return String.

package org.apache.kafka.common

public class Endpoint {

    private final String listenerName;
    private final SecurityProtocol securityProtocol;
    private final String host;
    private final int port;

    public Endpoint(String listenerName, SecurityProtocol securityProtocol, String host, int port) {
        this.listenerName = listenerName;
        this.securityProtocol = securityProtocol;
        this.host = host;
        this.port = port;
    }

    /**
     * Returns the listener name of this endpoint.
     */
    public String listener() {
        return listenerName;
    }

    /**
     * Returns the listener name of this endpoint. This is non-empty for endpoints provided
     * to broker plugins, but may be empty when used in clients.
     * @deprecated Since 4.1. Use {@link #listener} instead. This function will be removed in 5.0.
     */
    @Deprecated
    public Optional<String> listenerName() {
        return Optional.ofNullable(listenerName);
    }

    /**
     * Returns the security protocol of this endpoint.
     */
    public SecurityProtocol securityProtocol() {
        return securityProtocol;
    }

    /**
     * Returns advertised host name of this endpoint.
     */
    public String host() {
        return host;
    }

    /**
     * Returns the port to which the listener is bound.
     */
    public int port() {
        return port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Endpoint)) {
            return false;
        }

        Endpoint that = (Endpoint) o;
        return Objects.equals(this.listenerName, that.listenerName) &&
            Objects.equals(this.securityProtocol, that.securityProtocol) &&
            Objects.equals(this.host, that.host) &&
            this.port == that.port;

    }

    @Override
    public int hashCode() {
        return Objects.hash(listenerName, securityProtocol, host, port);
    }

    @Override
    public String toString() {
        return "Endpoint(" +
            "listenerName='" + listenerName + '\'' +
            ", securityProtocol=" + securityProtocol +
            ", host='" + host + '\'' +
            ", port=" + port +
            ')';
    }
}

org.apache.kafka.clients.admin.RaftVoterEndpoint

Deprecate the function name.

Add a new function listener to replace name.

public class RaftVoterEndpoint {
    private final String listener;
    private final String host;
    private final int port;

    static String requireNonNullAllCapsNonEmpty(String input) {
        if (input == null) {
            throw new IllegalArgumentException("Null argument not allowed.");
        }
        if (!input.trim().equals(input)) {
            throw new IllegalArgumentException("Leading or trailing whitespace is not allowed.");
        }
        if (input.isEmpty()) {
            throw new IllegalArgumentException("Empty string is not allowed.");
        }
        if (!input.toUpperCase(Locale.ROOT).equals(input)) {
            throw new IllegalArgumentException("String must be UPPERCASE.");
        }
        return input;
    }

    /**
     * Create an endpoint for a metadata quorum voter.
     *
     * @param listener          The human-readable name for this endpoint. For example, CONTROLLER.
     * @param host              The DNS hostname for this endpoint.
     * @param port              The network port for this endpoint.
     */
    public RaftVoterEndpoint(
        String listener,
        String host,
        int port
    ) {
        this.listener = requireNonNullAllCapsNonEmpty(listener);
        this.host = Objects.requireNonNull(host);
        this.port = port;
    }

    /**
     * @deprecated Since 4.1. Use {@link #listener} instead. This function will be removed in 5.0.
     */
    @Deprecated
    public String name() {
        return listener;
    }

    /**
     * The listener name for this endpoint.
     */
    public String listener() {
        return listener;
    }

    public String host() {
        return host;
    }

    public int port() {
        return port;
    }

    @Override
    public boolean equals(Object o) {
        if (o == null || (!o.getClass().equals(getClass()))) return false;
        RaftVoterEndpoint other = (RaftVoterEndpoint) o;
        return listener.equals(other.listener) &&
            host.equals(other.host) &&
            port == other.port;
    }

    @Override
    public int hashCode() {
        return Objects.hash(listener, host, port);
    }

    @Override
    public String toString() {
        // enclose IPv6 hosts in square brackets for readability
        String hostString = host.contains(":") ? "[" + host + "]" : host;
        return listener + "://" + hostString + ":" + port;
    }
}


Proposed Changes

  • Replace org.apache.kafka.network.EndPoint with org.apache.kafka.common.Endpoint.

  • Remove InterfaceStability.Evolving from org.apache.kafka.common.Endpoint.

Compatibility, Deprecation, and Migration Plan

In 4.1:

  • Deprecate org.apache.kafka.common.Endpoint#listenerName.
  • Add a new function listener to org.apache.kafka.common.Endpoint.
  • Replace org.apache.kafka.network.EndPoint with org.apache.kafka.common.Endpoint.
  • Deprecate org.apache.kafka.admin.RaftVoterEndpoint#name and add a new function listener.

In 5.0:

  • Remove org.apache.kafka.common.Endpoint#listenerName.
  • Remove org.apache.kafka.admin.RaftVoterEndpoint#name.

Test Plan

Make sure all existent test cases can pass with proposed changes.

Rejected Alternatives

N/A

  • No labels