Status

Current state: Discussion

Discussion thread: here []

JIRA: here []


Motivation

Compression improves network and disk utilization and leads to better cluster throughput at the expense of latency. Several compression algorithms (and compression levels) exist that trade off compression ratio for speed. With hardware accelerators, compression can be sped up (improving latency) without significantly affecting the compression ratio.
Kafka currently supports four compression algorithms: Gzip, Zstd, Snappy, and LZ4. This KIP proposes a framework that supports accelerating any of these algorithms in Kafka with any hardware accelerator provider. This KIP does not introduce a new compression algorithm; instead, it allows compression to be accelerated on producers, brokers, or consumers. Compression, decompression, and recompression can be accelerated independently, which means an accelerated compression strategy MUST be compatible with the existing strategies.

Public Interfaces

Compression Service

The 'CompressionService' interface encapsulates the basic functions that an accelerated codec must provide. A service provider supplies an implementation of this interface for a specific 'compressionType'.


import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Optional;

public interface CompressionService {

    /**
     * The compression type for this compression codec.
     * @see org.apache.kafka.common.config.TopicConfig#COMPRESSION_TYPE_DOC
     */
    String type();

    /**
     * Throws UnavailableServiceException if the service is not available.
     */
    void checkAvailable();

    /**
     * Returns an OutputStream that compresses data compatibly with the compression type.
     *
     * @param bufferStream The buffer to write the compressed data to
     * @param compressionLevel An optional parameter that sets the compression level if applicable
     */
    OutputStream compressedOutputStream(OutputStream bufferStream, Optional<Integer> compressionLevel);

    /**
     * Returns an InputStream that decompresses data compatibly with the compression type.
     *
     * @param buffer The {@link ByteBuffer} instance holding the data to decompress.
     * @param decompressionBufferProvider The provider of ByteBuffer(s) used for decompression if supported.
     */
    InputStream decompressedInputStream(ByteBuffer buffer, BufferProvider decompressionBufferProvider);

    interface BufferProvider {
        ByteBuffer get(int capacity);
        void release(ByteBuffer buffer);
    }
}
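
For illustration, a minimal sketch of a provider is shown below. The class name ExampleGzipCompressionService is made up, and the sketch simply wraps java.util.zip; a real accelerated provider would delegate to its accelerator library instead.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical provider, wrapping java.util.zip purely for illustration.
// A real accelerated provider would delegate to its accelerator library instead.
public class ExampleGzipCompressionService implements CompressionService {

    @Override
    public String type() {
        return "gzip";
    }

    @Override
    public void checkAvailable() {
        // Nothing to probe for the JDK implementation. An accelerated provider would
        // check for its hardware/driver here and throw UnavailableServiceException if absent.
    }

    @Override
    public OutputStream compressedOutputStream(OutputStream bufferStream, Optional<Integer> compressionLevel) {
        try {
            // Honor the optional compression level by adjusting the underlying Deflater.
            return new GZIPOutputStream(bufferStream) {
                {
                    compressionLevel.ifPresent(def::setLevel);
                }
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public InputStream decompressedInputStream(ByteBuffer buffer, BufferProvider decompressionBufferProvider) {
        try {
            // Copy the ByteBuffer into a byte[] for simplicity; a real provider would stream
            // directly from the buffer and use the BufferProvider for intermediate buffers.
            byte[] compressed = new byte[buffer.remaining()];
            buffer.duplicate().get(compressed);
            return new GZIPInputStream(new ByteArrayInputStream(compressed));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Under the standard ServiceLoader contract, such a provider is registered by listing its fully qualified class name in a META-INF/services resource named after the CompressionService interface.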



Proposed Changes

This KIP leverages Java's Service Loader API to offload compression and decompression for a specific codec to a provided alternative. Each existing compression codec represents a service, and a service provider is any alternative implementation of that service that implements the CompressionService interface. On startup, the service loader attempts to load a service provider for each configured service and replaces the default implementation with the one provided.
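
As a rough sketch of the discovery side (the class and method names below are assumptions for illustration, not the actual Kafka implementation), the loader could enumerate providers on the class path and match them against the configured provider class and codec:

import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;

// Illustrative sketch only: enumerate CompressionService providers on the class path
// and select the one that matches the configured provider class and codec type.
public final class CompressionServiceFinder {

    public static Optional<CompressionService> find(String compressionType, String providerClassName) {
        ServiceLoader<CompressionService> loader = ServiceLoader.load(CompressionService.class);
        return StreamSupport.stream(loader.spliterator(), false)
                .filter(service -> service.getClass().getName().equals(providerClassName))
                .filter(service -> compressionType.equals(service.type()))
                .findFirst();
    }
}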

Compression will be offloaded to a compression provider only if it is specified in the corresponding configuration of the producer, broker, or consumer. If no service provider is configured explicitly, the built-in implementation is used. A new configuration is added for each existing codec, 'compression.{codec}.provider', whose value is the class name of the service provider. A ConfigException is thrown if the provided class is not found on the class path.
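
For example, a producer that wants its gzip compression handled by an accelerated provider might be configured as follows; the provider class name com.example.QatGzipCompressionService is hypothetical:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical configuration sketch; the provider class name is made up for illustration.
public class AcceleratedGzipProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Existing codec selection is unchanged.
        props.put("compression.type", "gzip");

        // New per-codec provider configuration proposed by this KIP; a ConfigException
        // is thrown if the class is not found on the class path.
        props.put("compression.gzip.provider", "com.example.QatGzipCompressionService");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}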


 

Compatibility, Deprecation, and Migration Plan

A specific accelerated codec must be compatible with the codec it is accelerating in all directions: data compressed by either implementation must be decompressible by the other. Since compression and decompression typically occur in separate processes, the accelerated codec need not be available on all nodes of the cluster for compression and decompression to work correctly. This KIP does not affect the message format and is purely opportunistic; compression and decompression will continue to operate normally even if no accelerators are present.

Due to the nature of the service loader API, any untrusted and untested jar file on the application class path could be dynamically loaded by the Kafka process. This introduces compatibility as well as security concerns.

Test Plan

New unit tests and integration tests will be included as part of this feature. Tests must demonstrate:

  • Compatibility between an existing codec and a corresponding accelerated codec.
  • The default behavior with this feature disabled is unchanged
  • Correct error handling in failure scenarios
  • Multiple compression service types can be correctly used and managed simultaneously
  • Compression options (e.g. compression levels) are honored where applicable


Performance Impact

This feature was tested using a custom deflate compressor plugin that leverages the QAT compression accelerator. Performance was compared against the built-in gzip and zstd Kafka compressors as well as zlib-ng. The QAT-deflate plugin was developed using the QAT-java library; zlib-ng data was collected via LD_PRELOAD with zlib-ng version 1.3.1. Benchmarking shows that QAT-deflate delivers:

  • almost 6x gain in throughput relative to gzip
  • up to 4x gain in throughput relative to zlib-ng
  • about 40% gain in throughput relative to zstd
Compression Type | Normalized Throughput at SLA (20 ms) | Compression Ratio
gzip             | 1                                    | 0.446
QAT-deflate      | 5.93                                 | 0.442
zlib-ng          | 1.41                                 | 0.441
zstd             | 3.97                                 | 0.442

Environment

  • Kafka 4.2.0 snapshot
  • OpenJDK 21
  • Ubuntu 24.04.3 LTS
  • AWS EC2 i7i-metal-24xl broker (2 Kafka brokers)
  • AWS EC2 m7i-metal-48xl producer
  • QATjava-2.3.2, Qatzip-1.3.1
  • Producer performance test
    • Batch size = 112K
    • Record size = 1024 bytes
    • replication = 2
    • Security protocol = SSL

Notes

  • Each producer runs at a fixed throughput of 50K records per second. The number of producers is scaled up until the p99 latency reaches 20 ms, and the combined throughput is the KPI.
  • Consumers are not included in this benchmarking.
  • Compression ratios are as reported by the producer performance test.
  • These tests were run with brokers on a single node and producers on a separate node.
  • QAT is only enabled on metal instances (on AWS), which are relatively expensive. Data for gzip, zlib-ng, and zstd was also collected on smaller instances to match costs (6 x i7i-4xl instances). Performance on the smaller instances, while different, does not change the relative performance results.

Rejected Alternatives

See KIP-984: Add pluggable compression interface to Kafka.
