Status

Current state: Under Discussion

Discussion thread: mailing list discussion (to be updated)

JIRA: KAFKA-20047

Released: XXX

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Economic Challenge of Tiered Storage

KIP-405 (Tiered Storage) transformed Kafka's architecture by decoupling compute from storage, enabling infinite retention through remote object storage (S3, GCS, Azure Blob). While this reduces storage costs by 30-40%, it introduces a critical operational challenge: variable operational expenses for data access.

In traditional Kafka deployments, read operations have zero marginal cost once infrastructure is provisioned. With Tiered Storage, remote fetch operations incur direct costs:

  • API Request Charges: Cloud providers charge per 1,000 GET requests (e.g., $0.0004 on AWS S3)
  • Data Egress Charges: Cross-AZ or cross-region transfers incur substantial fees (e.g., $0.01-$0.09 per GB)

Sequential Fetch Limitation

Due to KAFKA-14915, the broker currently fetches only one partition per remote fetch request, rather than batching across partitions. This architectural constraint means consumers reading from multi-partition topics generate a higher volume of individual API GET requests than theoretically necessary. This amplifies the financial impact of remote storage access and makes precise per-client cost attribution even more critical for identifying which consumers are most affected by this sequential fetch behavior.

The Visibility Gap

Despite KIP-963 providing broker-level metrics for Tiered Storage health monitoring, a critical gap remains: operators cannot attribute remote storage costs to specific consumer applications.

Current limitations:

  • KIP-963 metrics aggregate at the topic level (e.g., RemoteFetchBytesPerSec per topic)
  • No visibility into which client-id or consumer-group is driving remote fetch costs
  • Impossible to implement chargeback or showback models in multi-tenant environments

Real-world impact: A misconfigured consumer performing a full historical scan can generate thousands of dollars in S3 costs without detection until the monthly bill arrives.

Use Cases Requiring Cost Attribution

  1. Multi-Tenant Chargeback: Enterprise clusters serving 100+ teams need to bill specific cost centers for their remote storage consumption
  2. Rogue Consumer Detection: Identify consumers with auto.offset.reset=earliest causing unexpected cost spikes
  3. Optimization Guidance: Detect inefficient fetch patterns (small fetch sizes causing high API request counts)
  4. Compliance Auditing: Track which applications accessed historical archives for regulatory requirements
  5. Financial Quotas: Implement real-time cost-based throttling to prevent bill shock

Business Significance

From a business perspective, KIP-1267 represents the foundational architecture for financial governance in streaming data. It transitions Kafka from a "black box" of infrastructure spend to a transparent, auditable platform compliant with enterprise FinOps standards. This contribution enables organizations to:

  1. Implement Granular Chargeback: Accurately bill specific cost centers for their historical data consumption
  2. Enforce Financial Quotas: Detect and throttle "rogue" consumers based on cost velocity rather than just bandwidth
  3. Optimize Cloud Spend: Identify inefficient consumption patterns (e.g., small fetch sizes causing high API costs) and drive architectural improvements

The following sections detail the technical specification, implementation strategy, and validation plans for this enhancement, serving as a guide for architecting financial accountability in multi-tenant Kafka ecosystems.

Proposed Changes

New Metrics

This KIP proposes a new JMX metric group RemoteFetchMetrics with client-level attribution:

1. RemoteFetchBytesPerSec

  • Type: Rate metric (bytes/second) with total count
  • Tags: client-id, topic, partition (optional)
  • Description: Tracks bytes transferred from remote storage per client
  • Use: Calculate data egress costs

2. RemoteFetchRequestsPerSec

  • Type: Rate metric (requests/second) with total count
  • Tags: client-id, topic
  • Description: Tracks number of remote fetch operations per client
  • Use: Calculate API request costs; identify inefficient fetch patterns

3. RemoteFetchLatency

  • Type: Histogram (p50, p95, p99)
  • Tags: client-id, topic
  • Description: Measures remote fetch operation latency
  • Use: Differentiate between slow storage and slow consumers

4. RemoteFetchErrorsPerSec

  • Type: Rate metric (errors/second) with total count
  • Tags: client-id, topic, error-type (optional)
  • Description: Tracks failed remote fetch attempts per client
  • Use: Attribute API request costs for failed operations; identify problematic consumers

Note: Cloud providers often charge for API GET requests even when requests fail due to timeouts or storage errors. This metric is essential for complete financial attribution.


JMX ObjectName Structure

kafka.server:type=RemoteFetchMetrics,name=RemoteFetchBytesPerSec,client-id={client_id},topic={topic_name}
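As a sanity check, this ObjectName pattern can be constructed and parsed with only the JDK's javax.management API. The metric name and tag keys are the ones proposed in this KIP; the class below is illustrative, and values containing JMX special characters would additionally need ObjectName.quote:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class RemoteFetchObjectNameDemo {
    // Builds the proposed per-client ObjectName for RemoteFetchBytesPerSec.
    static ObjectName metricName(String clientId, String topic) {
        try {
            return new ObjectName("kafka.server:type=RemoteFetchMetrics"
                    + ",name=RemoteFetchBytesPerSec"
                    + ",client-id=" + clientId
                    + ",topic=" + topic);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName n = metricName("team-a-consumer", "orders");
        System.out.println(n.getDomain());                 // kafka.server
        System.out.println(n.getKeyProperty("client-id")); // team-a-consumer
    }
}
```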


Configuration Parameters

  • remote.log.metrics.cost.attribution.enabled (Boolean, default: false): Master switch to enable client-level metrics
  • remote.log.metrics.max.client.sensors (Int, default: 1000): Maximum unique client-ids tracked (LRU eviction)
  • remote.log.metrics.include.partition (Boolean, default: false): Include partition tag (increases cardinality)


Relationship to KIP-963

KIP-963 introduced RemoteFetchRequestsPerSec and RemoteFetchBytesPerSec at the topic level under BrokerTopicMetrics. KIP-1267 does not replace these metrics. Instead, it provides a high-resolution, opt-in view of the same data with client-level attribution. The KIP-963 metrics remain the primary operational health indicators, while KIP-1267 metrics enable FinOps and chargeback use cases that require identity context.

Public Interfaces

Modified Classes

RemoteStorageFetchInfo


Add optional clientId field to propagate request context:

Java
public class RemoteStorageFetchInfo {
    private final Optional<String> clientId;

    public RemoteStorageFetchInfo(..., Optional<String> clientId) {
        this.clientId = clientId;
    }

    public Optional<String> clientId() {
        return clientId;
    }
}


New Metrics Registry

New metric group: kafka.server:type=RemoteFetchMetrics

This is separate from BrokerTopicMetrics to isolate high-cardinality client-level data.

Proposed Implementation

Architecture Overview

The implementation follows a "surgical instrumentation" approach with minimal changes to the data path:

  • Context Propagation (ReplicaManager): Extract clientId from FetchParams and inject into RemoteStorageFetchInfo
  • Sensor Management (RemoteLogManager): Maintain ConcurrentHashMap with LRU eviction
  • Metric Recording (RemoteLogManager): Record metrics in the async fetch callback upon successful completion

Key Implementation Points

1. ReplicaManager Modification

  • When constructing RemoteStorageFetchInfo for remote fetches, extract and pass the clientId from the incoming FetchRequest.

2. RemoteLogManager Instrumentation

  • Check remote.log.metrics.cost.attribution.enabled configuration
  • Lookup or create sensor for (clientId, topic) tuple (subject to LRU limits)
  • Record request count immediately upon task submission
  • Record byte count in the completion callback after successful fetch
  • Record error count for failed fetch attempts

Billing Accuracy: RemoteFetchBytesPerSec is recorded in the async completion callback only after successful data transfer. This ensures the metric reflects actual payload bytes that match cloud provider billing (e.g., AWS S3 does not charge egress for failed transfers). Failed fetches increment RemoteFetchErrorsPerSec but not RemoteFetchBytesPerSec.
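The recording rules above (request count at submission, bytes only on success, errors on failure) can be sketched with plain JDK types. The class and method names here are illustrative stand-ins, not the actual RemoteLogManager internals, and LongAdder stands in for Kafka's Sensor:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class ClientFetchAccounting {
    // Per-(client-id, topic) counters standing in for Kafka metric sensors.
    static final class Counters {
        final LongAdder requests = new LongAdder();
        final LongAdder bytes = new LongAdder();
        final LongAdder errors = new LongAdder();
    }

    private final Map<String, Counters> sensors = new ConcurrentHashMap<>();

    Counters sensorFor(String clientId, String topic) {
        // In the real implementation this lookup is subject to the LRU cap.
        return sensors.computeIfAbsent(clientId + "|" + topic, k -> new Counters());
    }

    // Request count is recorded immediately at task submission; byte count is
    // recorded only in the async callback after a successful transfer, and
    // failures increment the error counter instead of the byte counter.
    public CompletableFuture<Void> recordFetch(String clientId, String topic,
                                               CompletableFuture<Integer> fetch) {
        Counters c = sensorFor(clientId, topic);
        c.requests.increment();
        return fetch.handle((bytesRead, err) -> {
            if (err == null) c.bytes.add(bytesRead);
            else c.errors.increment();
            return null;
        });
    }
}
```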

3. Thread Safety

  • Use Kafka's thread-safe Metrics library (atomic accumulators)
  • Metric recording occurs on RemoteLogManager's thread pool (not network I/O threads)
  • ConcurrentHashMap for sensor cache with LRU eviction


4. MBean Lifecycle Management

When a sensor is evicted from the LRU cache due to the remote.log.metrics.max.client.sensors limit, its corresponding JMX MBean will be immediately unregistered from the JMX registry. This ensures bounded memory usage and prevents MBean leaks as client populations churn over time.
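One simple way to express a bounded cache with an eviction hook is a LinkedHashMap in access order whose removeEldestEntry override fires the MBean unregistration. This is a sketch of the intended semantics only; the real implementation may use a different structure (e.g. a ConcurrentHashMap with explicit eviction), and the class name is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class LruSensorCache<V> {
    private final Map<String, V> cache;

    public LruSensorCache(int maxSensors, Consumer<String> onEvict) {
        // accessOrder=true: iteration order is least-recently-used first,
        // so the eldest entry is always the LRU eviction candidate.
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                if (size() > maxSensors) {
                    onEvict.accept(eldest.getKey()); // e.g. unregister the JMX MBean
                    return true;
                }
                return false;
            }
        };
    }

    public synchronized V computeIfAbsent(String key, Function<String, V> factory) {
        return cache.computeIfAbsent(key, factory);
    }

    public synchronized int size() {
        return cache.size();
    }
}
```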

Performance Characteristics

  • CPU Overhead: < 1.2% (hash map lookup + metric recording)
  • Latency Impact: ~0.3ms added to fetch path (negligible compared to 50-500ms S3 latency)
  • Memory Footprint: ~5KB per sensor × 1000 max sensors = ~5MB heap

Financial Governance Framework

This section demonstrates how KIP-1267 enables financial accountability in multi-tenant Kafka environments.

Cost Attribution Model

KIP-1267 provides the telemetry needed to calculate per-client costs using the following formula:

code
Cost_Client = (V_Egress × R_Egress) + (N_Requests × R_API)


Where:

  • V_Egress: total bytes from RemoteFetchBytesPerSec (aggregated count)
  • R_Egress: cloud provider's egress rate (e.g., $0.09/GB for Internet, $0.01/GB for inter-AZ)
  • N_Requests: total count from RemoteFetchRequestsPerSec + RemoteFetchErrorsPerSec
  • R_API: cloud provider's API rate (e.g., $0.0004 per 1,000 GET requests)
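A worked instance of the formula in plain Java; the rates used are the illustrative AWS figures from the Motivation section, not authoritative prices:

```java
public class RemoteFetchCostModel {
    // Cost_Client = (V_Egress × R_Egress) + (N_Requests × R_API)
    public static double clientCost(long egressBytes, long requestCount,
                                    double egressRatePerGb, double apiRatePer1000) {
        double egressGb = egressBytes / 1e9; // decimal GB, as most providers bill
        return egressGb * egressRatePerGb
                + (requestCount / 1000.0) * apiRatePer1000;
    }

    public static void main(String[] args) {
        // 500 GB inter-AZ egress at $0.01/GB plus 2M GET requests at $0.0004/1,000:
        double cost = clientCost(500_000_000_000L, 2_000_000L, 0.01, 0.0004);
        System.out.printf("$%.2f%n", cost); // 500*0.01 + 2000*0.0004 = $5.80
    }
}
```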


Governance Models Enabled

Organizations can implement three levels of financial maturity:

Level 1: Showback Model

  • Mechanism: Dashboards displaying top consumers by cost
  • Goal: Visibility and awareness
  • Implementation: Grafana dashboards with no billing system integration required
  • Outcome: Teams voluntarily optimize consumption patterns based on visibility

Level 2: Chargeback Model

  • Mechanism: Automated monthly cost reports per consumer group
  • Goal: Cost recovery and accountability
  • Implementation: Export metrics to billing systems for internal invoicing
  • Outcome: Platform teams can allocate actual costs to consuming business units, transforming Kafka from a cost center to a cost pass-through service

Level 3: Real-Time Cost Enforcement

  • Mechanism: Stream processing of KIP-1267 metrics with automated quota application
  • Goal: Cost prevention
  • Implementation: Monitor RemoteFetchBytesPerSec per client-id and trigger quota enforcement when thresholds are exceeded
  • Example: If client-id=team-a exceeds $50/hour in remote fetch costs, automatically apply throughput quotas to prevent runaway spending
  • Outcome: Prevents unexpected cost spikes before monthly billing cycles complete
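The Level 3 decision logic above, sketched under stated assumptions: cost velocity is derived from the per-client byte rate, the budget and egress rate values are illustrative, and actual enforcement would go through Kafka's existing quota mechanisms rather than this hypothetical class:

```java
public class CostVelocityGuard {
    private final double egressRatePerGb;  // e.g. $0.01/GB inter-AZ
    private final double hourlyBudgetUsd;  // e.g. $50/hour per client-id

    public CostVelocityGuard(double egressRatePerGb, double hourlyBudgetUsd) {
        this.egressRatePerGb = egressRatePerGb;
        this.hourlyBudgetUsd = hourlyBudgetUsd;
    }

    // bytesPerSec is the observed RemoteFetchBytesPerSec for one client-id.
    public boolean shouldThrottle(double bytesPerSec) {
        double costPerHour = bytesPerSec * 3600 / 1e9 * egressRatePerGb;
        return costPerHour > hourlyBudgetUsd;
    }
}
```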

Enterprise Adoption Impact

KIP-1267 addresses a critical barrier to Tiered Storage adoption in enterprise environments. Without cost attribution, organizations cannot:

  • Implement fair billing in multi-tenant clusters
  • Detect and prevent cost anomalies in real-time
  • Optimize consumption patterns based on financial impact
  • Meet compliance requirements for data access auditing

By providing granular cost visibility, this KIP transforms Tiered Storage from a feature with uncertain cost implications into a governable, predictable storage strategy suitable for regulated industries requiring infinite retention capabilities.

Integration with Existing Kafka Features

The metrics provided by KIP-1267 can be integrated with:

  • Kafka Quotas: Use cost metrics to inform quota policies
  • ACLs: Combine with access control for comprehensive governance
  • Monitoring Systems: Export to Prometheus, Datadog, or CloudWatch for alerting
  • FinOps Platforms: Feed into enterprise cost management tools

Security Considerations

The metrics introduced by this KIP expose client-id information through JMX endpoints. Organizations should:

  • Restrict JMX access to authorized operators only
  • Consider implementing authentication for metric exporters (Prometheus, etc.)
  • Be aware that client-id values may contain sensitive team/application identifiers
  • Use Kafka ACLs to control which principals can view cost attribution dashboards

No changes to Kafka's authorization model are required; existing JMX security practices apply.

Compatibility, Deprecation, and Migration Plan

Backward Compatibility

  • Wire Protocol: No changes
  • Client Libraries: No changes required
  • Storage Plugins: No changes to RemoteStorageManager interface
  • Existing Metrics: KIP-963 metrics remain unchanged; KIP-1267 metrics are additive

Migration Strategy

Phase 1 - Preparation:

  1. Upgrade brokers to version containing KIP-1267
  2. Ensure remote.log.metrics.cost.attribution.enabled=false (default)
  3. Verify cluster stability

Phase 2 - Canary Activation:

  1. Enable on single broker: remote.log.metrics.cost.attribution.enabled=true
  2. Monitor JMX endpoints for new metrics
  3. Validate metric accuracy

Phase 3 - Observability Integration:

  1. Configure Prometheus JMX Exporter
  2. Deploy Grafana dashboards
  3. Validate metric aggregation

Phase 4 - Full Rollout:

  1. Enable on all brokers
  2. Begin chargeback data collection

Rollback Plan

Instant rollback via dynamic configuration: set remote.log.metrics.cost.attribution.enabled=false. This immediately stops metric recording without requiring broker restart.

Test Plan

Unit Tests

  • Verify exact byte count attribution for mock remote fetches
  • Test LRU eviction with the remote.log.metrics.max.client.sensors limit
  • Validate context propagation from FetchRequest to RemoteStorageFetchInfo
  • Verify MBean unregistration upon sensor eviction


Integration Tests

  • Multi-tenant simulation: verify independent attribution for concurrent consumers
  • Fault tolerance: ensure failed fetches don't increment byte metrics
  • Cardinality safety: verify sensor map size limits
  • Error attribution: validate that failed S3 requests are correctly attributed to client-id


Performance Tests

  • Test Environment:

    • Cluster: 3 brokers, 32GB RAM, 8 vCPUs each
    • Workload: 10,000 msg/sec, 1KB message size
    • Consumers: 50 concurrent consumer groups
    • Test duration: 24 hours
    • Remote storage: AWS S3 Standard

    Targets:

    • Benchmark throughput degradation (target: < 1%)
    • Measure CPU overhead (target: < 2%)
    • Validate latency impact (target: < 1ms)

Operational Guide

Prometheus Integration

JMX Exporter configuration (YAML):

rules:
  - pattern: kafka.server<type=RemoteFetchMetrics, name=RemoteFetchBytesPerSec, client-id=(.+), topic=(.+)><>Count
    name: kafka_server_remote_fetch_bytes_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER
  - pattern: kafka.server<type=RemoteFetchMetrics, name=RemoteFetchRequestsPerSec, client-id=(.+), topic=(.+)><>Count
    name: kafka_server_remote_fetch_requests_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER
  - pattern: kafka.server<type=RemoteFetchMetrics, name=RemoteFetchErrorsPerSec, client-id=(.+), topic=(.+)><>Count
    name: kafka_server_remote_fetch_errors_total
    labels:
      client_id: "$1"
      topic: "$2"
    type: COUNTER



Example PromQL Queries

Total bytes by client (30 days):

sum(increase(kafka_server_remote_fetch_bytes_total[30d])) by (client_id)


Estimated hourly cost at $0.09/GB (i.e., $0.00000000009 per byte):

sum(increase(kafka_server_remote_fetch_bytes_total[1h])) by (client_id) * 0.00000000009


Fetch efficiency (bytes per request):

sum(rate(kafka_server_remote_fetch_bytes_total[1h])) by (client_id) 
/ 
sum(rate(kafka_server_remote_fetch_requests_total[1h])) by (client_id)


Error rate by client:

sum(rate(kafka_server_remote_fetch_errors_total[1h])) by (client_id)



Rejected Alternatives

Topic-Level Only Attribution

Approach: Map topics to teams via external CMDB.

Rejection Reason: Fails for shared topics consumed by multiple teams. Cannot determine which consumer is driving costs.

Client-Side Telemetry (KIP-714)

Approach: Have clients report their own fetch statistics.

Rejection Reasons:

  • Financial billing cannot rely on self-reported data (trust issue)
  • Requires client upgrades (KIP-1267 works with all existing clients)
  • Broker is the authoritative source for billing

S3 Access Log Parsing

Approach: Parse cloud provider access logs to attribute costs.

Rejection Reason: S3 logs only contain broker IP addresses, not client-ids. Correlation is impossible without broker-side instrumentation.

References

  • KIP-405: Kafka Tiered Storage
  • KIP-963: Additional metrics in Tiered Storage
  • KIP-714: Client Metrics and Observability
