...

We can solve the scalability and latency problems discussed above by creating "incremental" fetch requests and responses that only include information about what has changed.  In order to do this, we need to introduce the concept of "fetch sessions."

...

There are several changes to the FetchRequest API.

Fetch Session ID

A 32-bit number which identifies the current fetch session.  If this is set to 0, there is no current fetch session, and the server may create a new one and return its ID in the FetchResponse.  If it is non-zero, it identifies an existing fetch session.

Fetch

...

There is now a fetch_type field with the following values:

FULL_FETCH_REQUEST(0): This is a full fetch request.

INCREMENTAL_FETCH_REQUEST(1): This is an incremental fetch request.

The response to a full fetch request is always a full fetch response.  Similarly, the response to an incremental fetch request is always an incremental fetch response.

Incremental FetchRequests will only contain information about partitions which have changed on the follower.  If a partition has not changed, it will not be included. Possible changes include:

  • Needing to fetch from a new offset in the partition
  • Changing the maximum number of bytes which should be fetched from the partition
  • Changes in the logStartOffset of the partition on the follower

Note that the existing system by which the leader becomes aware of changes to the follower's fetch position is not changed.  Every time the follower wants to update the fetch position, it must include the partition in the next incremental request.  Also note that if no partitions have changed, the next incremental FetchRequest will not contain any partitions at all.

An incremental fetch request cannot be made with a 0 fetch session ID.  An incremental fetch request should only be made if the server has set INCREMENTAL_FETCH_OK in its response to a previous full fetch request (see FetchResponse changes below).

Full FetchRequests and full FetchResponses will contain information about all partitions.  This is unchanged from the current behavior.  The client can alter the set of partitions in the fetch session by making a full fetch request with the designated session ID.

Incremental Fetch Sequence Number

The incremental fetch sequence number is a monotonically increasing counter. It is used for several purposes:

...

Session Epoch

A 32-bit number which identifies the current fetch session epoch.  Valid session epochs are always positive; they are never 0 or negative.

The fetch session epoch is incremented by one for each fetch request that we send.  Once it reaches MAX_INT, the next epoch is 1.

The fetch session epoch keeps the state on the leader and the follower synchronized.  It ensures that if a message is duplicated or lost, the server will always notice.  It is also used to associate requests and responses in the logs.  Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose, but they are less likely to be unique.
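The epoch arithmetic above can be sketched in Java. This is a minimal illustration, assuming the epoch is held in a Java `int`; the `SessionEpochs` class and its methods are hypothetical, not part of Kafka's API.

```java
// Hypothetical helper illustrating the fetch session epoch rules (not a Kafka class).
final class SessionEpochs {
    private SessionEpochs() {}

    /** Valid session epochs are strictly positive: never 0 or negative. */
    static boolean isValid(int epoch) {
        return epoch > 0;
    }

    /** Returns the epoch for the next fetch request; after MAX_INT the next epoch is 1. */
    static int next(int epoch) {
        if (!isValid(epoch)) {
            throw new IllegalArgumentException("invalid session epoch: " + epoch);
        }
        return epoch == Integer.MAX_VALUE ? 1 : epoch + 1;
    }
}
```

Comparing against `Integer.MAX_VALUE` explicitly avoids relying on overflow, which in Java would wrap to `Integer.MIN_VALUE`, a negative and therefore invalid epoch.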

Fetch Type

The FetchType is an 8-bit number with the following values:

  • 0: SESSIONLESS
  • 1: FULL
  • 2: INCREMENTAL
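A Java enum is one natural way to model these wire values. The sketch below is hypothetical (the `FetchType` enum and its `code()` and `fromCode()` methods are illustrative names, not Kafka classes) and simply maps each constant to its INT8 value and back.

```java
// Hypothetical enum mirroring the 8-bit FetchType values listed above.
enum FetchType {
    SESSIONLESS((byte) 0),
    FULL((byte) 1),
    INCREMENTAL((byte) 2);

    private final byte code;

    FetchType(byte code) {
        this.code = code;
    }

    /** The INT8 value written on the wire. */
    byte code() {
        return code;
    }

    /** Decodes a wire value, rejecting anything outside the defined range. */
    static FetchType fromCode(byte code) {
        for (FetchType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        throw new IllegalArgumentException("unknown FetchType: " + code);
    }
}
```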

FetchRequest Metadata meaning

FetchType | SessionId | SessionEpoch | Meaning
SESSIONLESS | ignored | ignored | Make a full FetchRequest that does not use or create a session.  This is the FetchType of pre-KIP-227 FetchRequests.
FULL | 0 | 0 | Make a full FetchRequest.  Create a new incremental fetch session if possible.
FULL | $ID | 0 | Close the incremental fetch session identified by $ID.  Make a full FetchRequest.  Create a new incremental fetch session if possible.
INCREMENTAL | $ID | $EPOCH | If the ID and EPOCH are correct, make an incremental fetch request.
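The table above can be read as a small broker-side decision function. The following sketch is hypothetical: `RequestFetchType`, `RequestAction` and `FetchRequestMetadata.classify` are illustrative names, and treating combinations absent from the table (for example, FULL with a non-zero epoch) as invalid is an assumption, not something the table states.

```java
// Hypothetical broker-side classifier for the FetchRequest metadata table (illustrative names).
enum RequestFetchType { SESSIONLESS, FULL, INCREMENTAL }

enum RequestAction {
    SESSIONLESS_FULL_FETCH,       // full fetch; no session used or created
    FULL_FETCH_MAYBE_NEW_SESSION, // full fetch; create a session if possible
    CLOSE_THEN_FULL_FETCH,        // close session $ID, full fetch, maybe create a new session
    INCREMENTAL_FETCH,            // incremental fetch against session $ID
    INVALID                       // combination not listed in the table (assumption)
}

final class FetchRequestMetadata {
    private FetchRequestMetadata() {}

    static RequestAction classify(RequestFetchType type, int sessionId, int sessionEpoch) {
        switch (type) {
            case SESSIONLESS:
                return RequestAction.SESSIONLESS_FULL_FETCH; // ID and epoch are ignored
            case FULL:
                if (sessionEpoch != 0) {
                    return RequestAction.INVALID;
                }
                return sessionId == 0
                        ? RequestAction.FULL_FETCH_MAYBE_NEW_SESSION
                        : RequestAction.CLOSE_THEN_FULL_FETCH;
            case INCREMENTAL:
                // Only the shape is checked here; whether $ID and $EPOCH match the
                // broker's session state would still have to be verified separately.
                return (sessionId != 0 && sessionEpoch > 0)
                        ? RequestAction.INCREMENTAL_FETCH
                        : RequestAction.INVALID;
            default:
                throw new IllegalStateException("unreachable");
        }
    }
}
```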

Incremental Fetch Requests

Incremental fetch requests have FetchType set to INCREMENTAL.

A partition is only included in an incremental FetchRequest if:

  • The client wants to notify the broker about a change to the partition's maxBytes, fetchOffset, or logStartOffset
  • The partition was not included in the incremental fetch session before, but the client wants to add it.
  • The partition is in the incremental fetch session, but the client wants to remove it.

If the client doesn't want to change anything, the client does not need to include any partitions in the request at all.

If the client wants to remove a partition, the client will set the partition's maxBytes to 0 in the request

...

.
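The inclusion rules above amount to a diff between what the session already holds and what the client wants now. This is a minimal client-side sketch under stated assumptions: partitions are keyed by hypothetical strings such as `"topic-0"`, `PartitionFetchState` is an illustrative record (Java 16+) holding fetchOffset, maxBytes and logStartOffset, and `IncrementalRequestBuilder.diff` is not a Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-partition state the client sends (illustrative, not a Kafka type).
record PartitionFetchState(long fetchOffset, int maxBytes, long logStartOffset) {}

final class IncrementalRequestBuilder {
    private IncrementalRequestBuilder() {}

    /**
     * @param inSession partitions the session already holds, with their last-sent state
     * @param wanted    partitions the client wants to fetch now, with their current state
     * @return partitions to include in the incremental request; removals carry maxBytes = 0
     */
    static Map<String, PartitionFetchState> diff(Map<String, PartitionFetchState> inSession,
                                                 Map<String, PartitionFetchState> wanted) {
        Map<String, PartitionFetchState> toSend = new LinkedHashMap<>();
        for (Map.Entry<String, PartitionFetchState> e : wanted.entrySet()) {
            PartitionFetchState previous = inSession.get(e.getKey());
            // Include newly added partitions and any whose fetchOffset, maxBytes or
            // logStartOffset changed since the last request.
            if (!e.getValue().equals(previous)) {
                toSend.put(e.getKey(), e.getValue());
            }
        }
        for (Map.Entry<String, PartitionFetchState> e : inSession.entrySet()) {
            if (!wanted.containsKey(e.getKey())) {
                // Removal is signalled by sending the partition with maxBytes set to 0.
                PartitionFetchState old = e.getValue();
                toSend.put(e.getKey(),
                        new PartitionFetchState(old.fetchOffset(), 0, old.logStartOffset()));
            }
        }
        return toSend;
    }
}
```

Because unchanged partitions are skipped, calling `diff` with identical maps yields an empty request, matching the rule that a client with nothing to change sends no partitions at all.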

Schema

FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic]

...

  fetch_type => INT8

  fetch_session_id => INT32

  fetch_session_epoch => INT32

  topic => topic_name [partition]

...

  max_bytes => INT32

FetchResponse Changes

Top-level error code

Per-partition error codes are no longer sufficient to handle all response errors.  For example, when an incremental fetch session encounters a FetchSessionIdNotFoundException, we do not know which partitions the client expected to fetch.  Therefore, the FetchResponse now contains a top-level error code.  This error code is set to indicate that the request as a whole cannot be processed.

When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
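On the client, the rule above means the top-level error overrides any per-partition result. A hypothetical sketch follows: `FetchErrors.effectiveErrors` is an illustrative name, error codes are modelled as plain `short` values, and 0 is assumed to mean "no error" (as with Kafka's `NONE` error code). For an incremental fetch, `requestedPartitions` is assumed to be every partition in the client's session, since the request itself may list none.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: resolve the effective error for each partition in the fetch.
final class FetchErrors {
    private FetchErrors() {}

    static Map<String, Short> effectiveErrors(short topLevelError,
                                              List<String> requestedPartitions,
                                              Map<String, Short> perPartitionErrors) {
        Map<String, Short> result = new LinkedHashMap<>();
        for (String partition : requestedPartitions) {
            if (topLevelError != 0) {
                // The request as a whole failed: every partition gets the top-level error.
                result.put(partition, topLevelError);
            } else {
                // Otherwise use the partition's own error code, defaulting to "no error".
                result.put(partition, perPartitionErrors.getOrDefault(partition, (short) 0));
            }
        }
        return result;
    }
}
```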

Fetch

...

Type

The FetchResponse now contains an 8-bit fetch type.

Fetch Session ID

The FetchResponse now contains a 32-bit fetch session ID.

Fetch Session Epoch

The FetchResponse now contains a 32-bit fetch session epoch.  This is the epoch which the server expects to see in the next fetch request.

FetchResponse Metadata meaning

FetchType | SessionId | SessionEpoch | Meaning
SESSIONLESS | ignored | ignored | This is a response to a SESSIONLESS fetch request.  The broker also uses this FetchType when there was a fetch session error.  The error will be described by the top-level response error field.
FULL | 0 | 0 | No fetch session was created.
INCREMENTAL | $ID | $EPOCH | The next request can be an incremental fetch request with the given $ID and $EPOCH.

Note that the response to a FULL FetchRequest may have an INCREMENTAL FetchType.  This response contains information about all partitions, but also indicates that the next request which the client makes can be an incremental FetchRequest.
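From the client's point of view, the response table above determines the metadata of the next request. The sketch below is hypothetical: the enums, the `NextRequest` record (Java 16+) and `SessionHandler.next` are illustrative, and falling back to a FULL request with ID 0 and epoch 0 after a SESSIONLESS or FULL response is an assumed client policy, chosen because that combination asks the broker to create a new session if possible.

```java
// Hypothetical client-side handling of FetchResponse metadata (illustrative names).
enum ResponseFetchType { SESSIONLESS, FULL, INCREMENTAL }

enum NextFetchType { FULL, INCREMENTAL }

record NextRequest(NextFetchType type, int sessionId, int sessionEpoch) {}

final class SessionHandler {
    private SessionHandler() {}

    static NextRequest next(ResponseFetchType type, int sessionId, int sessionEpoch) {
        if (type == ResponseFetchType.INCREMENTAL) {
            // A session exists (even if this answered a FULL request): reuse the ID and
            // send the epoch the broker says it expects next.
            return new NextRequest(NextFetchType.INCREMENTAL, sessionId, sessionEpoch);
        }
        // SESSIONLESS (possibly a fetch session error) or FULL with no session created:
        // assumed policy is a full request with ID 0 and epoch 0.
        return new NextRequest(NextFetchType.FULL, 0, 0);
    }
}
```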

Incremental Fetch Responses

A partition is only included in an incremental FetchResponse if:

  • The broker wants to notify the client about a change to the partition's highWatermark or broker logStartOffset
  • There is new data available for a partition

If the broker has no new information to report, it does not need to include any partitions in the response at all.
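A broker-side check for the two inclusion conditions above might look like the following. This is a hypothetical sketch: `CachedPartition` stands for whatever per-session state the broker keeps about what it last sent, and `availableRecordBytes` is an assumed measure of new data; neither is a Kafka class or field.

```java
// Hypothetical record of what the broker last reported to this session for a partition.
record CachedPartition(long highWatermark, long logStartOffset) {}

final class IncrementalResponseFilter {
    private IncrementalResponseFilter() {}

    /** True if the partition belongs in the next incremental FetchResponse. */
    static boolean shouldInclude(CachedPartition lastSent,
                                 long currentHighWatermark,
                                 long currentLogStartOffset,
                                 int availableRecordBytes) {
        // Condition 1: the highWatermark or logStartOffset changed since the last response.
        boolean metadataChanged = lastSent.highWatermark() != currentHighWatermark
                || lastSent.logStartOffset() != currentLogStartOffset;
        // Condition 2: there is new data to return for the partition.
        boolean hasNewData = availableRecordBytes > 0;
        return metadataChanged || hasNewData;
    }
}
```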

The format of the partition data within FetchResponse is unchanged.

Handling Partition Size Limits in Incremental Fetch Responses

Sometimes, the per-fetch-request limit is too small to allow us to return information about every partition.  In those cases, we will limit the number of partitions that we return information about, to avoid exceeding the per-request maximum.  (As specified in KIP-74, the response will always return at least one message, though.)

If we always returned partition information in the same order, we might end up "starving" the partitions which came near the end of the order.  With full fetch requests, the client can rotate the order in which it requests partitions in order to avoid this problem.  However, incremental fetch requests need not explicitly specify all the partitions.  Indeed, an incremental fetch request may contain no partitions at all.

In order to solve the starvation problem, the server must rotate the order in which it returns partition information.  The server does this by maintaining a linked list of all partitions in the fetch session.  When data is returned for a partition, that partition is moved to the end of the list.  This ensures that we eventually return data about all partitions for which data is available.
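The linked-list rotation described above can be sketched with Java's `LinkedHashSet`, which preserves insertion order, so removing and re-adding an element moves it to the end. The `PartitionRotation` class, its method names, and the simplified size accounting (whole partitions are either returned or skipped, and the first partition with data is always returned, loosely mirroring KIP-74) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical per-session rotation state kept by the broker (illustrative only).
final class PartitionRotation {
    private final LinkedHashSet<String> order = new LinkedHashSet<>();

    PartitionRotation(Collection<String> partitions) {
        order.addAll(partitions);
    }

    /**
     * @param availableBytes   bytes of data ready per partition (absent or 0 means no data)
     * @param maxResponseBytes per-request size limit
     * @return partitions returned in this response, in response order
     */
    List<String> selectAndRotate(Map<String, Integer> availableBytes, int maxResponseBytes) {
        List<String> returned = new ArrayList<>();
        int used = 0;
        for (String partition : order) {
            int bytes = availableBytes.getOrDefault(partition, 0);
            if (bytes == 0) {
                continue; // nothing to return for this partition
            }
            // Always return the first partition with data, even if it exceeds the limit.
            if (!returned.isEmpty() && used + bytes > maxResponseBytes) {
                break;
            }
            returned.add(partition);
            used += bytes;
        }
        // Move partitions that returned data to the end of the list.
        for (String partition : returned) {
            order.remove(partition);
            order.add(partition);
        }
        return returned;
    }

    List<String> currentOrder() {
        return new ArrayList<>(order);
    }
}
```

With three partitions that each have 100 bytes ready and a 150-byte limit, successive calls return P1, then P2, then P3, because each partition that returns data moves behind the others.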

The server uses the fetch session ID field to notify the client when it has created a new session.

This field will be 0 if the server has closed the session, or declined to create one in the first place.  Otherwise, it will be the session ID which the client should use for future requests.

Response Flags

response_flags is a bitfield.

INCREMENTAL_FETCH_RESPONSE(0): This bit is set if the response is an incremental fetch response.  It is cleared if the response is a full fetch response.

INCREMENTAL_FETCH_OK(1): This bit is set if the client can make incremental fetch requests with this session in the future.  It is cleared if the client should make full fetch requests in the future.
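Assuming the parenthesised numbers are bit positions within the INT8 `response_flags` field (the text above does not say so explicitly), the flags could be encoded and tested as follows. `ResponseFlags` and its methods are hypothetical names.

```java
// Hypothetical helpers for the response_flags bitfield, assuming bit positions 0 and 1.
final class ResponseFlags {
    static final int INCREMENTAL_FETCH_RESPONSE_BIT = 0;
    static final int INCREMENTAL_FETCH_OK_BIT = 1;

    private ResponseFlags() {}

    static byte encode(boolean incrementalResponse, boolean incrementalFetchOk) {
        int flags = 0;
        if (incrementalResponse) {
            flags |= 1 << INCREMENTAL_FETCH_RESPONSE_BIT;
        }
        if (incrementalFetchOk) {
            flags |= 1 << INCREMENTAL_FETCH_OK_BIT;
        }
        return (byte) flags;
    }

    static boolean isIncrementalResponse(byte flags) {
        return (flags & (1 << INCREMENTAL_FETCH_RESPONSE_BIT)) != 0;
    }

    static boolean isIncrementalFetchOk(byte flags) {
        return (flags & (1 << INCREMENTAL_FETCH_OK_BIT)) != 0;
    }
}
```

Under that assumption, a full response that still allows incremental fetches in the future would carry `encode(false, true)`, that is, the value 2.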

Incremental Fetch Responses

Incremental fetch responses will only contain information about partitions for which:

  • The logStartOffset has changed, or
  • The highWaterMark has changed, or
  • The lastStableOffset has changed, or
  • The aborted transaction list has changed, or
  • There is partition data available

The format of the partition data within FetchResponse is unchanged.

Handling Partition Size Limits in Incremental Fetch Responses


In order to solve the starvation problem, the server must rotate the order in which it returns partition information. We do this by lexicographically ordering the partitions, and then starting at the partition whose index is the request sequence number modulo the number of partitions.

For example, suppose that the fetch session has partitions P1, P2, and P3.  In that case, the fetch request with sequence number 1 will start at index 1 % 3 = 1.  Therefore it will return information about P2 first.  If any space is left over, we will return information about P3.  And if any space is left over at the end, we will return information about P1. Similarly, for the request with sequence number 2, the order will be P3, P1, P2.  For the request with sequence number 3, the order will be P1, P2, P3.  And so on.
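The worked example above can be checked with a short sketch. `SequenceRotation.responseOrder` is a hypothetical helper; it assumes partitions are identified by strings whose natural `String` ordering is the lexicographic order, and uses `Math.floorMod` so that the start index is never negative.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of sequence-number-based response ordering (illustrative only).
final class SequenceRotation {
    private SequenceRotation() {}

    static List<String> responseOrder(Collection<String> partitions, long sequenceNumber) {
        List<String> sorted = new ArrayList<>(partitions);
        sorted.sort(null); // null comparator: natural (lexicographic) String order
        int n = sorted.size();
        List<String> ordered = new ArrayList<>(n);
        if (n == 0) {
            return ordered;
        }
        // Start at index (sequence number mod partition count), then wrap around.
        int start = (int) Math.floorMod(sequenceNumber, (long) n);
        for (int i = 0; i < n; i++) {
            ordered.add(sorted.get((start + i) % n));
        }
        return ordered;
    }
}
```

For sequence numbers 1, 2 and 3 over {P1, P2, P3} this yields P2, P3, P1, then P3, P1, P2, then P1, P2, P3, as in the example.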

Schema

FetchResponse => throttle_time_ms error_code error_string fetch_session_id [topic]

...

  error_code => INT16

  error_string => STRING

  fetch_type => INT8

  fetch_session_id => INT32

  fetch_session_epoch => INT32

  topic => topic_name [partition]

...

  first_offset => INT64

  records => RECORDS 

Compatibility, Deprecation, and Migration Plan

...