Current state: Under Discussion


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Kafka Streams exposes metrics on various levels. The number of metrics grows with the number of stream threads, the number of tasks (i.e., number of subtopologies and number of partitions), the number of processors, the number of state stores and the number of buffers in a Kafka Streams application.


In this KIP, we propose to add a metric reporter to the Kafka Streams client that can be used to aggregate metrics before they are reported to a monitoring service. In this way, users can avoid exceeding the monitoring service's limit on the number of reported metrics, along with the false alerts that exceeding it may cause.

Public Interfaces

Code Block
package org.apache.kafka.streams;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.metrics.MetricsReporter;

// Signatures only; method bodies are omitted.
public abstract class MetricsAggregations implements MetricsReporter {

    public static class ValuesProvider<V> implements Iterable<V> {
        public Iterator<V> iterator();
    }

    public interface MetricRegistrar<AGG, V> {
        ValuesProvider<V> register(final Map<String, String> tags);
        void deregister();
    }

    protected <AGG, V> void addAggregation(final String nameOfAggregation,
                                           final String groupOfMetricsToAggregate,
                                           final String nameOfMetricsToAggregate,
                                           final Collection<String> tagsForGrouping,
                                           final MetricRegistrar<AGG, V> metricRegistrar);
}


Proposed Changes

We propose to add the above metrics reporter to the Kafka Streams library. The reporter needs to be extended by users, and the extended class will be passed to the Kafka Streams config metric.reporters. The behavior of the reporter is described in this section.

Like any other metrics reporter passed to a Kafka Streams client, this reporter will be instantiated and its method configure() will be called with a map that contains the application ID of the Kafka Streams client.
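The configuration step can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class ConfigureSketch, the application ID my-streams-app, and the reporter class name com.example.MyMetricsAggregations are hypothetical, and the class does not extend any Kafka class so that the sketch stays self-contained. Only the config keys application.id and metric.reporters and the shape of configure(Map<String, ?>) are taken from Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a stand-in for a user-defined reporter.
final class ConfigureSketch {

    private String applicationId;

    // Mirrors the shape of Configurable#configure(Map<String, ?>):
    // the reporter reads the application ID from the configs it receives.
    public void configure(final Map<String, ?> configs) {
        applicationId = (String) configs.get("application.id");
    }

    public String applicationId() {
        return applicationId;
    }

    // Builds the kind of config map a Kafka Streams client might pass
    // to its reporters (assumed contents, hypothetical values).
    static Map<String, Object> exampleConfigs() {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("application.id", "my-streams-app");
        configs.put("metric.reporters", "com.example.MyMetricsAggregations");
        return configs;
    }
}
```

A reporter that keeps the application ID in this way could later use it, for example, as a tag on the aggregated metrics it creates.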

Method KafkaStreams#addMetricsAggregation() will create one or more client-level metrics that record the aggregation of the metrics specified by the arguments groupOfMetricsToAggregate and nameOfMetricsToAggregate. Before the specified metrics are aggregated, they will be grouped by the tag labels provided in argument tagLabels. For example, if users want to aggregate the state-store-level metric size-all-mem-tables (a RocksDB-specific metric) grouped by stream thread, they will provide the name size-all-mem-tables as argument nameOfMetricsToAggregate, the type stream-state-metrics as argument groupOfMetricsToAggregate, and the list of tag labels [thread-id] as argument tagLabels. If they additionally want to group the metrics by task, they will provide [thread-id, task-id] as argument tagLabels. If users want to group by task only, they will provide [task-id] as argument tagLabels.
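The grouping described above can be illustrated with a small, self-contained sketch. It does not use the proposed API: the class TagGroupingSketch, the Sample type, the aggregateBy helper, and the sample values are all hypothetical, and the aggregation is assumed to be a sum. The sketch only shows how metrics that share the same values for the given tag labels end up in one aggregate.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: not part of the proposed API.
final class TagGroupingSketch {

    // Hypothetical stand-in for one recorded metric value with its tags.
    static final class Sample {
        final Map<String, String> tags;
        final BigInteger value;

        Sample(final Map<String, String> tags, final BigInteger value) {
            this.tags = tags;
            this.value = value;
        }
    }

    // Groups samples by the values of the given tag labels and sums each group.
    static Map<List<String>, BigInteger> aggregateBy(final List<String> tagLabels,
                                                     final List<Sample> samples) {
        final Map<List<String>, BigInteger> aggregates = new LinkedHashMap<>();
        for (final Sample sample : samples) {
            final List<String> key = new ArrayList<>();
            for (final String label : tagLabels) {
                key.add(sample.tags.get(label));
            }
            aggregates.merge(key, sample.value, BigInteger::add);
        }
        return aggregates;
    }

    // Made-up size-all-mem-tables values, one per state store.
    static List<Sample> exampleSamples() {
        return List.of(
            new Sample(Map.of("thread-id", "t1", "task-id", "0_0"), BigInteger.valueOf(10)),
            new Sample(Map.of("thread-id", "t1", "task-id", "0_0"), BigInteger.valueOf(20)),
            new Sample(Map.of("thread-id", "t1", "task-id", "0_1"), BigInteger.valueOf(5)),
            new Sample(Map.of("thread-id", "t2", "task-id", "1_0"), BigInteger.valueOf(7)));
    }
}
```

With these sample values, grouping by [thread-id] yields two aggregates (one per stream thread), while grouping by [thread-id, task-id] yields three (one per task), so the number of reported metrics depends on the chosen tag labels rather than on the number of state stores.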


Code Block
	new MetricsAggregationConfig{
		"records the aggregation of the sizes of all mem-tables grouped by stream threads and task",
		Arrays.asList("thread-id", "task-id"),
		() -> BigInteger.valueOf(0),

Compatibility, Deprecation, and Migration Plan

The proposal is backward-compatible because it only adds a new method and does not change any existing methods.

No methods need to be deprecated and no migration plan is required.

Rejected Alternatives

  • Add the method to the StreamsMetrics interface: Adding the method to the StreamsMetrics interface would imply that the method could be called from everywhere within a processor that has access to an implementation of the StreamsMetrics interface. That would require more concurrency control than adding the method to the KafkaStreams class. In our opinion, the value of adding the method to the StreamsMetrics interface does not outweigh the additional costs of concurrency control mechanisms, thus we rejected this approach.