
Status

Current state: "Under Discussion"

Discussion thread:  

JIRA:

Motivation

Compaction enables Kafka to remove old messages that are flagged for deletion while other messages can be retained for a relatively longer time.  Today, a log segment may remain un-compacted for a long time, since eligibility for log compaction is determined by the compaction ratio ("min.cleanable.dirty.ratio") and the min compaction lag ("min.compaction.lag.ms") settings.  The ability to delete a log message through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR).  The goal of this change is to provide a time-based compaction policy that ensures the cleanable section is compacted after the specified time interval, regardless of dirty ratio and min compaction lag.  Dirty ratio and min compaction lag are still honored as long as the time-based compaction rule is not violated. In other words, if Kafka receives a deletion request for a key (e.g., a record with that key and a null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key.

Proposed Changes

We propose adding a new topic-level configuration, "max.compaction.lag.ms", which controls the maximum time a message/segment can be skipped for log compaction.  With this configuration set and compaction enabled, the log cleaner is required to pick up all log segments that contain messages older than "max.compaction.lag.ms" for compaction. Each log segment therefore has a guaranteed upper bound on the time before its compaction becomes mandatory, regardless of compaction ratio and min compaction lag. The clock starts when a log segment is first created as an active segment.

Here is a list of changes to enforce such a time-based compaction policy:

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms" (or if the creation time of the active segment is older than "max.compaction.lag.ms" when the record timestamp is not available) so that compaction can be done on that segment.  The time to roll an active segment is controlled by "segment.ms" today.  However, to ensure that messages currently in the active segment can be compacted in time, we need to seal the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.

  2. Estimate the earliest message timestamp of an un-compacted log segment.

    1. For the first (earliest) log segment:  the estimated earliest timestamp is set to the timestamp of the first message if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to "segment.largestTimestamp - min(segment.ms, max.compaction.lag.ms)"  (segment.largestTimestamp is the lastModified time of the log segment or the max timestamp we see for the log segment). In the second case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction sooner than strictly necessary.  Note that we only need to estimate the earliest message timestamp for un-compacted log segments, because the deletion requests that belong to compacted segments have already been processed.

    2. From the second log segment onwards:  there are two methods to estimate the earliest message timestamp of a log segment. The first method is to use the largestTimestamp (lastModified time) of the previous segment as an estimate. The second method is to use the timestamp of the first message if a timestamp is present in the message.  Since getting the timestamp of a message requires additional I/O, the first method of estimation may be sufficient in practice.

  3. Let the log cleaner pick up all logs with an estimated earliest timestamp earlier than "now - max.compaction.lag.ms" for compaction.  
    The rule is simple: as long as there is an un-compacted log segment whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today. The logs to be compacted are currently sorted by dirty ratio. With this change, the logs are sorted by "must clean dirty ratio" first and then by dirty ratio.  "Must clean dirty ratio" is calculated in the same way as dirty ratio, except that only the segments that are required to be compacted contribute to it. More specifically, "must clean dirty ratio" is the total size of cleanable segments whose records are older than "max.compaction.lag.ms" divided by the total size (clean segment size + cleanable segment size). The reason for this ordering is to compact the logs that must be cleaned under the time-based policy first.
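
The estimation and ordering rules in steps 2 and 3 can be illustrated with a minimal Python sketch. It is not Kafka's implementation: the Segment class and the function names are hypothetical, every segment is assumed to be either clean (already compacted) or cleanable, and the "min.compaction.lag.ms" check is left out for brevity. Segments after the first use the previous segment's largestTimestamp (the first estimation method).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Segment:
    """Hypothetical stand-in for a log segment (not Kafka's LogSegment)."""
    size_bytes: int
    largest_timestamp_ms: int                 # lastModified or max record timestamp
    first_record_timestamp_ms: Optional[int]  # None if records carry no timestamp
    compacted: bool                           # True if already cleaned


def estimate_earliest_timestamps(segments: List[Segment], segment_ms: int,
                                 max_lag_ms: int) -> List[int]:
    """Step 2: estimated earliest timestamp per segment, oldest segment first."""
    # Assumption: when max.compaction.lag.ms is 0 (disabled), back off by segment.ms.
    backoff = segment_ms if max_lag_ms <= 0 else min(segment_ms, max_lag_ms)
    estimates = []
    for i, seg in enumerate(segments):
        if i > 0:
            # Later segments: previous segment's largest timestamp (no extra I/O).
            estimates.append(segments[i - 1].largest_timestamp_ms)
        elif seg.first_record_timestamp_ms is not None:
            estimates.append(seg.first_record_timestamp_ms)
        else:
            estimates.append(seg.largest_timestamp_ms - backoff)
    return estimates


def cleaning_ratios(segments: List[Segment], now_ms: int, segment_ms: int,
                    max_lag_ms: int) -> Tuple[float, float]:
    """Return (must clean dirty ratio, dirty ratio) for one log."""
    total = sum(s.size_bytes for s in segments)
    if total == 0:
        return 0.0, 0.0
    dirty = sum(s.size_bytes for s in segments if not s.compacted)
    must_clean = 0
    if max_lag_ms > 0:  # 0 disables the time-based policy
        deadline = now_ms - max_lag_ms
        estimates = estimate_earliest_timestamps(segments, segment_ms, max_lag_ms)
        must_clean = sum(s.size_bytes for s, est in zip(segments, estimates)
                         if not s.compacted and est < deadline)
    return must_clean / total, dirty / total


def pick_logs(logs: Dict[str, List[Segment]], now_ms: int, segment_ms: int,
              max_lag_ms: int, min_dirty_ratio: float) -> List[str]:
    """Step 3: eligible logs, ordered by must clean dirty ratio, then dirty ratio."""
    scored = []
    for name, segments in logs.items():
        must, dirty = cleaning_ratios(segments, now_ms, segment_ms, max_lag_ms)
        if must > 0 or dirty > min_dirty_ratio:
            scored.append((must, dirty, name))
    scored.sort(key=lambda t: (t[0], t[1]), reverse=True)
    return [name for _, _, name in scored]
```

In this sketch a log with any overdue un-compacted segment is always selected and is ordered ahead of logs that qualify only through their dirty ratio.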

Public Interfaces

  • Adding topic level configuration "max.compaction.lag.ms",  and corresponding broker configuration "log.cleaner.max.compaction.lag.ms", which is set to 0 (disabled) by default.

  • "segment.ms" : no change in meaning.  However, if "max.compaction.lag.ms" is set to a non-zero value smaller than "segment.ms", then "segment.ms" no longer controls the rolling time on its own. The active segment is forced to roll when either "max.compaction.lag.ms" or "segment.ms" (log.roll.ms and log.roll.hours) is reached.  We recommend setting "max.compaction.lag.ms" to a value greater than "segment.ms" in practice.

  • min.cleanable.dirty.ratio : no change in meaning. However, a compaction decision made based on "max.compaction.lag.ms" overrides a compaction decision made based on "min.cleanable.dirty.ratio".

  • min.compaction.lag.ms : no change in meaning. We recommend setting "min.compaction.lag.ms" to a smaller value than "max.compaction.lag.ms".  If not, "max.compaction.lag.ms" has higher priority.

  • All of the above changes apply only to topics that have compaction enabled.
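
The interaction between "segment.ms" and "max.compaction.lag.ms" when rolling the active segment can be sketched as follows. This is a simplified illustration under stated assumptions, not the broker's code: the function and parameter names are hypothetical, size-based rolling and roll jitter are ignored, and both limits are measured from the first record timestamp (or from the segment creation time when no record timestamp is available).

```python
from typing import Optional


def should_roll_active_segment(now_ms: int,
                               segment_size_bytes: int,
                               first_record_timestamp_ms: Optional[int],
                               segment_created_ms: int,
                               segment_ms: int,
                               max_compaction_lag_ms: int) -> bool:
    """Time-based roll check for the active segment (hypothetical helper)."""
    if segment_size_bytes == 0:
        return False  # only a non-empty active segment is force-rolled

    # Fall back to the creation time when records carry no timestamp.
    if first_record_timestamp_ms is not None:
        start_ms = first_record_timestamp_ms
    else:
        start_ms = segment_created_ms

    # 0 disables the time-based compaction policy, leaving segment.ms in charge.
    limit_ms = segment_ms
    if max_compaction_lag_ms > 0:
        limit_ms = min(segment_ms, max_compaction_lag_ms)

    return now_ms - start_ms > limit_ms
```

With "segment.ms" at seven days and "max.compaction.lag.ms" at one day, this check rolls the segment once its first record is more than one day old, which is why setting "max.compaction.lag.ms" above "segment.ms" keeps the existing rolling behavior.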

Compatibility, Deprecation, and Migration Plan

  • By default, "max.compaction.lag.ms" is set to 0 and this time-based log compaction policy is disabled.  There are no compatibility issues and no migration is required. This change focuses on when to compact a log segment, and it doesn't conflict with KIP-280, which focuses on how to compact a log.

Rejected Alternatives

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive.  We still want to accumulate some amount of log segments before compaction is kicked off.

  • Prefer "min.compaction.lag.ms" over "max.compaction.lag.ms".  We decided not to honor "min.compaction.lag.ms" if it conflicts with "max.compaction.lag.ms". The reason is that we need to provide a stronger guarantee that a deletion request can be fulfilled on time.  For example, if "min.compaction.lag.ms" is set to two days and "max.compaction.lag.ms" is set to one day, the log will not be compacted during the first day but will be compacted after the first day.  However, such a conflicting setting may lead to frequent compaction, since there is no time interval in which to accumulate cleanable segments. In the previous example, a log segment is uncleanable during the first day, but it triggers a compaction as soon as it enters the second day.  For this reason, we strongly recommend setting "max.compaction.lag.ms" higher than "min.compaction.lag.ms", leaving some time in between to accumulate cleanable log segments that can be compacted in a single compaction run.
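
The recommendation above can be expressed as a small check on the two settings. The sketch below is only an illustration: the function name and its return convention are hypothetical, and Kafka is not claimed to perform this validation.

```python
from typing import Optional


def accumulation_window_ms(min_compaction_lag_ms: int,
                           max_compaction_lag_ms: int) -> Optional[int]:
    """Time available to accumulate cleanable segments before compaction is forced.

    Returns None when the time-based policy is disabled (max lag of 0), and 0
    when the settings conflict, since max.compaction.lag.ms takes priority.
    """
    if max_compaction_lag_ms <= 0:
        return None
    return max(0, max_compaction_lag_ms - min_compaction_lag_ms)


DAY_MS = 24 * 60 * 60 * 1000

# Conflicting example from the text: min lag of two days, max lag of one day.
conflicting = accumulation_window_ms(2 * DAY_MS, 1 * DAY_MS)

# Recommended shape: max lag above min lag, leaving time to batch segments.
recommended = accumulation_window_ms(1 * DAY_MS, 7 * DAY_MS)
```

A window of 0 corresponds to the conflicting case described above, where each segment triggers a compaction as soon as it becomes overdue.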


