This KIP introduces a new server configuration parameter,
queued.max.request.bytes, that would specify a limit on the volume of requests that can be held in memory. This configuration parameter will co-exist with the existing
queued.max.requests (the code will respect both bounds and will not pick up new requests when either is hit).
- The pool is non-blocking, so network threads would not be blocked waiting for memory and could make progress elsewhere.
- SocketServer would instantiate and hold a memory pool, which Processor threads would try to allocate memory from when reading requests out of sockets (by passing the pool to the instances of NetworkReceive that they create).
- NetworkReceive.readFromReadableChannel() would be modified to try allocating memory (it is already written in a way that allows a read to span multiple repeated calls to readFromReadableChannel(), so this is not a big change in behavior).
- Memory would be released at the end of request processing (in KafkaRequestHandler.run()), and also in case of disconnection mid-request in KafkaChannel.close().
- As the pool would allow a request of any size through as long as it has any capacity available, the actual memory bound is queued.max.request.bytes + socket.request.max.bytes. The upside is that large requests cannot be starved out.
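The lenient, non-blocking pool described above can be sketched as follows (a minimal illustration only; the class and method names are not the actual Kafka implementation):

```java
import java.nio.ByteBuffer;

// Illustrative lenient, non-blocking memory pool: tryAllocate() never blocks,
// and grants a request of any size as long as *some* capacity remains, so the
// available count may go negative. This bounds total outstanding memory at
// maxBytes + largestSingleRequest - 1 rather than strictly at maxBytes.
class LenientMemoryPool {
    private long available; // may go negative under the lenient policy

    LenientMemoryPool(long maxBytes) {
        this.available = maxBytes;
    }

    // Returns a buffer of the requested size, or null if the pool is exhausted.
    // Callers (network threads) are expected to move on rather than block.
    synchronized ByteBuffer tryAllocate(int sizeBytes) {
        if (available <= 0)
            return null;
        available -= sizeBytes;
        return ByteBuffer.allocate(sizeBytes);
    }

    // Called at the end of request processing, or on mid-request disconnect.
    synchronized void release(ByteBuffer buffer) {
        available += buffer.capacity();
    }

    synchronized long availableBytes() {
        return available;
    }
}
```

Note how a request larger than the remaining capacity is still granted (driving the available count negative), which is what prevents large requests from being starved.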
- queued.max.requests is deprecated/removed in favor of queued.max.request.bytes. In this case, existing configurations could be converted using queued.max.request.bytes = queued.max.requests * socket.request.max.bytes (which is conservative, but "safe").
- queued.max.requests is supported as an alternative to queued.max.request.bytes (either-or), in which case no migration is required. A default value of 0 could be used to disable the feature (by default), and runtime code would pick a queue implementation depending on which configuration parameter is provided.
- queued.max.requests is supported in addition to queued.max.request.bytes (both respected at the same time). In this case a default value of queued.max.request.bytes = -1 would maintain backwards-compatible behavior.
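Under the first migration path, the conservative conversion could be computed as follows (a sketch; the class and method names are illustrative):

```java
// Conservative conversion for the deprecate-and-convert migration path:
// size the new byte bound so that it can never hold less than the old
// request-count bound could, in the worst case of maximum-size requests.
class QueueConfigMigration {
    static long convertedQueuedMaxRequestBytes(int queuedMaxRequests, int socketRequestMaxBytes) {
        // queued.max.request.bytes = queued.max.requests * socket.request.max.bytes
        return (long) queuedMaxRequests * socketRequestMaxBytes;
    }
}
```

With typical broker defaults (queued.max.requests = 500, socket.request.max.bytes = 104857600) this yields roughly 50 GB, which illustrates why the conversion is conservative.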
The current naming scheme of queued.max.requests (and the proposed queued.max.request.bytes) may be a bit opaque. Perhaps requestQueue.max.requests and requestQueue.max.bytes would more clearly convey the meaning to users (indicating that these settings deal with the request queue specifically, and not some other queue). The current queued.max.requests configuration can be retained for a few more releases for backwards compatibility.
queued.max.request.bytes must be larger than socket.request.max.bytes (in other words, the memory pool must be large enough to accommodate the largest possible single request), or <= 0 (if disabled). The default would be -1.
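This constraint could be enforced at startup with a check along these lines (an illustrative sketch, not the actual Kafka validation code):

```java
// Illustrative validation of the constraint above: the byte bound must either
// disable the feature (<= 0) or exceed the largest possible single request.
class QueuedBytesValidator {
    static void validate(long queuedMaxRequestBytes, long socketRequestMaxBytes) {
        if (queuedMaxRequestBytes > 0 && queuedMaxRequestBytes <= socketRequestMaxBytes)
            throw new IllegalArgumentException(
                "queued.max.request.bytes must be > socket.request.max.bytes, or <= 0 to disable");
    }
}
```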
- A unit test was written to validate the behavior of the memory pool
- A unit test that validates correct behavior of RequestChannel under capacity bounds would need to be written.
- A micro-benchmark for determining the performance of the pool would need to be written.
- Stress testing a broker (heavy producer load of varying request sizes) to verify that the memory limit is honored.
- Benchmarking producer and consumer throughput before/after the change to prove that ingress/egress performance remains acceptable.
- Testing of SSL connections (both inter-broker and client-broker), since implementation bugs may affect SSL.
- Reducing producer max batch size: this is harmful to throughput (and is also more complicated to maintain from an administrator's standpoint than simply sizing the broker itself). This is more of a workaround than a fix.
- Reducing producer max request size: same issues as above.
- Limiting the number of connected clients: same issues as above.
- Reducing queued.max.requests in the broker: although this will conservatively size the queue, it can be detrimental to throughput in the average case.
- Controlling the volume of requests enqueued in RequestChannel.requestQueue: this would not suffice, as it places no bound on memory already read from actual sockets.
- The order of selection keys returned from a Selector.poll() call is undefined. If the actual implementation uses a fixed order (say, by increasing handle id) then, under prolonged memory pressure (never enough memory to service all requests), this may starve sockets that always fall at the end of the iteration order. To overcome this, the code shuffles the selection keys when memory is low.
- A strict pool (one that adheres to its max size completely) would starve large requests under memory pressure, as they would never be able to allocate while a stream of small requests keeps draining capacity. To avoid this, the pool implementation allocates the requested amount of memory whenever it has any memory available (so if the pool has 1 free byte and 1 MB is requested, 1 MB is returned and the number of available bytes in the pool goes negative). This means the actual bound on outstanding bytes is queued.max.request.bytes + socket.request.max.bytes - 1 (socket.request.max.bytes being the largest possible single request).
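The selection-key shuffle described above can be sketched as follows (names are illustrative; the real code operates on java.nio.channels.SelectionKey sets inside the Selector):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Illustrative fairness fix: when the pool is low on memory, randomize the
// iteration order of ready keys so that no socket is permanently last in
// line (and therefore starved) under prolonged memory pressure.
class KeyShuffler {
    static <K> List<K> orderKeys(Set<K> readyKeys, boolean memoryLow) {
        List<K> keys = new ArrayList<>(readyKeys);
        if (memoryLow)
            Collections.shuffle(keys); // break any fixed ordering under memory pressure
        return keys;
    }
}
```

When memory is plentiful the original order is kept, so the shuffle adds no cost on the common path.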