Status

Current state: Under Discussion

Discussion thread:

JIRA: KAFKA-15541

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Iterator leaks occur when a user opens an Iterator on a StateStore but never closes it. Although Iterators are AutoCloseable, they can still fail to be closed if a user does not allocate them in a try-with-resources block (or call close() explicitly). Depending on the StateStore implementation, Iterator leaks can directly cause memory leaks or other performance problems.
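For context, allocating the Iterator in a try-with-resources block guarantees it is closed. The sketch below assumes a ReadOnlyKeyValueStore<String, Long> named store, purely for illustration:

```java
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class IteratorUsageExample {

    // Counts all entries in the store. The try-with-resources block guarantees
    // that close() is called on the Iterator, even if iteration throws.
    static long countEntries(final ReadOnlyKeyValueStore<String, Long> store) {
        long count = 0;
        try (final KeyValueIterator<String, Long> iterator = store.all()) {
            while (iterator.hasNext()) {
                iterator.next();
                count++;
            }
        }
        return count;
    }
}
```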

In particular, RocksDB Iterators must be closed after use to prevent memory leaks caused by blocks being "pinned" in-memory. Pinned blocks can currently be tracked via the per-store block-cache-pinned-usage metric. However, it's common (and even recommended) to share the Block Cache among all stores in an application, so that users can globally bound the native memory used by RocksDB. This results in the block-cache-pinned-usage metric reporting the same memory usage for every store in the application, irrespective of which store is actually pinning blocks in the block cache.

We introduce several new metrics to help users find leaked Iterators, and to identify the cause of a high number of pinned blocks or other kinds of memory leaks and performance problems.

Public Interfaces

New Metrics

Each added metric will be reported at the store level and will have the following tags:

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • [store type]-state-id = [store ID]    for key-value stores
  • [store type]-session-state-id = [store ID]    for session stores
  • [store type]-window-state-id = [store ID]    for window stores  

Name                        | Recording Level | Metric Type | Description
num-open-iterators          | INFO            | Gauge       | The current number of Iterators on the store that have been created, but not yet closed.
iterator-duration-avg       | DEBUG           | Average     | The average time spent between creating an Iterator and closing it, in milliseconds.
iterator-duration-max       | DEBUG           | Maximum     | The maximum time spent between creating an Iterator and closing it, in milliseconds.
oldest-open-iterator-age-ms | DEBUG           | Gauge       | The time since the oldest still-open Iterator was created, in milliseconds.
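As a usage note, these metrics will follow the standard Kafka Streams metrics machinery, so once available they should be readable via JMX or programmatically via KafkaStreams#metrics(). A minimal sketch of reading num-open-iterators, assuming a running KafkaStreams instance:

```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class IteratorMetricsReader {

    // Logs the current num-open-iterators value for every store in the topology.
    static void logOpenIterators(final KafkaStreams streams) {
        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            final MetricName name = entry.getKey();
            if ("stream-state-metrics".equals(name.group()) && "num-open-iterators".equals(name.name())) {
                // Tags identify the thread, task and store, e.g. thread-id, task-id and the
                // store-specific "[store type]-state-id" tag.
                System.out.printf("%s %s = %s%n", name.name(), name.tags(), entry.getValue().metricValue());
            }
        }
    }
}
```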

Proposed Changes

MeteredKeyValueStore, which by default automatically wraps all state stores created by Kafka Streams, already defines a MeteredKeyValueIterator, which automatically wraps all Iterators provided by the underlying StateStore. We will add our new metrics to MeteredKeyValueIterator.
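As a rough illustration of the mechanism (not the actual MeteredKeyValueIterator implementation), a wrapping Iterator along these lines could record the count and duration metrics; the Sensor/LongAdder wiring and all names below are assumptions:

```java
import java.util.concurrent.atomic.LongAdder;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// Illustrative wrapper around an inner KeyValueIterator: increments an open-iterator
// count on construction, and on close() decrements it and records the open-to-close
// duration in fractional milliseconds.
class IteratorMetricsWrapper<K, V> implements KeyValueIterator<K, V> {
    private final KeyValueIterator<K, V> inner;
    private final Sensor iteratorDurationSensor; // would back iterator-duration-avg/max
    private final LongAdder openIterators;       // would back num-open-iterators
    private final long startNanos;

    IteratorMetricsWrapper(final KeyValueIterator<K, V> inner,
                           final Sensor iteratorDurationSensor,
                           final LongAdder openIterators) {
        this.inner = inner;
        this.iteratorDurationSensor = iteratorDurationSensor;
        this.openIterators = openIterators;
        this.startNanos = Time.SYSTEM.nanoseconds();
        openIterators.increment();               // one more Iterator is now open
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public KeyValue<K, V> next() {
        return inner.next();
    }

    @Override
    public K peekNextKey() {
        return inner.peekNextKey();
    }

    @Override
    public void close() {
        try {
            inner.close();
        } finally {
            openIterators.decrement();           // this Iterator is no longer open
            final double elapsedMs = (Time.SYSTEM.nanoseconds() - startNanos) / 1_000_000.0d;
            iteratorDurationSensor.record(elapsedMs);
        }
    }
}
```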



The iterator-duration metrics will be updated whenever an Iterator's close() method is called, recording the time since the Iterator was created. The measurement will be taken using System.nanoTime() and reported in milliseconds, with a fractional component approximating sub-millisecond precision. For example, a value of 2.337583 would represent approximately 2 milliseconds, 337 microseconds and 583 nanoseconds (a worked conversion example follows the list below). Users would want to monitor this metric and take action if:

  • The avg or max is consistently high, or continues to climb indefinitely. This would indicate a performance problem with the code executed within each iteration of an Iterator.
  • The metric reports 0, or no data, despite the application making use of state store Iterators. This can indicate that the user is not calling close() on the Iterators being used, which would cause an Iterator leak.
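For concreteness, a worked version of the conversion described above (the class name is purely illustrative):

```java
public final class DurationConversionExample {
    public static void main(final String[] args) {
        // 2,337,583 ns of elapsed time (as would be measured with System.nanoTime())
        // reported as fractional milliseconds: approximately 2 ms, 337 µs and 583 ns.
        final long elapsedNanos = 2_337_583L;
        final double elapsedMs = elapsedNanos / 1_000_000.0d;
        System.out.println(elapsedMs); // prints 2.337583
    }
}
```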


The num-open-iterators metric will be incremented whenever an Iterator is constructed, and decremented whenever #close() is called. Users would want to monitor this metric and take action if:

  • The metric continues to climb indefinitely over time. This would indicate that some Iterators are being opened but not closed.


The oldest-open-iterator-age-ms metric will be computed by tracking all currently open Iterators and reporting the difference between the current time and min(startTime), where each Iterator's startTime is recorded in its constructor by calling System.currentTimeMillis() (a tracking sketch follows the list below). Users will want to take action if:

  • The metric continues to climb indefinitely over time, as it indicates that old Iterators are not being closed.
  • The metric is consistently very high, despite not continuing to climb, which indicates an Iterator that takes a long time to complete iteration and close. This could indicate a performance problem in the application, like iterating over every key in a very large StateStore.
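A minimal sketch of how this tracking could be structured, with the gauge derived as the current time minus min(startTime); the class and method names are assumptions, not the proposed implementation:

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative tracker: remembers the creation time of every currently open Iterator
// and derives oldest-open-iterator-age-ms as (now - min(startTime)).
final class OpenIteratorTracker {
    private final Map<Object, Long> startTimesByIterator = new ConcurrentHashMap<>();

    void onOpen(final Object iterator) {
        startTimesByIterator.put(iterator, System.currentTimeMillis());
    }

    void onClose(final Object iterator) {
        startTimesByIterator.remove(iterator);
    }

    // Gauge value: age of the oldest still-open Iterator in milliseconds, or 0 if none are open.
    long oldestOpenIteratorAgeMs() {
        final OptionalLong oldestStart = startTimesByIterator.values().stream()
            .mapToLong(Long::longValue)
            .min();
        return oldestStart.isPresent() ? System.currentTimeMillis() - oldestStart.getAsLong() : 0L;
    }
}
```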

Compatibility, Deprecation, and Migration Plan

  • No impact is expected for existing users, as no existing interfaces or behaviour are being changed.
  • The performance overhead of tracking the opening and closing of Iterators is expected to be negligible, to the point that it will not be measurable in real-world applications.

Test Plan

MeteredKeyValueStoreTest will be updated with new tests for each metric.

Rejected Alternatives

  • Originally, this KIP focused on providing metrics exclusively for RocksDB Iterators. However, it was determined that it would be useful to have these metrics available for all StateStores. The original implementation used RocksDB's internal Tickers to track open/closed Iterators. Our new implementation maintains these counts in Kafka Streams itself, which enables these metrics to be reported for any StateStore.