Status
Current state: "Accepted"
Discussion thread:
JIRA: KAFKA-13846
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Concurrent threads may try to access the Metrics registry to create the same instance-level metric; however, its get/create APIs are not well suited for this. A common pattern that users follow today is:
    metric = metrics.metric(metricName);
    if (metric == null) {
        try {
            metrics.addMetric(..);
        } catch (IllegalArgumentException e) {
            // another thread may have created the metric in the meantime
        }
    }
Here's an example of how the AK codebase uses the above pattern:
The above process basically consists of 2 steps:
- Check whether the metric of interest already exists.
- If it does not, create the metric. If another thread registers the same metric between the check and the create, an IllegalArgumentException is thrown. Callers usually just catch and swallow this exception, since all that matters is that the metric gets created.
The main motivation of this KIP is to expose an API that makes these operations atomic. That way, users won't need to remember these steps and can just focus on having the metric created.
Public Interfaces
A new public-facing method, addMetricIfAbsent, would be exposed. It creates the metric if it does not exist yet, or returns the existing metric if it does. This way, users don't need extra logic to handle the presence or absence of a metric. Note that this method takes care of synchronisation when concurrent threads update or access metrics.
    /**
     * Create or get an existing metric to monitor an object that implements MetricValueProvider.
     * This metric won't be associated with any sensor. This is a way to expose existing values as metrics.
     * This method takes care of synchronisation while updating/accessing metrics by concurrent threads.
     *
     * @param metricName The name of the metric
     * @param config The metric configuration; if null, the registry's default configuration is used
     * @param metricValueProvider The metric value provider associated with this metric
     * @return Existing KafkaMetric if already registered or else a newly created one
     */
    public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) {
        KafkaMetric metric = new KafkaMetric(new Object(),
                Objects.requireNonNull(metricName),
                Objects.requireNonNull(metricValueProvider),
                config == null ? this.config : config,
                time);

        KafkaMetric existingMetric = registerMetric(metric);
        return existingMetric == null ? metric : existingMetric;
    }
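As a rough illustration of what this contract means for callers, the sketch below models the registry with a plain ConcurrentHashMap so that it runs without Kafka on the classpath. MiniRegistry, MiniMetric, the String metric names and the Supplier-based value provider are hypothetical stand-ins, not Kafka classes; only the method name addMetricIfAbsent and its "return the existing metric or the new one" behaviour are taken from this KIP.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class AddMetricIfAbsentSketch {

    /** Hypothetical stand-in for KafkaMetric: a name plus a value provider. */
    static class MiniMetric {
        final String name;
        final Supplier<?> valueProvider;

        MiniMetric(String name, Supplier<?> valueProvider) {
            this.name = name;
            this.valueProvider = valueProvider;
        }
    }

    /** Hypothetical, simplified stand-in for the Metrics registry. */
    static class MiniRegistry {
        private final Map<String, MiniMetric> metrics = new ConcurrentHashMap<>();

        /** Returns the metric already registered under this name, or registers and returns a new one. */
        MiniMetric addMetricIfAbsent(String name, Supplier<?> valueProvider) {
            MiniMetric candidate = new MiniMetric(name, valueProvider);
            MiniMetric existing = metrics.putIfAbsent(name, candidate);
            return existing == null ? candidate : existing;
        }
    }

    /** Calls addMetricIfAbsent from several threads and counts the distinct instances the callers received. */
    static int distinctInstancesSeen(int threads) {
        MiniRegistry registry = new MiniRegistry();
        // MiniMetric does not override equals/hashCode, so this set compares by identity.
        Set<MiniMetric> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            final int id = i;
            pool.execute(() -> seen.add(registry.addMetricIfAbsent("request-count", () -> id)));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker threads did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct metric instances seen by callers: " + distinctInstancesSeen(8));
    }
}
```

In this model every caller ends up holding the same instance regardless of which thread registered it first, and no caller needs a try/catch around the call.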
Proposed Changes
The new method above is added to the Metrics API. As part of this change, the return type of the registerMetric method would change from void to KafkaMetric. If the requested metric already exists, it returns the existing KafkaMetric object; otherwise it registers the metric and returns null. For backward compatibility, every call site that currently relies on IllegalArgumentException would instead check the value returned by registerMetric and throw an IllegalArgumentException when that value is non-null. This change would happen in Metrics.addMetric and the 2 Sensor.add methods. The addMetricIfAbsent method, on the other hand, would simply return the object returned by registerMetric if it is not null, and the newly created metric otherwise.
    /**
     * Register a metric if not present or return an already existing metric otherwise.
     * When a metric is newly registered, this method returns null
     *
     * @param metric The KafkaMetric to register
     * @return KafkaMetric if the metric already exists, null otherwise
     */
    synchronized KafkaMetric registerMetric(KafkaMetric metric) {
        MetricName metricName = metric.metricName();
        KafkaMetric existingMetric = this.metrics.putIfAbsent(metricName, metric);
        if (existingMetric != null) {
            return existingMetric;
        }
        // newly added metric
        for (MetricsReporter reporter : reporters) {
            try {
                reporter.metricChange(metric);
            } catch (Exception e) {
                log.error("Error when registering metric on " + reporter.getClass().getName(), e);
            }
        }
        log.trace("Registered metric named {}", metricName);
        return null;
    }
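To make the backward-compatibility argument concrete, here is a minimal sketch of how the two kinds of callers could sit on top of a registerMetric that returns the existing metric or null. It is a simplified model and not the actual Kafka code: MiniRegistry is hypothetical, String and Object stand in for MetricName and KafkaMetric, reporter notification is left out, and the exception message is illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegisterMetricSketch {

    /** Hypothetical, simplified registry; not the Kafka Metrics class. */
    static class MiniRegistry {
        private final Map<String, Object> metrics = new ConcurrentHashMap<>();

        /** Mirrors the proposed contract: null when newly registered, the existing metric otherwise. */
        synchronized Object registerMetric(String name, Object metric) {
            return metrics.putIfAbsent(name, metric);
        }

        /** Existing behaviour preserved: a duplicate name still raises IllegalArgumentException. */
        void addMetric(String name, Object metric) {
            if (registerMetric(name, metric) != null) {
                throw new IllegalArgumentException("A metric named '" + name + "' already exists");
            }
        }

        /** New behaviour: a duplicate name yields the metric that is already registered. */
        Object addMetricIfAbsent(String name, Object metric) {
            Object existing = registerMetric(name, metric);
            return existing == null ? metric : existing;
        }
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        Object first = new Object();
        registry.addMetric("request-count", first);

        try {
            registry.addMetric("request-count", new Object());
        } catch (IllegalArgumentException e) {
            System.out.println("addMetric rejected the duplicate: " + e.getMessage());
        }

        Object returned = registry.addMetricIfAbsent("request-count", new Object());
        System.out.println("addMetricIfAbsent returned the first metric: " + (returned == first));
    }
}
```

The point of the design is that the duplicate check happens once, inside registerMetric, and each public method only decides how to report the outcome: by throwing, as before, or by returning the existing metric.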
Compatibility, Deprecation, and Migration Plan
The changes are backward compatible and need no deprecation or migration.
Rejected Alternatives
N/A