Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-4453
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Today there is no separation between controller requests and regular data plane requests. Specifically, (1) the controller in a cluster uses the same advertised endpoints to connect to brokers that clients and other brokers use; (2) on the broker side, a single network (processor) thread may be multiplexed across a controller connection and many other data plane connections; (3) after a controller request is read from the socket by a network thread, it is enqueued into the single FIFO requestQueue, which is used for all types of requests; and (4) request handler threads poll requests from the requestQueue and handle controller requests with the same priority as regular data requests.
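The effect of the shared FIFO queue in stage (3) can be illustrated with a small sketch (illustrative only, not Kafka code): a controller request enqueued behind many data plane requests is handled only after all of them, because every request type shares one queue with equal priority.

```python
import queue

# All request types share one FIFO queue, as in the current broker.
request_queue = queue.Queue()

# Five data plane requests arrive first...
for i in range(5):
    request_queue.put(("Produce", i))

# ...then a controller request, which must wait behind all of them.
request_queue.put(("LeaderAndISR", None))

# Requests are handled strictly in arrival order.
order = []
while not request_queue.empty():
    order.append(request_queue.get()[0])

print(order)
# -> ['Produce', 'Produce', 'Produce', 'Produce', 'Produce', 'LeaderAndISR']
```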
Because of the multiplexing at every stage of request handling, controller requests could be significantly delayed under the following scenarios:
Delaying a controller request for a prolonged period can have serious consequences. We now examine the impact of delayed processing for a LeaderAndISR request and an UpdateMetadata request [1].
In summary, we'd like to mitigate the effect of stale metadata by shortening the latency between a controller request's arrival and processing on a given broker.
kafka.network:name=RequestQueueSize,type=RequestChannel
will be changed: it will show the size of the data plane request queue only, excluding controller requests.
In order to eliminate queuing for controller requests, we plan to add a dedicated endpoint on brokers for controller connections, as well as two dedicated control plane threads for handling controller requests. To explain the proposed change, we first go through how brokers get the dedicated endpoints through configs and expose them to Zookeeper. Then we discuss how the controller learns about the dedicated endpoints exposed by brokers. Finally we describe how controller requests are handled over the dedicated connections.
Upon startup, a broker needs two lists of endpoints: the "listeners" endpoints used to bind server sockets and accept incoming connections, and the "advertised.listeners" endpoints published to Zookeeper for clients and other brokers to establish connections with. More details on the reasons for separating these two lists can be found in KAFKA-1092 and KIP-103. In terms of how the values of the two lists are derived, we find it intuitive to understand the relationships between the different configs using the following chart:
To support dedicated ports for controller connections, we need a way to specify the dedicated endpoints. We propose to support the new dedicated endpoints by adding a new entry to the "listeners" and "advertised.listeners" configs. For instance, if a cluster already has multiple listener names with the config
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
advertised.listeners=INTERNAL://broker1.example.com:9092,EXTERNAL://host1.example.com:9093
listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
in order to support the new endpoint for the controller, it can be changed to
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
advertised.listeners=CONTROLLER://broker1.example.com:9091,INTERNAL://broker1.example.com:9092,EXTERNAL://host1.example.com:9093
listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
Upon startup, a broker should maintain the existing behavior by publishing all the endpoints in "advertised.listeners" to Zookeeper.
Today each broker publishes a list of endpoints to Zookeeper, in the following JSON format:
{
  "listener_security_protocol_map": {
    "INTERNAL": "PLAINTEXT",
    "EXTERNAL": "SSL"
  },
  "endpoints": [
    "INTERNAL://broker1.example.com:9092",
    "EXTERNAL://host1.example.com:9093"
  ],
  "host": "host1.example.com",
  "port": 9092,
  "jmx_port": -1,
  "timestamp": "1532467569343",
  "version": 4
}
Upon detecting a new broker through Zookeeper, the controller figures out which endpoint to use to connect to the new broker by first determining the inter-broker-listener-name. The inter-broker-listener-name is decided by either the "inter.broker.listener.name" config or the "security.inter.broker.protocol" config. Then, using the "endpoints" section of the broker info, the controller can determine which endpoint to use for the given inter-broker-listener-name. For instance, with the sample JSON payload listed above, if the controller first determines the inter-broker-listener-name to be "INTERNAL", it knows to use the endpoint "INTERNAL://broker1.example.com:9092" and the security protocol PLAINTEXT to connect to the given broker.
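The lookup described above can be sketched as follows. This is an illustrative helper, not Kafka code; the function name `resolve_endpoint` is hypothetical, but the registration payload matches the JSON format shown earlier.

```python
import json

def resolve_endpoint(registration_json, listener_name):
    """Return (host, port, security_protocol) for a listener name,
    resolved from a broker's Zookeeper registration payload."""
    info = json.loads(registration_json)
    for endpoint in info["endpoints"]:
        # Each endpoint has the form LISTENER_NAME://host:port
        name, rest = endpoint.split("://", 1)
        if name == listener_name:
            host, port = rest.rsplit(":", 1)
            protocol = info["listener_security_protocol_map"][name]
            return host, int(port), protocol
    raise ValueError("no endpoint for listener %s" % listener_name)

registration = json.dumps({
    "listener_security_protocol_map": {"INTERNAL": "PLAINTEXT", "EXTERNAL": "SSL"},
    "endpoints": ["INTERNAL://broker1.example.com:9092",
                  "EXTERNAL://host1.example.com:9093"],
})

print(resolve_endpoint(registration, "INTERNAL"))
# -> ('broker1.example.com', 9092, 'PLAINTEXT')
```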
Instead of using the inter-broker-listener-name value, we propose to add a new config, "control.plane.listener.name", for determining the control plane endpoints. For instance, if the controller sees that the endpoints exposed by a broker are the following:
{
  "listener_security_protocol_map": {
    "CONTROLLER": "PLAINTEXT",
    "INTERNAL": "PLAINTEXT",
    "EXTERNAL": "SSL"
  },
  "endpoints": [
    "CONTROLLER://broker1.example.com:9091",
    "INTERNAL://broker1.example.com:9092",
    "EXTERNAL://host1.example.com:9093"
  ],
  "host": "host1.example.com",
  "port": 9092,
  "jmx_port": -1,
  "timestamp": "1532467569343",
  "version": 4
}
and the "control.plane.listener.name" config is set to value "CONTROLLER", it will use the corresponding endpoint "CONTROLLER://broker1.example.com:9091" and the security protocol "PLAINTEXT" for connections with this broker.
Whenever "control.plane.listener.name" is set, we will validate its value upon broker startup and make sure it differs from the inter-broker-listener-name value.
If the "control.plane.listener.name" config is not set, the controller will fall back to the current behavior and use inter-broker-listener-name value to determine controller-to-broker endpoints.
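The selection and validation logic described in the last two paragraphs can be sketched as follows. The config keys match the KIP; the function `select_controller_listener` itself is hypothetical, shown only to make the fallback and validation rules concrete.

```python
def select_controller_listener(config):
    """Pick the listener name the controller should use for a broker.

    Falls back to the inter-broker listener when no dedicated control
    plane listener is configured, and rejects a control plane listener
    that collides with the inter-broker listener.
    """
    control = config.get("control.plane.listener.name")
    inter_broker = config.get("inter.broker.listener.name")
    if control is None:
        # Current behavior: reuse the inter-broker listener.
        return inter_broker
    if control == inter_broker:
        raise ValueError(
            "control.plane.listener.name must differ from the "
            "inter-broker listener name")
    return control

print(select_controller_listener({
    "control.plane.listener.name": "CONTROLLER",
    "inter.broker.listener.name": "INTERNAL",
}))
# -> CONTROLLER
```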
With the dedicated endpoints for controller connections, upon startup a broker will use the "control.plane.listener.name" value to look up the corresponding endpoint in the listeners list for binding. For instance, in the example given above, the broker will derive the dedicated endpoint "CONTROLLER://192.1.1.8:9091". It will then have a new dedicated acceptor that binds to this endpoint and listens for controller connections. When a connection is accepted, the socket is handed to a dedicated control plane processor thread (network thread). The dedicated processor thread reads controller requests from the socket and enqueues them into a new dedicated control plane request queue, whose capacity is 20 [2]. On the other side of the queue, a dedicated control plane request handler thread takes requests out and handles them in the same way as is done today. In summary, we are 1) adding a dedicated acceptor, 2) pinning one processor thread, 3) adding a new request queue, and 4) pinning one request handler thread for controller connections and requests. The two new threads are exclusively for requests from the controller and do not handle data plane requests.
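The control plane request path above can be sketched with a bounded queue and a single pinned handler thread. This is an illustrative sketch, not Kafka code; `handle_controller_request` is a hypothetical stand-in for the real request handling, and the capacity of 20 mirrors the proposal.

```python
import queue
import threading

# Dedicated bounded queue for controller requests (capacity 20, per the KIP).
CONTROL_PLANE_QUEUE_CAPACITY = 20
control_plane_request_queue = queue.Queue(maxsize=CONTROL_PLANE_QUEUE_CAPACITY)
handled = []

def handle_controller_request(request):
    # Stand-in for the real LeaderAndISR/UpdateMetadata/StopReplica handling.
    handled.append(request)

def control_plane_handler():
    # The single pinned request handler thread: drains only controller
    # requests, never data plane requests.
    while True:
        request = control_plane_request_queue.get()
        if request is None:  # shutdown sentinel
            break
        handle_controller_request(request)

handler = threading.Thread(target=control_plane_handler)
handler.start()

for req in ["LeaderAndISR", "UpdateMetadata"]:
    control_plane_request_queue.put(req)
control_plane_request_queue.put(None)
handler.join()

print(handled)
# -> ['LeaderAndISR', 'UpdateMetadata']
```

In the real broker the enqueue side is the dedicated control plane processor thread reading from the controller's socket; the sketch only shows the queue/handler relationship.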
The metrics
kafka.network:name=ControlPlaneRequestQueueSize,type=RequestChannel
kafka.network:name=ControlPlaneResponseQueueSize,type=RequestChannel
will be added to monitor the sizes of the new control plane request and response queues. Another two new metrics
kafka.network:name=ControlPlaneNetworkProcessorIdlePercent,type=SocketServer
kafka.server:name=ControlPlaneRequestHandlerIdlePercent,type=KafkaRequestHandlerPool
will be added to monitor the idle ratio of the new control plane network thread and the control plane request handler thread, respectively.
Finally, as a special case, if the "control.plane.listener.name" config is not set, there is no way to determine the dedicated endpoint for the controller. Hence there will be no dedicated acceptor, network processor, or request handler threads, and the behavior will be exactly the same as the current implementation.
Compatibility, Deprecation, and Migration Plan
[1] There is another type of controller request, the StopReplica request. Topic deletion uses the StopReplica request with the deletePartitions field set to true, so delayed processing of such StopReplica requests can degrade the performance of the topic deletion process. Whether topic deletion is more important than client requests may vary across deployments; when it is, it is better to prioritize StopReplica requests over data requests.
[2] The rationale behind the default value is that the max number of inflight requests from controller to broker is currently hard coded to 1, meaning a broker should have at most one outstanding request from a given controller. However, during controller failovers, a broker might receive requests from multiple controllers; even so, we expect it to be rare for the number of queued controller requests to exceed 20.