Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP attempts to solve the out-of-memory (OOM) issue associated with the number of producer IDs (PIDs) and PID/epoch pairs tracked in the system.

In addition, there appears to be a significant lag between when a PID is received and when it is persistently recorded.  This KIP attempts to reduce the size of that window by writing the PID info earlier in the process.

Current Status

The server implementation section of the Idempotent Producer wiki entry states that PIDs will be tracked in a simple array/list of PID entries with new entries added at one end and expired at the other, with look-ups based on a binary search.  Servers will maintain a fixed amount of memory for PIDs by making the PID array a fixed size and using it as a circular buffer.

The issue is that the number of PIDs that need to be tracked has exploded and has resulted in OOM failures that cause the entire cluster to crash.  There are multiple efforts underway to mitigate the OOM problem through cache cleanup and throttling of clients.

The list of active PID information is snapshotted to disk periodically to assist in recovery, fail-over and hand-off to a new leader.  The structure of the snapshotted data is well understood.

The in-memory PIDs are not written to disk until they are snapshotted, so there is a non-zero probability that PID information will be lost in the event of a system failure.

An Observation

The in-memory list of PIDs is often referred to as a cache when in fact it is a ledger.  Data are written to the ledger (the snapshot) and not modified thereafter, except when the snapshot files are deleted.

It therefore makes sense to replace the circular buffer with a performant ledgering system fronted by a standard write-through cache.

Public Interfaces

The goal of this KIP is to implement the necessary changes without changing the API or interfaces outside of the server PID tracking component. 

Proposed Changes

Implement a performant ledger system that will work seamlessly with the current implementation.  This proposal attempts to solve the OOM problem without resorting to throttling of clients and to reduce the probability that PID information will be lost in the event of a system failure.

This proposal comprises two parts: the ledger system itself and the cache that fronts it. 

The Ledger

The ledger will include the snapshot files that are already written to disk.  These files will not be changed in any way.  This will allow older systems to utilize the ledger for recovery using current software.  It will also allow the ledger system to initialize with the older system data.

In addition to the snapshot files a Bloom filter will be created for each snapshot file and will be stored on disk with the same name as the snapshot file but with the extension “bloom”.  The structure of the file will be two integers followed by a calculated number of longs.  The integers will be the number of bits in the filter, and the number of hash functions used to create it.  This data is readily available in the commons-collections Shape object.  The remainder will be the BitMap structure as defined in commons-collections BitMapProducer javadoc.
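
As an illustration only, a .bloom file with that layout could be written along the following lines.  This sketch assumes the commons-collections bloomfilter API (Shape.getNumberOfBits(), Shape.getNumberOfHashFunctions(), and the BitMapProducer forEachBitMap() view of the filter); the class name and the file-naming convention shown here are hypothetical.

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;

    import org.apache.commons.collections4.bloomfilter.BloomFilter;
    import org.apache.commons.collections4.bloomfilter.Shape;

    final class BloomFileWriter {
        /**
         * Writes the Bloom filter for a snapshot as "<snapshotFileName>.bloom":
         * two ints (bit count, hash function count) followed by the bitmap longs.
         */
        static void write(BloomFilter filter, String snapshotFileName) throws IOException {
            Shape shape = filter.getShape();
            try (DataOutputStream out = new DataOutputStream(
                    new FileOutputStream(snapshotFileName + ".bloom"))) {
                out.writeInt(shape.getNumberOfBits());
                out.writeInt(shape.getNumberOfHashFunctions());
                // The filter exposes its backing long[] bitmap in index order.
                filter.forEachBitMap(word -> {
                    try {
                        out.writeLong(word);
                        return true;   // keep iterating
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
    }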

A list of Bloom-filter/snapshot-file-name pairs will be stored in memory and will comprise an index to the on-disk data.  The list will be sorted by snapshot file timestamp in descending order.

The ledger will also utilize a memory mapped, random-access, or other rapid-write and searchable file to track the most recent PIDs.  This file will have the same structure as the snapshot file and will have an associated Bloom filter.

The ledger API will have three methods: read(), update(), and flush().  read() will attempt to locate the PID data in the memory mapped file and then, if not found, across the snapshot files.  update() will update the data in the memory mapped file or insert a new entry if required.  flush() will write the current memory mapped snapshot data to disk and start a new one.
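
A possible shape for that API is sketched below, using a hypothetical ProducerIdEntry record in place of the real PID entry type.

    import java.util.Optional;

    /** Hypothetical PID entry; the real type would mirror the snapshot record layout. */
    record ProducerIdEntry(long producerId, short epoch, int lastSequence, long lastTimestamp) { }

    interface PidLedger {
        /** Searches the memory mapped file first, then the on-disk snapshot files. */
        Optional<ProducerIdEntry> read(long producerId);

        /** Updates the entry in the memory mapped file, inserting it if absent; returns the stored entry. */
        ProducerIdEntry update(ProducerIdEntry entry);

        /** Writes the current memory mapped data to a snapshot file and starts a new one. */
        void flush();
    }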

Update strategy

When update() is called, a commons-collections Hasher is created from the PID information.  The hasher has the advantage of being able to populate a Bloom filter of any Shape for the object that was hashed.  The Bloom filter associated with the memory mapped file is checked to see if the hasher is contained within it.  If so, the memory mapped file is searched for the matching PID; if the PID is located, the data is updated and the updated record is returned.

If the Bloom filter does not record a match, a new entry is added to the end of the ledger file, the associated Bloom filter is updated, and the new record is returned.

The update process must be thread safe.
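
A minimal sketch of that flow follows.  It assumes the commons-collections Hasher/BloomFilter API (EnhancedDoubleHasher, BloomFilter.contains(Hasher), BloomFilter.merge(Hasher)); activeFilter, activeFile and its find/overwrite/append methods are hypothetical stand-ins for the memory mapped file wrapper, and the synchronized keyword stands in for whatever concurrency control the real implementation uses.

    import java.nio.ByteBuffer;

    import org.apache.commons.collections4.bloomfilter.EnhancedDoubleHasher;
    import org.apache.commons.collections4.bloomfilter.Hasher;

    // Builds a Hasher from the PID key; the same hasher can be tested against
    // (or merged into) a filter of any Shape.
    private static Hasher hasherFor(long producerId) {
        byte[] key = ByteBuffer.allocate(Long.BYTES).putLong(producerId).array();
        return new EnhancedDoubleHasher(key);
    }

    public synchronized ProducerIdEntry update(ProducerIdEntry entry) {
        Hasher hasher = hasherFor(entry.producerId());
        if (activeFilter.contains(hasher)) {
            // Possible hit: scan the memory mapped file for the matching PID.
            ProducerIdEntry existing = activeFile.find(entry.producerId());   // hypothetical helper
            if (existing != null) {
                activeFile.overwrite(entry);                                  // hypothetical helper
                return entry;
            }
        }
        // No match recorded (or a false positive): append and record it in the filter.
        activeFile.append(entry);                                             // hypothetical helper
        activeFilter.merge(hasher);
        return entry;
    }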

Read strategy

The read() strategy is initially similar to the update strategy in that the Bloom filter is checked and the memory mapped file may be scanned.  If the record is located, it is returned.

If the record is not located, the list of Bloom-filter/snapshot-file-name pairs is scanned and each Bloom filter is checked in turn to see if it matches the PID info hasher.  If it does, the associated snapshot is scanned for the PID information; if found, the record is returned.  Since Bloom filters may yield false positives, if the PID is not found the scan continues until a match is found or the last Bloom filter has been checked.  If no match is found, null or an empty Optional is returned.
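
A corresponding sketch of read(), reusing hasherFor() and the hypothetical helpers above, plus a hypothetical SnapshotIndexEntry pair type for the in-memory index:

    public Optional<ProducerIdEntry> read(long producerId) {
        Hasher hasher = hasherFor(producerId);

        // Check the active memory mapped file first.
        if (activeFilter.contains(hasher)) {
            ProducerIdEntry found = activeFile.find(producerId);      // hypothetical helper
            if (found != null) {
                return Optional.of(found);
            }
        }

        // Walk the index (newest snapshot first).  A filter hit may be a false
        // positive, so keep scanning until a real match is found or the index is exhausted.
        for (SnapshotIndexEntry snapshot : index) {                    // hypothetical index entry type
            if (snapshot.filter().contains(hasher)) {
                ProducerIdEntry found = snapshot.scan(producerId);     // hypothetical helper
                if (found != null) {
                    return Optional.of(found);
                }
            }
        }
        return Optional.empty();
    }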

Flush strategy

The flush writes the data to a snapshot file.  This can occur on a timer and may also occur when the associated Bloom filter is “full”.  The Bloom filter object has an isFull() method, but that is not how we want to measure fullness.  In our case we want to ensure that the false positive rate of the Bloom filter does not rise above our specified value.  This can be checked by verifying that filter.estimateN() < shape.estimateMaxN().  These values can be tracked directly or the calculations used.
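
Expressed in code (treat the exact method names as assumptions taken from the commons-collections javadoc), the flush trigger could look like:

    // Flush once the estimated number of merged items reaches the Shape's
    // estimated maximum for the configured false positive rate.
    boolean filterFull = activeFilter.estimateN() >= activeFilter.getShape().estimateMaxN();
    if (filterFull) {
        flush();
    }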

The flush operation will start a new memory mapped file and Bloom filter so that update processing can continue.  During the flush operation the read and update operations have to treat the original memory mapped file as a read-only file, similar to the existing snapshot files.  The steps in the flush are:

  1. Create a new memory mapped file and associated Bloom filter.
  2. Configure the write path to write to the new memory mapped file.
  3. Configure the read path to read the new memory mapped file, then the old memory mapped file, and then the existing on-disk snapshot files.
  4. Write the Bloom filter for the old memory mapped file to disk.
  5. Calculate the CRC for the old memory mapped file and flush the file to disk.
  6. Add the Bloom filter/file name pair to the index of on-disk data.
  7. Remove the old memory mapped file from the read path.
  8. Release the memory mapping for the old memory mapped file.

Using this process there should not be a point in time where a known PID is unreachable.
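
A sketch of those steps, continuing with the hypothetical file, filter, and index helpers used in the earlier sketches (readPath here is a hypothetical ordered list of old memory mapped files that read() would consult between the active file and the on-disk index while a flush is in progress):

    public synchronized void flush() throws IOException {
        MappedPidFile oldFile = activeFile;                            // hypothetical mapped-file wrapper
        BloomFilter oldFilter = activeFilter;

        // Steps 1-2: create a new memory mapped file and filter, and route writes to them.
        activeFile = MappedPidFile.create(nextSnapshotPath());         // hypothetical
        activeFilter = new SimpleBloomFilter(shape);

        // Step 3: keep the old file on the read path while it is finalized.
        readPath.addFirst(oldFile);

        // Steps 4-5: persist the old filter, then finalize the old file and flush it to disk.
        BloomFileWriter.write(oldFilter, oldFile.snapshotFileName());  // from the .bloom sketch above
        oldFile.writeCrcAndForce();                                    // hypothetical

        // Step 6: make the finished snapshot visible through the in-memory index.
        index.addFirst(new SnapshotIndexEntry(oldFilter, oldFile.snapshotFileName()));

        // Steps 7-8: drop the old file from the read path and release its mapping.
        readPath.remove(oldFile);
        oldFile.close();
    }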

Whether the memory mapped files have the .snapshot extension or have some other extension and are renamed during flush is an open question.  If they have the .snapshot extension, then current implementations that read .snapshot files have to be able to deal with files that do not yet have the CRC populated.

The Cache

The PID cache is a simple out-of-the-box, write-through, thread-safe cache.  If the PID is not in the cache, the cache calls ledger.read() to retrieve the values from the ledger and either populates itself or records that the entry does not exist.  When the cache is written to, it calls ledger.update() and then updates itself.
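
A minimal sketch of such a wrapper, built on ConcurrentHashMap purely for illustration (a production cache would also need size bounds and expiry), using the PidLedger interface sketched above:

    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    final class PidCache {
        private final PidLedger ledger;
        private final ConcurrentHashMap<Long, Optional<ProducerIdEntry>> entries = new ConcurrentHashMap<>();

        PidCache(PidLedger ledger) {
            this.ledger = ledger;
        }

        /** Read-through: misses are resolved against the ledger, including "not present". */
        Optional<ProducerIdEntry> get(long producerId) {
            return entries.computeIfAbsent(producerId, ledger::read);
        }

        /** Write-through: the ledger is updated first, then the cache. */
        void put(ProducerIdEntry entry) {
            ProducerIdEntry stored = ledger.update(entry);
            entries.put(stored.producerId(), Optional.of(stored));
        }
    }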

Compatibility, Deprecation, and Migration Plan

This change should be 100% compatible with existing implementations.  It should be able to read an existing snapshot or series of snapshots and begin operation.  If rolled back to an earlier implementation, the on-disk storage should be readable by the earlier implementation without issue.

KIP-936: Throttle number of active PIDs.  This KIP is an attempt to circumvent the problem of poorly configured clients that initialize too many PIDs.  It would throttle producers so that fewer new PIDs flow into the cache, which in turn reduces the number of PIDs recorded by the system.

KIP-854: Separate configuration for producer ID expiry.  This KIP attempted to solve the problem of tracking excessive PIDs by establishing a 24-hour expiry, which is the one-day limit noted in the snapshot retention notes above.  KIP-854 was accepted and has been implemented.  However, it did not solve the problem; it only delays it in some cases.  Those are the cases that KIP-936 attempts to resolve.

Test Plan

An initial system test will be constructed that causes the OOM error using the unmodified code base.  This change should resolve that failure without significant changes to existing performance tests.  No additional stable tests should fail.

Rejected Alternatives

This change can work in concert with other proposed changes but may make them unnecessary.

Benefits

  1. There is no need to throttle producers simply because the server does not have enough memory to handle the number of PIDs produced, so this proposal makes no demands on or for principal-based quotas.
  2. There is no risk of losing PIDs from long-running producers that produce infrequently, except when the ledger files are deleted.
  3. There is no change to the server API.
  4. There is no change to the internal server call as the cache can be wrapped with a simple facade.
  5. There is no change to snapshot and recovery processes.
  6. There is a reduction in the probability of losing a PID transaction due to system failure.
  7. There is no need for special cache handling for outlying producers.
  8. There can be a reduction in memory usage.

