Status
Draft
Motivation
Apache Kafka has become the de facto standard for event streaming, with a growing ecosystem of Kafka-compliant services and implementations. While these services conform to the wire protocol, they differ drastically in their Quality of Service (QoS) characteristics—including latency, throughput, elasticity, storage architecture, and observability.
Today, users and applications operate with implicit assumptions or vendor-specific guarantees regarding performance and reliability. However, Kafka lacks a standard mechanism to declare, negotiate, and observe QoS characteristics. This results in a fragmented landscape with varying, often opaque, performance characteristics.
This KIP proposes the definition and implementation of a QoS framework to:
- Declare desired service characteristics (asks/offers)
- Measure actual performance metrics (observations)
- Enable compatibility and SLA alignment between producers, brokers, and consumers
- Lay the foundation for automation, governance, and cost transparency
Two types of QoS grammars need to be developed. The first expresses asks or offers: an ideal or desired QoS, such as meeting a certain latency SLA, or preparing a Kafka cluster for an anticipated volume of traffic. The second measures actual QoS, as captured by observability tools, methods, and systems. Comparisons can then be made between desired states and actual performance.
Any QoS implementation protocols and methods should be open standards, free of vendor bias as much as possible, while still allowing for customization and extensibility for advanced features that one vendor or implementation might support that others do not (or do not yet).
Proposed Changes
- QoS Declarations: Allow producers and consumers to declare desired QoS in their configurations.
- Cluster Capabilities Description: Brokers will expose supported QoS ranges, capabilities (e.g., self-balancing, storage tiering, autoscaling), and current limits.
- QoS Negotiation: A negotiation mechanism to reconcile producer/consumer expectations with broker capabilities.
- Observability Integration: Define standard metrics to report actual observed QoS (e.g., end-to-end latency, data freshness, throughput).
- QoS in Topic Configuration: Enable topic-level QoS annotations that can act as policy templates or governance guides.
Proposed Public Interfaces
This KIP introduces the concept of a QoS grammar that can be expressed in topic-level configurations, cluster-level capabilities, producer/consumer client metadata, and observability endpoints.
QoS Metrics Measurements and Responses
The system measures and responds to Quality of Service (QoS) requirements in order to optimize cost, latency, and availability on a per-topic basis. Each topic has a requestedQoS policy attached, which may specify targets for characteristics such as:
- Latency (e.g., represented as a desired percentile, such as p99, and a desired end-to-end latency measured in milliseconds such as < 50ms)
- Throughput (e.g., number of objects per second, either fixed, or a variable rate with an anticipated not-to-exceed value)
- Payload size (e.g., either a fixed size, or a variable size with an anticipated not-to-exceed value)
- Availability (e.g., >= 99.99% message delivery success)
- Durability (e.g., replication factor = 3, acknowledged by majority)
- Retention Period (e.g., message time-to-live, such as 3 days)
- Cost sensitivity (e.g., prefer low-cost object storage when possible)
Policies also carry metadata so they can be properly attributed to topics and versioned:
- Topic Name (the topic name associated with the policy)
- Policy name and/or ID (methods to identify one specific policy from another, either by an alphanumeric tag or some form of unique ID such as UUID, ULID or NanoID)
- Policy version (methods to identify different versions of the same policy over time)
- Policy creation and update datetime stamps, in ISO 8601 format
This enumeration of characteristics should be considered illustrative but not exhaustive. Additional characteristics for topic-level QoS can be added over time, either through revision of this KIP or through the implementation of new related KIPs.
In addition, the final KIP proposal will need to define which policy elements are REQUIRED, RECOMMENDED, or OPTIONAL.
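The policy elements and metadata above can be sketched as a simple record. All field names here are hypothetical placeholders for illustration, not a proposed wire format:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import uuid

@dataclass
class RequestedQoS:
    """Hypothetical topic-level QoS policy; field names are illustrative."""
    topic_name: str
    # Policy identity and versioning metadata
    policy_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    policy_version: int = 1
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())
    # QoS characteristics (all optional; which are REQUIRED is TBD)
    latency_p99_ms: Optional[int] = None           # e.g. 50 -> "p99 < 50ms"
    throughput_msgs_per_sec: Optional[int] = None
    max_payload_bytes: Optional[int] = None
    availability_pct: Optional[float] = None       # e.g. 99.99
    replication_factor: Optional[int] = None
    retention_days: Optional[int] = None
    cost_sensitivity: Optional[str] = None         # e.g. "prefer-object-storage"

policy = RequestedQoS(topic_name="sensor-stream", latency_p99_ms=50,
                      throughput_msgs_per_sec=500_000, retention_days=3)
```

A real schema would bump `policy_version` on each update and record an updated-at timestamp alongside `created_at`, per the metadata list above.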
In practice, a producer should provide a requestedQoS policy when it first creates a topic. The Kafka cluster can interrogate the policy to see if it can meet the policy requirements. Some Kafka clusters may be fixed in capability and unable to meet the policy requirements. Other Kafka clusters may have more adaptive capabilities, and will be able to reconfigure or further provision resources to be able to meet the policy requirements.
If a cluster is unable to meet the requestedQoS the producer may be able to find a different Kafka-compatible cluster that can meet its topic-level requirements, or it may enter into a dynamic negotiation with the cluster to determine which, if any, of the desired policy requirements can be met.
Further downstream, consumers can also provide their own requestedQoS policies. The producer and the consumer may need to negotiate, through the cluster, how to establish the subscription. Fast producers and slow consumers are a common mismatch in Kafka connections; a consumer may only be able to ingest data at a certain fixed rate slower than a producer may be generating data. The QoS system could, in theory, help negotiate this, for example, via downsampling using Apache Flink.
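The negotiation step could be sketched as follows, under the simplifying assumption that the cluster advertises a single "best achievable" value per metric; metric names and the response shape are illustrative, not a proposed protocol:

```python
# Metrics where a smaller value is harder to achieve (latency);
# everything else is treated as "higher is better" (throughput, availability).
LOWER_IS_BETTER = {"latency_p99_ms"}

def negotiate(requested: dict, cluster_best: dict) -> dict:
    """For each requested target, grant it if within the cluster's advertised
    capability; otherwise counter-offer the cluster's best achievable value."""
    offer = {}
    for metric, want in requested.items():
        best = cluster_best.get(metric)
        if best is None:
            offer[metric] = ("UNSUPPORTED", None)
        elif (best <= want) if metric in LOWER_IS_BETTER else (best >= want):
            offer[metric] = ("GRANTED", want)
        else:
            offer[metric] = ("COUNTER_OFFER", best)
    return offer

offer = negotiate(
    {"latency_p99_ms": 50, "throughput_msgs_per_sec": 2_000_000},
    {"latency_p99_ms": 80, "throughput_msgs_per_sec": 5_000_000},
)
# The 50ms latency ask cannot be met (best is 80ms), so the cluster counter-offers.
```

The producer could then accept the counter-offer, relax its policy, or look for a different cluster, as described above.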
To ensure compliance with these policies, each topic continuously reports both the requested QoS and the achieved QoS. The achieved QoS is calculated by the system based on runtime metrics collected by the control plane and brokers:
QoS Metadata Format (example schema)
{
  "desired-latency": {
    "p99": "50ms"
  },
  "desired-max-freshness": "00:01:00:00",
  "expected-throughput": "2M/s",
  "expected-payload-size": {
    "min": "500B",
    "max": "10MB",
    "mode": "<1KB"
  },
  "elasticity": {
    "min-partitions": 10,
    "max-partitions": 1000
  },
  "priority": "high",
  "retention": {
    "duration": "30d",
    "tier": "hot"
  },
  "schema-evolution-aware": true
}
Achieved QoS Measurements
Metrics collected per topic may include:
- End-to-end latency percentiles, including network delay, storage delay, and broker queuing time.
- Message durability and persistence delay, based on replication acknowledgments and storage tier write acknowledgments.
- Availability and failure rates, including delivery failures, retries, and degraded read paths.
- Throughput and backpressure, identifying if a topic is throttled due to under-provisioning or cost tradeoffs.
- Storage tier latency vs. cost profiles, showing whether messages were served from local disk, remote cache, or object storage.
QoS metrics should be emitted via standardized topic-level metrics available through existing metrics systems (e.g., JMX, Prometheus exporters) or a new dedicated internal QoS topic (e.g., __qos-metrics). These should include:
- Desired vs. actual latency (p95, p99, p999)
- Desired vs. actual throughput (events/sec)
- Max payload size observed vs. max allowed/requested
- Message loss rate (if any)
- Data freshness and ingestion delay
- Partition skew or imbalance
- Replica lag and ISR (in-sync replicas) health
Topics could self-report these metrics periodically or brokers could aggregate and emit them. Consumers and producers could subscribe to these metrics via an internal API or external metrics collection endpoint. These reports may include timestamps, moving average windows, and confidence levels.
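A moving-window percentile of the kind these reports would carry could be computed broker-side roughly as follows; this is a sketch using a sample-count window as a stand-in for a time window:

```python
from collections import deque

class WindowedLatency:
    """Keep the most recent end-to-end latency samples and report a
    percentile over that sliding window (count-based for simplicity)."""
    def __init__(self, window_size: int = 1000):
        self.samples = deque(maxlen=window_size)

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def percentile(self, p: float):
        """Nearest-rank percentile over the current window."""
        if not self.samples:
            return None
        ordered = sorted(self.samples)
        idx = min(len(ordered) - 1, int(len(ordered) * p / 100))
        return ordered[idx]

w = WindowedLatency()
for ms in range(1, 101):   # 1..100 ms synthetic samples
    w.record(ms)
p99 = w.percentile(99)
```

A production implementation would more likely use a streaming quantile sketch (e.g., t-digest or HDR histogram) rather than sorting raw samples.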
QoS Metrics Format (example schema)
{
  "topic": "sensor-stream",
  "window": "5m",
  "desired-latency-p99": "100ms",
  "actual-latency-p99": "127ms",
  "desired-throughput": "500k msgs/sec",
  "actual-throughput": "491k msgs/sec",
  "max-payload-size-requested": "1MB",
  "max-payload-size-observed": "987KB"
}
QoS Evaluation and Reporting
Each topic periodically emits a QoS Status Report containing:
- requestedQoS: The original policy for latency, availability, durability, etc.
- achievedQoS: Actual measured values over a time window.
- complianceStatus: One of:
- IN_COMPLIANCE
- NEEDS_ATTENTION (drift observed but within acceptable tolerance)
- VIOLATED (outside of bounds)
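Using the numbers from the example metrics report (desired p99 of 100ms vs. an actual 127ms; desired 500k msgs/sec vs. an actual 491k), a tolerance-based classification might look like this; the 10% tolerance is an illustrative default, not a proposed standard:

```python
def compliance_status(desired: float, actual: float,
                      tolerance: float = 0.10,
                      lower_is_better: bool = True) -> str:
    """Classify drift between requestedQoS and achievedQoS for one metric."""
    if lower_is_better:                          # e.g. latency
        if actual <= desired:
            return "IN_COMPLIANCE"
        if actual <= desired * (1 + tolerance):
            return "NEEDS_ATTENTION"
    else:                                        # e.g. throughput
        if actual >= desired:
            return "IN_COMPLIANCE"
        if actual >= desired * (1 - tolerance):
            return "NEEDS_ATTENTION"
    return "VIOLATED"

# 127ms is more than 10% over the 100ms target -> VIOLATED
latency = compliance_status(100, 127)
# 491k msgs/sec is within 10% of the 500k target -> NEEDS_ATTENTION
throughput = compliance_status(500_000, 491_000, lower_is_better=False)
```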
These reports are exposed via the system dashboard, APIs, and alerting tools. This allows users and the control plane to understand whether a topic's SLAs are being met.
System Response to QoS Drift or Violation
When a drift between requested and achieved QoS is detected, the control plane may take corrective action:
- Storage Tier Promotion/Demotion: Automatically promote hot topics from object storage to SSD or memory cache if latency SLAs are unmet.
- Replica Adjustment: Increase replication or change acknowledgment quorum if durability or availability is not achieved.
- Dynamic Resource Allocation: Scale up compute or networking resources for affected topics.
- User Notification and Guidance: Inform topic owners when they are consistently out of bounds and suggest policy or configuration changes (e.g., increase cost tolerance or adjust throughput targets).
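A control plane might dispatch these corrective actions from a QoS Status Report roughly as sketched below; both the action names and the dimension-to-action mapping are hypothetical examples:

```python
# Hypothetical mapping from a violated QoS dimension to a corrective action.
ACTIONS = {
    "latency": "PROMOTE_STORAGE_TIER",     # e.g. object storage -> SSD / cache
    "durability": "INCREASE_REPLICATION",
    "throughput": "SCALE_RESOURCES",
}

def corrective_actions(report: dict) -> list:
    """Map VIOLATED dimensions in a compliance report to corrective actions;
    dimensions without an automatic remedy fall back to notifying the owner."""
    actions = []
    for dimension, status in report.items():
        if status != "VIOLATED":
            continue
        actions.append(ACTIONS.get(dimension, "NOTIFY_TOPIC_OWNER"))
    return actions

planned = corrective_actions({"latency": "VIOLATED",
                              "throughput": "IN_COMPLIANCE",
                              "retention": "VIOLATED"})
```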
Kafka Cluster Characteristics
While individual Kafka topics will be the primary QoS policy level, clusters themselves can provide descriptions of their overall QoS based upon various factors: topology, capacity, performance, scalability, availability, and even affordability. For example, a cluster may have an overall network bandwidth capacity, against which each supported topic draws down the remaining available capacity.
Examples:
- A cluster is capable of processing 1 GB of data per second, and a total of 100 TB of storage.
- Topic A produces a fixed rate of 500k messages per second, each of which is a fixed 1 KB in size.
- Topic A would require 0.5 GB of throughput per second, leaving an available cluster throughput capacity of 0.5 GB per second (50% remaining). It would generate 43.2 TB of data per day, requiring 43.2% of total cluster storage capacity if the topic had a TTL of 24 hours.
- Topic B produces a variable rate, with bursts of up to 1 million messages per second, each of which can be up to 1 MB in size. However, most messages (the mode for the topic) are ≤1 KB, and the general rate of throughput is usually <100k messages per second.
- Topic B, at maximum throughput, would require 1,000 GB (1 TB) of throughput per second. It would fill the entire cluster in 100 seconds even if it were entirely empty at the start of ingestion. This topic can far exceed cluster capacity (oversubscription), requiring a negotiation for times of high throughput, such as throttling or downsampling the topic. Generally, however, it would require only 100 MB/sec of throughput, just 10% of overall cluster throughput capacity. This amounts to 8.64 TB/day; given a 24-hour TTL, that would be less than 10% of total storage capacity. If it exceeded a storage capacity limit, various storage retention policies could be applied.
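The arithmetic in these examples can be checked with a short helper; it uses decimal units (1 KB = 1,000 bytes, 1 GB = 10^9 bytes) to match the figures above:

```python
def topic_footprint(msgs_per_sec: int, msg_bytes: int, ttl_hours: float,
                    cluster_gbps: float = 1.0,
                    cluster_storage_tb: float = 100.0) -> dict:
    """What fraction of cluster throughput and storage does a topic consume?
    Cluster defaults mirror the example: 1 GB/sec, 100 TB of storage."""
    gb_per_sec = msgs_per_sec * msg_bytes / 1e9
    tb_per_day = gb_per_sec * 86_400 / 1_000
    stored_tb = tb_per_day * ttl_hours / 24
    return {
        "throughput_share": gb_per_sec / cluster_gbps,
        "storage_share": stored_tb / cluster_storage_tb,
    }

# Topic A: 500k msgs/sec at a fixed 1 KB each, 24-hour TTL
# -> 50% of throughput capacity, 43.2% of storage capacity
topic_a = topic_footprint(500_000, 1_000, 24)
```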
Clusters may also be able to describe different service levels for IO latency and throughput if they use different memory and storage media, such as in-memory (cached) topics, locally-attached NVMe Solid State Drives (SSD), network attached storage such as AWS Elastic Block Storage (EBS), distributed object storage such as Amazon S3 or Google Cloud Storage, or even rotating Hard Disk Drives (HDD).
In theory, a cluster with dynamic provisioning capabilities could deploy different types of servers and storage capacity to meet requestedQoS demands per topic as needed.
Individual topics may even be deployed to a hybrid storage architecture: recent data (say, the past 24 hours) on fast NVMe, older data (from 24 to 72 hours) in object storage, after which the data is deleted via TTL.
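Such a hybrid layout amounts to a tier-assignment rule keyed on record age; a minimal sketch, with the 24/72-hour boundaries taken from the example above:

```python
def storage_tier(record_age_hours: float,
                 hot_hours: float = 24, warm_hours: float = 72) -> str:
    """Assign a record to a storage tier by age: newest on fast NVMe,
    older in object storage, expired data deleted (boundaries illustrative)."""
    if record_age_hours < hot_hours:
        return "nvme"
    if record_age_hours < warm_hours:
        return "object-storage"
    return "deleted"

tiers = [storage_tier(h) for h in (1, 30, 80)]
```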
Producers may or may not be able to specify the storage media their topics are deployed to in the cluster. For example, serverless deployments may entirely obscure such details. The producer should at least be able to request general requirements, such as throughput and latency, leaving the server to interpret how to fulfill them in a serverless manner, opaque to the producer.
Compatibility, Deprecation, and Migration Plan
- No existing APIs are broken.
- QoS capabilities will be opt-in and backward compatible.
- Clients and brokers that don’t support QoS negotiation or observability will function as-is.
Use Cases
- Vendor Differentiation: Describe the capabilities of a Kafka-compliant service transparently.
- SLA Negotiation: Form the basis of SLO/SLA/SLC between producers, brokers, and consumers.
- Throttling and Backpressure: Use QoS metrics to drive intelligent throttling.
- Autoscaling: Preemptively scale infrastructure based on declared throughput expectations.
- Storage Tiering: Route data to appropriate storage based on lifecycle and access patterns.
- Cost Optimization: Align cost and performance by selecting appropriate QoS levels.
Prior Art and References
- KIP-864
- OpenMessaging Benchmark
- RFC 3317: QoS Policy Info Base
- RFC 3644: QoS Information Model
Future Work
- Define a formal grammar (e.g., JSON Schema or protobuf) for QoS metadata
- Add QoS negotiation APIs to Kafka clients
- Integrate QoS metrics into JMX and logging outputs
- Establish community-led working groups for specific QoS dimensions (latency, elasticity, retention, etc.)
- Explore alignment with schema evolution and data contracts
Conclusion
As Kafka evolves to serve more diverse workloads and industries, the need for transparent, standardized, and negotiable QoS has become essential. This KIP aims to establish the foundational work for a vendor-neutral, extensible QoS framework for Apache Kafka.
We invite the Kafka community to comment, contribute, and co-author this vision.