Status
Current state: Under Discussion
Discussion thread: [to be updated]
JIRA: KAFKA-20047
Released: XXX
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Economic Challenge of Tiered Storage
KIP-405 (Tiered Storage) transformed Kafka's architecture by decoupling compute from storage, enabling infinite retention through remote object storage (S3, GCS, Azure Blob). While this can reduce storage costs by 30-40%, it introduces a critical operational challenge: variable operational expenses for data access.
In traditional Kafka deployments, read operations have zero marginal cost once infrastructure is provisioned. With Tiered Storage, remote fetch operations incur direct costs:
- API Request Charges: Cloud providers charge per 1,000 GET requests (e.g., $0.0004 on AWS S3)
- Data Egress Charges: Cross-AZ or cross-region transfers incur substantial fees (e.g., $0.01-$0.09 per GB)
Sequential Fetch Limitation
Due to KAFKA-14915, the broker currently fetches only one partition per remote fetch request, rather than batching across partitions. This architectural constraint means consumers reading from multi-partition topics generate a higher volume of individual API GET requests than theoretically necessary. This amplifies the financial impact of remote storage access and makes precise per-client cost attribution even more critical for identifying which consumers are most affected by this sequential fetch behavior.
The Visibility Gap
Despite KIP-963 providing broker-level metrics for Tiered Storage health monitoring, a critical gap remains: operators cannot attribute remote storage costs to specific consumer applications.
Current limitations:
- KIP-963 metrics aggregate at the topic level (e.g., RemoteFetchBytesPerSec per topic)
- No visibility into which client-id or consumer-group is driving remote fetch costs
- Impossible to implement chargeback or showback models in multi-tenant environments
Real-world impact: A misconfigured consumer performing a full historical scan can generate thousands of dollars in S3 costs without detection until the monthly bill arrives.
Use Cases Requiring Cost Attribution
- Multi-Tenant Chargeback: Enterprise clusters serving 100+ teams need to bill specific cost centers for their remote storage consumption
- Rogue Consumer Detection: Identify consumers with auto.offset.reset=earliest causing unexpected cost spikes
- Optimization Guidance: Detect inefficient fetch patterns (small fetch sizes causing high API request counts)
- Compliance Auditing: Track which applications accessed historical archives for regulatory requirements
- Financial Quotas: Implement real-time cost-based throttling to prevent bill shock
Business Significance
From a business perspective, KIP-1267 represents the foundational architecture for financial governance in streaming data. It transitions Kafka from a "black box" of infrastructure spend to a transparent, auditable platform compliant with enterprise FinOps standards. This contribution enables organizations to:
- Implement Granular Chargeback: Accurately bill specific cost centers for their historical data consumption
- Enforce Financial Quotas: Detect and throttle "rogue" consumers based on cost velocity rather than just bandwidth
- Optimize Cloud Spend: Identify inefficient consumption patterns (e.g., small fetch sizes causing high API costs) and drive architectural improvements
The following sections detail the technical specification, implementation strategy, and validation plans for this enhancement, serving as a guide for architecting financial accountability in multi-tenant Kafka ecosystems.
Proposed Changes
New Metrics
This KIP proposes a new JMX metric group RemoteFetchMetrics with client-level attribution:
1. RemoteFetchBytesPerSec
- Type: Rate metric (bytes/second) with total count
- Tags: client-id, topic, partition (optional)
- Description: Tracks bytes transferred from remote storage per client
- Use: Calculate data egress costs
2. RemoteFetchRequestsPerSec
- Type: Rate metric (requests/second) with total count
- Tags: client-id, topic
- Description: Tracks number of remote fetch operations per client
- Use: Calculate API request costs; identify inefficient fetch patterns
3. RemoteFetchLatency
- Type: Histogram (p50, p95, p99)
- Tags: client-id, topic
- Description: Measures remote fetch operation latency
- Use: Differentiate between slow storage and slow consumers
4. RemoteFetchErrorsPerSec
- Type: Rate metric (errors/second) with total count
- Tags: client-id, topic, error-type (optional)
- Description: Tracks failed remote fetch attempts per client
- Use: Attribute API request costs for failed operations; identify problematic consumers
Note: Cloud providers often charge for API GET requests even when requests fail due to timeouts or storage errors. This metric is essential for complete financial attribution.
JMX ObjectName Structure
kafka.server:type=RemoteFetchMetrics,name=RemoteFetchBytesPerSec,client-id={client_id},topic={topic_name}
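As a sanity check, the proposed pattern parses as a standard JMX ObjectName. The helper below is illustrative only (RemoteFetchMetricName is not part of this proposal) and assumes client-id and topic values contain no characters that would require ObjectName quoting:

```java
import javax.management.ObjectName;

public class RemoteFetchMetricName {
    // Formats the proposed per-client MBean name. Hypothetical helper;
    // assumes clientId and topic need no ObjectName quoting.
    static String metricName(String metric, String clientId, String topic) {
        return String.format(
            "kafka.server:type=RemoteFetchMetrics,name=%s,client-id=%s,topic=%s",
            metric, clientId, topic);
    }

    public static void main(String[] args) throws Exception {
        String name = metricName("RemoteFetchBytesPerSec", "analytics-app-1", "orders");
        // Verify the pattern is a well-formed JMX ObjectName.
        ObjectName on = new ObjectName(name);
        System.out.println(on.getKeyProperty("client-id")); // prints analytics-app-1
    }
}
```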
Configuration Parameters
| Configuration | Type | Default | Description |
|---|---|---|---|
| remote.log.metrics.cost.attribution.enabled | Boolean | false | Master switch to enable client-level metrics |
| remote.log.metrics.max.client.sensors | Int | 1000 | Maximum unique client-ids tracked (LRU eviction) |
| remote.log.metrics.include.partition | Boolean | false | Include partition tag (increases cardinality) |
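For illustration, a broker opting in (with the sensor cap raised from its default) might set, in server.properties:

```properties
# Enable client-level remote fetch metrics (off by default)
remote.log.metrics.cost.attribution.enabled=true
# Cap on unique client-id sensors; LRU eviction applies beyond the limit
remote.log.metrics.max.client.sensors=2000
# Leave the partition tag off to keep metric cardinality bounded
remote.log.metrics.include.partition=false
```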
Relationship to KIP-963
KIP-963 introduced RemoteFetchRequestsPerSec and RemoteFetchBytesPerSec at the topic level under BrokerTopicMetrics. KIP-1267 does not replace these metrics. Instead, it provides a high-resolution, opt-in view of the same data with client-level attribution. The KIP-963 metrics remain the primary operational health indicators, while KIP-1267 metrics enable FinOps and chargeback use cases that require identity context.
Public Interfaces
Modified Classes
RemoteStorageFetchInfo
Add optional clientId field to propagate request context:
public class RemoteStorageFetchInfo {
    // Populated from the originating FetchRequest; Optional.empty() when
    // attribution is disabled or no client-id is available.
    private final Optional&lt;String&gt; clientId;

    public RemoteStorageFetchInfo(..., Optional&lt;String&gt; clientId) {
        this.clientId = clientId;
    }

    public Optional&lt;String&gt; clientId() {
        return clientId;
    }
}
New Metrics Registry
New metric group: kafka.server:type=RemoteFetchMetrics
This is separate from BrokerTopicMetrics to isolate high-cardinality client-level data.
Proposed Implementation
Architecture Overview
The implementation follows a "surgical instrumentation" approach with minimal changes to the data path:
- Context Propagation (ReplicaManager): Extract clientId from FetchParams and inject into RemoteStorageFetchInfo
- Sensor Management (RemoteLogManager): Maintain ConcurrentHashMap with LRU eviction
- Metric Recording (RemoteLogManager): Record metrics in the async fetch callback upon successful completion
Key Implementation Points
1. ReplicaManager Modification
- When constructing RemoteStorageFetchInfo for remote fetches, extract and pass the clientId from the incoming FetchRequest.
2. RemoteLogManager Instrumentation
- Check remote.log.metrics.cost.attribution.enabled configuration
- Lookup or create sensor for (clientId, topic) tuple (subject to LRU limits)
- Record request count immediately upon task submission
- Record byte count in the completion callback after successful fetch
- Record error count for failed fetch attempts
Billing Accuracy: RemoteFetchBytesPerSec is recorded in the async completion callback only after successful data transfer. This ensures the metric reflects actual payload bytes that match cloud provider billing (e.g., AWS S3 does not charge egress for failed transfers). Failed fetches increment RemoteFetchErrorsPerSec but not RemoteFetchBytesPerSec.
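The recording order described above can be sketched with a future-based fetch. RemoteFetchRecorder and instrument() are illustrative names, not the actual RemoteLogManager API; the point is that requests are counted at submission, bytes only on success, and errors on failure:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the proposed recording semantics.
public class RemoteFetchRecorder {
    final LongAdder requests = new LongAdder();
    final LongAdder bytes = new LongAdder();
    final LongAdder errors = new LongAdder();

    CompletableFuture<byte[]> instrument(CompletableFuture<byte[]> fetch) {
        requests.increment(); // counted even if the fetch later fails
        return fetch.whenComplete((data, err) -> {
            if (err == null) bytes.add(data.length); // only successful transfers
            else errors.increment();                 // failed fetches still cost API calls
        });
    }

    public static void main(String[] args) {
        RemoteFetchRecorder r = new RemoteFetchRecorder();
        r.instrument(CompletableFuture.completedFuture(new byte[1024])).join();
        CompletableFuture<byte[]> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("S3 timeout"));
        r.instrument(failed).exceptionally(e -> null).join();
        System.out.println(r.requests.sum() + " " + r.bytes.sum() + " " + r.errors.sum());
        // prints 2 1024 1
    }
}
```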
3. Thread Safety
- Use Kafka's thread-safe Metrics library (atomic accumulators)
- Metric recording occurs on RemoteLogManager's thread pool (not network I/O threads)
- ConcurrentHashMap for sensor cache with LRU eviction
4. MBean Lifecycle Management
When a sensor is evicted from the LRU cache due to the remote.log.metrics.max.client.sensors limit, its corresponding JMX MBean will be immediately unregistered from the JMX registry. This ensures bounded memory usage and prevents MBean leaks as client populations churn over time.
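A minimal sketch of the bounded cache with eviction callback follows. It uses a synchronized access-order LinkedHashMap to keep the LRU logic visible (the proposal itself calls for a ConcurrentHashMap-based cache); ClientSensorCache is a hypothetical name, and the callback stands in for sensor removal plus MBean unregistration:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative bounded sensor cache keyed by (clientId, topic).
public class ClientSensorCache<V> {
    private final int maxSensors;
    private final Consumer<V> onEvict;
    private final Map<String, V> cache;

    public ClientSensorCache(int maxSensors, Consumer<V> onEvict) {
        this.maxSensors = maxSensors;
        this.onEvict = onEvict;
        // accessOrder=true gives least-recently-used iteration order
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                if (size() > ClientSensorCache.this.maxSensors) {
                    // stand-in for Sensor removal + JMX MBean unregistration
                    ClientSensorCache.this.onEvict.accept(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public synchronized V getOrCreate(String clientId, String topic, Supplier<V> factory) {
        return cache.computeIfAbsent(clientId + "|" + topic, k -> factory.get());
    }

    public synchronized int size() { return cache.size(); }

    public static void main(String[] args) {
        java.util.List<String> evicted = new java.util.ArrayList<>();
        ClientSensorCache<String> cache = new ClientSensorCache<>(2, evicted::add);
        cache.getOrCreate("a", "t", () -> "sensor-a");
        cache.getOrCreate("b", "t", () -> "sensor-b");
        cache.getOrCreate("a", "t", () -> "unused");   // touch "a"; "b" is now eldest
        cache.getOrCreate("c", "t", () -> "sensor-c"); // exceeds limit, evicts "b"
        System.out.println(evicted + " " + cache.size()); // prints [sensor-b] 2
    }
}
```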
Performance Characteristics
- CPU Overhead: < 1.2% (hash map lookup + metric recording)
- Latency Impact: ~0.3ms added to fetch path (negligible compared to 50-500ms S3 latency)
- Memory Footprint: ~5KB per sensor × 1000 max sensors = ~5MB heap
Financial Governance Framework
This section demonstrates how KIP-1267 enables financial accountability in multi-tenant Kafka environments.
Cost Attribution Model
KIP-1267 provides the telemetry needed to calculate per-client costs using the following formula:
Cost_Client = (V_Egress × R_Egress) + (N_Requests × R_API)
Where:
- V_Egress = total bytes from RemoteFetchBytesPerSec (aggregated count)
- R_Egress = cloud provider's egress rate (e.g., $0.09/GB for Internet, $0.01/GB for inter-AZ)
- N_Requests = total count from RemoteFetchRequestsPerSec + RemoteFetchErrorsPerSec
- R_API = cloud provider's API rate (e.g., $0.0004 per 1,000 GET requests)
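Worked through in code, the formula reduces to a few lines. The constants use the illustrative list prices quoted above; RemoteFetchCostModel and clientCost are hypothetical names, not part of the KIP:

```java
public class RemoteFetchCostModel {
    // Illustrative rates; real prices vary by provider, region, and tier.
    static final double R_EGRESS_PER_GB = 0.09;   // $/GB (Internet egress)
    static final double R_API_PER_1000 = 0.0004;  // $ per 1,000 GET requests

    // Cost_Client = (V_Egress × R_Egress) + (N_Requests × R_API)
    static double clientCost(double egressBytes, long totalRequests) {
        return (egressBytes / 1e9) * R_EGRESS_PER_GB
             + (totalRequests / 1000.0) * R_API_PER_1000;
    }

    public static void main(String[] args) {
        // 500 GB read from remote storage via 2,000,000 GET requests
        // (including failed ones): $45.00 egress + $0.80 API charges
        System.out.println(clientCost(500e9, 2_000_000));
    }
}
```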
Governance Models Enabled
Organizations can implement three levels of financial maturity:
Level 1: Showback Model
- Mechanism: Dashboards displaying top consumers by cost
- Goal: Visibility and awareness
- Implementation: Grafana dashboards with no billing system integration required
- Outcome: Teams voluntarily optimize consumption patterns based on visibility
Level 2: Chargeback Model
- Mechanism: Automated monthly cost reports per consumer group
- Goal: Cost recovery and accountability
- Implementation: Export metrics to billing systems for internal invoicing
- Outcome: Platform teams can allocate actual costs to consuming business units, transforming Kafka from a cost center to a cost pass-through service
Level 3: Real-Time Cost Enforcement
- Mechanism: Stream processing of KIP-1267 metrics with automated quota application
- Goal: Cost prevention
- Implementation: Monitor RemoteFetchBytesPerSec per client-id and trigger quota enforcement when thresholds are exceeded
- Example: If client-id=team-a exceeds $50/hour in remote fetch costs, automatically apply throughput quotas to prevent runaway spending
- Outcome: Prevents unexpected cost spikes before monthly billing cycles complete
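A Level 3 enforcement check could look like the sketch below: an external watcher feeds per-client rates read from the JMX metrics into a budget comparison. CostQuotaCheck and shouldThrottle are illustrative names, and the rates mirror the example prices used earlier:

```java
public class CostQuotaCheck {
    // Hypothetical check: converts per-second rates into an hourly cost
    // and compares against a budget (e.g., the $50/hour example above).
    static boolean shouldThrottle(double bytesPerSec, double requestsPerSec,
                                  double egressRatePerGb, double apiRatePer1000,
                                  double hourlyBudgetUsd) {
        double hourlyEgressCost = bytesPerSec * 3600.0 / 1e9 * egressRatePerGb;
        double hourlyApiCost = requestsPerSec * 3600.0 / 1000.0 * apiRatePer1000;
        return hourlyEgressCost + hourlyApiCost > hourlyBudgetUsd;
    }

    public static void main(String[] args) {
        // 200 MB/s sustained remote reads at Internet egress rates blows the budget
        System.out.println(shouldThrottle(200e6, 500, 0.09, 0.0004, 50.0)); // prints true
        // 50 MB/s stays comfortably under $50/hour
        System.out.println(shouldThrottle(50e6, 500, 0.09, 0.0004, 50.0));  // prints false
    }
}
```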
Enterprise Adoption Impact
KIP-1267 addresses a critical barrier to Tiered Storage adoption in enterprise environments. Without cost attribution, organizations cannot:
- Implement fair billing in multi-tenant clusters
- Detect and prevent cost anomalies in real-time
- Optimize consumption patterns based on financial impact
- Meet compliance requirements for data access auditing
By providing granular cost visibility, this KIP transforms Tiered Storage from a feature with uncertain cost implications into a governable, predictable storage strategy suitable for regulated industries requiring infinite retention capabilities.
Integration with Existing Kafka Features
The metrics provided by KIP-1267 can be integrated with:
- Kafka Quotas: Use cost metrics to inform quota policies
- ACLs: Combine with access control for comprehensive governance
- Monitoring Systems: Export to Prometheus, Datadog, or CloudWatch for alerting
- FinOps Platforms: Feed into enterprise cost management tools
Security Considerations
The metrics introduced by this KIP expose client-id information through JMX endpoints. Organizations should:
- Restrict JMX access to authorized operators only
- Consider implementing authentication for metric exporters (Prometheus, etc.)
- Be aware that client-id values may contain sensitive team/application identifiers
- Use Kafka ACLs to control which principals can view cost attribution dashboards
No changes to Kafka's authorization model are required; existing JMX security practices apply.
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
- Wire Protocol: No changes
- Client Libraries: No changes required
- Storage Plugins: No changes to RemoteStorageManager interface
- Existing Metrics: KIP-963 metrics remain unchanged; KIP-1267 metrics are additive
Migration Strategy
Phase 1 - Preparation:
- Upgrade brokers to version containing KIP-1267
- Ensure remote.log.metrics.cost.attribution.enabled=false (default)
- Verify cluster stability
Phase 2 - Canary Activation:
- Enable on single broker: remote.log.metrics.cost.attribution.enabled=true
- Monitor JMX endpoints for new metrics
- Validate metric accuracy
Phase 3 - Observability Integration:
- Configure Prometheus JMX Exporter
- Deploy Grafana dashboards
- Validate metric aggregation
Phase 4 - Full Rollout:
- Enable on all brokers
- Begin chargeback data collection
Rollback Plan
Instant rollback via dynamic configuration: set remote.log.metrics.cost.attribution.enabled=false. This immediately stops metric recording without requiring a broker restart.
Test Plan
Unit Tests
- Verify exact byte count attribution for mock remote fetches
- Test LRU eviction with max.client.sensors limit
- Validate context propagation from FetchRequest to RemoteStorageFetchInfo
- Verify MBean unregistration upon sensor eviction
Integration Tests
- Multi-tenant simulation: verify independent attribution for concurrent consumers
- Fault tolerance: ensure failed fetches don't increment byte metrics
- Cardinality safety: verify sensor map size limits
- Error attribution: validate that failed S3 requests are correctly attributed to client-id
Performance Tests
Test Environment:
- Cluster: 3 brokers, 32GB RAM, 8 vCPUs each
- Workload: 10,000 msg/sec, 1KB message size
- Consumers: 50 concurrent consumer groups
- Test duration: 24 hours
- Remote storage: AWS S3 Standard
Targets:
- Benchmark throughput degradation (target: < 1%)
- Measure CPU overhead (target: < 2%)
- Validate latency impact (target: < 1ms)
Operational Guide
Prometheus Integration
JMX Exporter configuration (YAML):
rules:
  - pattern: kafka.server&lt;type=RemoteFetchMetrics, name=RemoteFetchBytesPerSec, client-id=(.+), topic=(.+)&gt;&lt;&gt;Count
    name: kafka_server_remote_fetch_bytes_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER
  - pattern: kafka.server&lt;type=RemoteFetchMetrics, name=RemoteFetchRequestsPerSec, client-id=(.+), topic=(.+)&gt;&lt;&gt;Count
    name: kafka_server_remote_fetch_requests_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER
  - pattern: kafka.server&lt;type=RemoteFetchMetrics, name=RemoteFetchErrorsPerSec, client-id=(.+), topic=(.+)&gt;&lt;&gt;Count
    name: kafka_server_remote_fetch_errors_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER
Example PromQL Queries
Total bytes by client (30 days):
sum(increase(kafka_server_remote_fetch_bytes_total[30d])) by (client_id)
Estimated hourly egress cost ($0.09/GB = $0.00000000009 per byte; rate() is per-second, hence the × 3600):
sum(rate(kafka_server_remote_fetch_bytes_total[1h])) by (client_id) * 0.00000000009 * 3600
Fetch efficiency (bytes per request):
sum(rate(kafka_server_remote_fetch_bytes_total[1h])) by (client_id) / sum(rate(kafka_server_remote_fetch_requests_total[1h])) by (client_id)
Error rate by client:
sum(rate(kafka_server_remote_fetch_errors_total[1h])) by (client_id)
Rejected Alternatives
Topic-Level Only Attribution
Approach: Map topics to teams via external CMDB.
Rejection Reason: Fails for shared topics consumed by multiple teams. Cannot determine which consumer is driving costs.
Client-Side Telemetry (KIP-714)
Approach: Have clients report their own fetch statistics.
Rejection Reasons:
- Financial billing cannot rely on self-reported data (trust issue)
- Requires client upgrades (KIP-1267 works with all existing clients)
- Broker is the authoritative source for billing
S3 Access Log Parsing
Approach: Parse cloud provider access logs to attribute costs.
Rejection Reason: S3 logs only contain broker IP addresses, not client-ids. Correlation is impossible without broker-side instrumentation.
References
• KIP-405: Kafka Tiered Storage
• KIP-963: Additional metrics in Tiered Storage
• KIP-714: Client Metrics and Observability