Comment: Refactored metrics slightly (as part of https://github.com/apache/kafka/pull/5795)

...

  • late records (new records older than the grace period) are currently metered as "skipped-records" and logged at WARN level. As noted, this is not correct, so we will change the logs to DEBUG level and add new metrics: 
    • average and max observed lateness of all records: to help configure the grace period
      • (DEBUG) record-lateness-[avg | max] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • rate and total of dropped events for closed windows
      • (INFO) late-record-drop-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
  • expired records (new records for segments older than the state store's retention period) are currently not metered and are logged at DEBUG level. Because the grace period is currently equivalent to the retention period, this should be rare: such events never reach the state store, since they are marked as skipped and dropped in the processor. However, since we are deprecating `Windows.until` and defining retention only on the state store, expired records would become much more common. Therefore, we'll add some new state store metrics:
    • rate and total of events for expired windows
      • (INFO) expired-window-record-drop-[rate | total] type=stream-[storeType]-state-metrics client-id=<threadId> task-id=<taskId> [storeType]-state-id=[storeName]
  • intermediate event suppression
    • current, average, and peak intermediate suppression buffer size
      • (DEBUG) suppression-buffer-size-[current | avg | max] type=stream-buffer-metrics client-id=<threadId> task-id=<taskId> buffer-id=<bufferName>
    • current, average, and peak number of records in the suppression buffer
      • (DEBUG) suppression-buffer-count-[current | avg | max] type=stream-buffer-metrics client-id=<threadId> task-id=<taskId> buffer-id=<bufferName>
    • suppression emit rate and total
      • (DEBUG) suppression-emit-[rate | total] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
    • min and average intermediate suppression overtime: to determine whether the intermediate suppression emitAfter is delaying longer than necessary. This metric may be unnecessary, since it's equivalent to (timeToWaitForMoreEventsConfig - observedLatenessMetric).
      • (INFO) intermediate-result-suppression-overtime-[min | avg] type=stream-processor-node-metrics client-id=<threadId> task-id=<taskId> processor-node-id=<processorNodeId>
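
To illustrate how the lateness and late-drop metrics above relate to the grace period, here is a minimal sketch. It is not the Kafka Streams implementation: the class `LatenessSketch`, its fields, and the use of tumbling windows are all assumptions made for illustration. It takes lateness to be the gap between the highest timestamp seen so far (stream time) and the record's own timestamp, and treats a record as late when its window end plus the grace period is no greater than stream time.

```java
/** Hypothetical helper for illustration only; not part of Kafka Streams. */
final class LatenessSketch {
    private final long windowSizeMs;
    private final long graceMs;

    private long streamTimeMs = Long.MIN_VALUE; // highest record timestamp observed so far
    private long latenessSumMs = 0;
    private long latenessMaxMs = 0;
    private long observedCount = 0;
    private long lateDropTotal = 0;

    LatenessSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Observes one record; returns true if accepted, false if dropped as late. */
    boolean observe(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        // Lateness is 0 for in-order records and positive for out-of-order ones.
        long latenessMs = streamTimeMs - timestampMs;
        latenessSumMs += latenessMs;
        latenessMaxMs = Math.max(latenessMaxMs, latenessMs);
        observedCount++;

        // Assumed tumbling window [start, start + size) containing the record.
        long windowStartMs = timestampMs - (timestampMs % windowSizeMs);
        long windowEndMs = windowStartMs + windowSizeMs;

        // Assumed close rule: the window is closed once stream time
        // reaches window end plus the grace period.
        if (windowEndMs + graceMs <= streamTimeMs) {
            lateDropTotal++;
            return false;
        }
        return true;
    }

    double latenessAvgMs() {
        return observedCount == 0 ? 0.0 : (double) latenessSumMs / observedCount;
    }

    long latenessMaxMs() {
        return latenessMaxMs;
    }

    long lateDropTotal() {
        return lateDropTotal;
    }
}
```

In this sketch, a maximum lateness that regularly exceeds the configured grace period would show up as a growing late-drop total, which is the signal the two metrics are meant to give when tuning the grace period.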

...