Table of Contents |
---|
Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Motivation
Messages stored in Kafka will expire and disappear silently based on retention time and size configuration. Consumers have no way to know they have missed messages.
...
The compacted topics are out of scope, because when users chose compacted topics they are interested on the last value of they key, not on the intermediary states.
Public Interfaces
A new property at topic level should be created:
...
Metric / Attribute Name | Description | MBEAN NAME |
---|---|---|
non-consumed-total | Number of of messages expired without being consumed by a consumer group, per topic and partition |
Proposed Changes
Currently the LogManager schedule the "kafka-delete-logs" thread, that will call the deleteLogs() method. Is possible to add into that method the metric to expose the number of offsets non consumed by a list of consumer groups.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private def deleteLogs(): Unit = { var nextDelayMs = 0L try { def nextDeleteDelayMs: Long = { if (!logsToBeDeleted.isEmpty) { val (_, scheduleTimeMs) = logsToBeDeleted.peek() scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds() } else currentDefaultConfig.fileDeleteDelayMs } while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) { val (removedLog, _) = logsToBeDeleted.take() if (removedLog != null) { try { removedLog.delete() info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") // // KIP-490: log when consumer groups lose a message because offset has been deleted // val consumerGroupsSubscribed : Seq[String] = getConsumerGroups(removedLog.topicPartition.topic() ); val groupsToNotify : Seq[String] = consumerGroupsSubscribed intersect groupsTobeNotify // value get from topic config property 'retention.notify.groups' groupsToNotify.forEach( { val lastCosumedOffsetGroup : Integer = getLastOffsetConsumed( _, removedLog.topicPartition); if(lastCosumedOffsetGroup < removedLog.nextOffsetMetadata) { // increment and expose JMS metric non-consumed-total. } }) } catch { case e: KafkaStorageException => error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e) } } } } catch { case e: Throwable => error(s"Exception in kafka-delete-logs thread.", e) } finally { try { scheduler.schedule("kafka-delete-logs", deleteLogs _, delay = nextDelayMs, unit = TimeUnit.MILLISECONDS) } catch { case e: Throwable => if (scheduler.isStarted) { // No errors should occur unless scheduler has been shutdown error(s"Failed to schedule next delete in kafka-delete-logs thread", e) } } } } |
Compatibility, Deprecation, and Migration Plan
There is no impact on existing features.
Rejected Alternatives
Logging on broker side
To write logs on the broker to side to alert that messages have not been consumed has been rejected in favour of standard monitoring. Main reason is the big amount of data that could be generated.
...