Add correlation id to all requests
A field to make it possible to multiplex requests over a single socket. This field would be set by the client and would be returned by the server with the response. This would allow a client to make multiple requests on a socket and receive responses asynchronously and know which response was for which request.
Reduce duplication in APIs
Currently we have both ProduceRequest and MultiProducerRequest and FetchRequest and MultiFetchRequest. The Multi*Request is just the single request version repeated N times. There are a few problems with this: (1) the ProduceRequest and FetchRequest are just special cases of the general Multi*Request format with no real benefit to them (the reason for their existence is largely historical), (2) having both means more API formats to maintain and evolve and test. We should get rid of the single topic/partition APIs and rename the existing Multi*Requests to ProduceRequest and FetchRequest to keep the naming clean.
Reduce repetition of topic name in Multi* APIs
Currently the form of the APIs for the Multi* requests looks something like this: [(topic, partition, messages), (topic, partition, messages), ...]. (Here square brackets denote a variable length list and parenthesis denote a tuple or record). This format is driven by the fact that the Multi* requests are really just a bunch of repeated single topic/partition ProducerRequests. This is really inefficient, though, as a common case is that we are producing a bunch of messages for different partitions under the same topic (i.e. if we are doing the key-based partitioning). It would be better for the format to be [(topic, [(partition, messages), ...], topic, [(partition, messages), ...], ...]. This would mean that each topic name is only given once per request no matter how many partitions within that partition are being produced to.
Support "long poll" fields in fetch request
Add two fields to the fetch request which cause the request to not immediately response. Currently fetch requests always immediately return, potentially with no data for the consumer. It is hence up to the consumer to continually poll for updates. This is not desirable. A better approach would be for the consumer request to block until either (1) min_bytes are available in total amongst all the topics being requests or (2) max_wait time in milliseconds has gone by. This would greatly simplify implementing a high-throughput, high-efficiency, low-latency consumer.
Add producer acknowledgement count and timeout
Currently the produce requests are asynchronous with no acknowledgement from the broker. We should add an option to have the broker acknowledge. The orginal proposal was just to have a boolean "acknowledgement needed" but we also need a field to control the number of replicas to block on, so a generalization is to allow the required_acks to be an integer between 0 and the number of replicas. 0 yields the current async behavior whereas > 1 would mean that in addition to blocking on the master we also block on some number of replicas.
Add offset to produce response
As discussed in KAFKA-49 it would be useful for the acknowledgement from the broker to include the offset at which the message set is available on the broker.
Separate request id and version
Currently we have a single int32 that identifies both the api and the version of the api. This is slightly more confusing then splitting out the request id and the version id into two 16 bit fields. This isn't a huge win but it does make it more clear the intention when bumping the version number versus adding a new request entirely.
Add a client id
Currently we can only correlate client applications to server requests via the tcp connection. This is a pain. It would be good to have a shared logical id for each application so that we can track metrics by client, log it with errors, etc.
Add replica id to fetch request
This replica id allows the broker to count the fetch as an acknowledgement for all previous offsets on the given partition. This should be set to -1 for fetch requests from non-replicas outside the cluster.
The size of this request (not counting this 4 byte size). This is mandatory and required by the network layer. It must be the first field in the request.
An id that can be set by the client and will be returned untouched by the server in the response.
A version number that indicates the format of the response message
The id of the (request-level) error, if any occurred.