...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment. The time to roll an active segment is controlled by "segment.ms" today. However, to ensure that messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define the maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled) { min("segment.ms", "max.compaction.lag.ms") }
                   else { "segment.ms" }


  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    The estimated earliest timestamp of a log segment is set to the timestamp of the first message.

    From the second log segment onwards, the largestTimestamp of the previous segment can be used as an estimate, since getting the earliest timestamp of a message requires additional IOs.

    Message timestamps can be out of order within a log segment. However, when "max.compaction.lag.ms" is combined with "log.message.timestamp.difference.max.ms", Kafka can guarantee that a new record becomes eligible for log compaction after a bounded time, as determined by "max.compaction.lag.ms" and "log.message.timestamp.difference.max.ms".

  3. Let the log cleaner pick up logs that have reached the max compaction lag for compaction.
    For any given log partition with compaction enabled, as long as the estimated earliest message timestamp of the first un-compacted segment is older than "max.compaction.lag.ms" (that is, more than "max.compaction.lag.ms" has elapsed since that timestamp), the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine the log's eligibility for compaction, as it does today.

  4. Add one metric to track the max compaction delay (as described in the next section).
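
Steps 1 to 3 above can be sketched as follows. This is a minimal illustration rather than Kafka's implementation: the `Segment` class, the function names, and the strict `>` comparison are assumptions made for the example, and the eligibility check reads "older than max.compaction.lag.ms" as "more than that many milliseconds have elapsed since the estimated earliest timestamp".

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Segment:
    # Hypothetical stand-in for a log segment; not Kafka's own segment class.
    first_timestamp_ms: int  # timestamp of the first message (the step 2 estimate)
    compacted: bool = False  # True once the log cleaner has processed the segment


def max_segment_ms(segment_ms: int, max_compaction_lag_ms: int,
                   compact_enabled: bool) -> int:
    """Step 1: maximum time before the active segment is rolled."""
    if compact_enabled:
        return min(segment_ms, max_compaction_lag_ms)
    return segment_ms


def first_uncompacted(segments: List[Segment]) -> Optional[Segment]:
    """Step 2: only un-compacted segments matter for timely compaction."""
    for seg in segments:
        if not seg.compacted:
            return seg
    return None


def reached_max_compaction_lag(segments: List[Segment], now_ms: int,
                               max_compaction_lag_ms: int) -> bool:
    """Step 3: True if the log should be picked up regardless of dirty ratio."""
    seg = first_uncompacted(segments)
    if seg is None:
        return False  # nothing left to compact
    return now_ms - seg.first_timestamp_ms > max_compaction_lag_ms


# Example values (all hypothetical, in milliseconds).
roll_ms = max_segment_ms(segment_ms=604_800_000,
                         max_compaction_lag_ms=86_400_000,
                         compact_enabled=True)
log = [Segment(first_timestamp_ms=0, compacted=True),
       Segment(first_timestamp_ms=1_000)]
eligible = reached_max_compaction_lag(log, now_ms=5_000, max_compaction_lag_ms=3_000)
```

With these example values, `roll_ms` is the smaller of the two settings, and `eligible` is true because 4,000 ms have elapsed since the first un-compacted segment's first message, which exceeds the 3,000 ms lag. When the check returns false, the log's eligibility would fall back to the existing "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" logic, which is not modelled here.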

...

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive. We still want to accumulate some amount of log segments before compaction is kicked off. In addition, in order to honor the max compaction lag requirement, we also need to force a roll of the active segment once the required lag has passed. So the existing configurations do not meet the requirement of ensuring a maximum compaction lag.

  • In Item 2-a of the proposed change section, if the first message timestamp is not available, we use "segment.largestTimestamp - maxSegmentMs" as an estimate of the earliest timestamp. The actual timestamp of the first message might be later than this estimate, but it is safe to pick up the log for compaction earlier. However, since this estimate is not very accurate and may cause unnecessary compaction, we decide to make this feature depend on the availability of the first message timestamp.

  • In Item 2-b of the proposed change section, use the largestTimestamp of the previous segment as an estimate of the next segment's earliest timestamp, since getting the timestamp of a message requires additional IOs. Since this estimate may not be very accurate, we decide to keep it simple and always use the first message timestamp as the estimate of a log segment's earliest message timestamp.
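
To make the trade-off in the Item 2-a alternative concrete, the sketch below compares the rejected estimate with the first message timestamp for one segment. The function names and all numbers are hypothetical and chosen only for illustration.

```python
def estimate_from_largest(largest_timestamp_ms: int, max_segment_ms: int) -> int:
    """Rejected alternative: segment.largestTimestamp - maxSegmentMs."""
    return largest_timestamp_ms - max_segment_ms


def estimate_from_first_message(first_timestamp_ms: int) -> int:
    """Chosen approach: use the first message timestamp directly."""
    return first_timestamp_ms


# Hypothetical segment: first message at 1_000_000 ms, largest timestamp
# at 1_030_000 ms, and a maxSegmentMs of 60_000 ms.
rejected = estimate_from_largest(1_030_000, 60_000)
chosen = estimate_from_first_message(1_000_000)

# How much older the segment looks under the rejected estimate.
overestimated_age_ms = chosen - rejected
```

In this example the rejected estimate makes the segment look 30,000 ms older than its first message really is, so the log would reach the max compaction lag that much sooner. That is safe, but it is the kind of unnecessary early compaction the text describes.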