Status
Current state: [Under Discussion]
Discussion thread: here []
JIRA: here []
Motivation
Compression improves network and disk utilization and leads to better cluster throughput at the expense of latency. Several compression algorithms (and compression levels) exist which trade off compression ratio for speed. With the use of accelerators, we can speed up compression (improve latency) without affecting compression ratio too much.
Kafka currently supports four compression algorithms: Gzip, Zstd, Snappy, and LZ4. This KIP proposes a framework that supports the acceleration of any of these algorithms in Kafka with any hardware accelerator provider. This KIP does not introduce a new compression algorithm but instead allows for compression acceleration on producers, brokers, or consumers. Compression, decompression, and recompression can each be accelerated independently. This means that an accelerated compression strategy MUST be compatible with the existing strategies.
This KIP also introduces an accelerated GzipCompressor that uses the Intel QAT hardware accelerator (https://www.intel.com/content/www/us/en/products/docs/accelerator-engines/what-is-intel-qat.html).
Public Interfaces
org.apache.kafka.common.compress.Compression.java
org.apache.kafka.common.compress.GzipCompression.java
org.apache.kafka.common.compress.SnappyCompression.java
org.apache.kafka.common.compress.Lz4Compression.java
org.apache.kafka.common.compress.ZstdCompression.java
org.apache.kafka.common.utils.ByteBufferOutputStream
org.apache.kafka.common.utils.BufferSupplier
org.apache.kafka.common.utils.ByteBufferInputStream
org.apache.kafka.common.utils.ChunkedByteStream
org.apache.kafka.common.record.CompressionType
Sample producer.properties file:
compression.type=gzip
compression.gzip.level=1
compression.gzip.provider=com.intel.qat.kafka.QatDeflateCompression$Builder
Proposed Changes
This KIP leverages Java’s ServiceLoader API to offload compression/decompression for a specific codec to a provided alternative. Each existing compression codec (specifically its builder) represents a service, and a service provider is any alternative implementation of that service (codec). On startup, the service loader attempts to load any discovered service providers for each service and may replace the base implementation with the provided one. As an example, GzipCompression$Builder is one of the four services; a service provider would register itself as a provider of this service and, when loaded, would return a GzipCompression$Builder to be used instead of the base implementation. An optional configuration, compression.<codec>.provider (e.g. compression.gzip.provider), exists in the producer and server configurations and indicates a preferred service provider to load if found. Any new service provider should extend the base implementation and thus return the same CompressionType. A new method, isAvailable(), will be added to the Compression$Builder interface; it returns true if the accelerated codec is available at runtime.
The modifications to the base implementations include static initializers that load service providers and a static method that returns a builder from either the base implementation or a service provider.
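The discovery-and-fallback flow above can be sketched with the JDK’s ServiceLoader. The interface and class names below (GzipBuilderProvider, DefaultGzipBuilder, the builder() method) are illustrative stand-ins, not the actual KIP interfaces; real providers would additionally be registered via META-INF/services entries on the classpath.

```java
import java.util.ServiceLoader;

public class CodecDiscovery {
    // Stand-in for the codec builder service described in the KIP.
    public interface GzipBuilderProvider {
        boolean isAvailable();   // proposed isAvailable(): can this codec run here?
        String name();
    }

    // Base (JDK zlib) implementation, used when no provider is found or available.
    static class DefaultGzipBuilder implements GzipBuilderProvider {
        public boolean isAvailable() { return true; }
        public String name() { return "gzip-default"; }
    }

    // Mirrors the proposed static method on the base implementation: scan
    // discovered providers, prefer the one named in compression.gzip.provider,
    // and fall back to the base implementation otherwise.
    public static GzipBuilderProvider builder(String preferredProviderClass) {
        for (GzipBuilderProvider p : ServiceLoader.load(GzipBuilderProvider.class)) {
            boolean preferred = preferredProviderClass == null
                    || p.getClass().getName().equals(preferredProviderClass);
            if (preferred && p.isAvailable()) {
                return p;
            }
        }
        return new DefaultGzipBuilder();
    }

    public static void main(String[] args) {
        // No providers are registered in META-INF/services in this sketch,
        // so discovery finds nothing and the base implementation is returned.
        System.out.println(builder("com.intel.qat.kafka.QatDeflateCompression$Builder").name());
    }
}
```

Because selection is opportunistic, a missing or unavailable provider silently degrades to the base codec, which is what keeps the feature purely additive.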
Compatibility, Deprecation, and Migration Plan
A specific accelerated codec must be compatible with the codec it is accelerating in all directions. Since compression and decompression typically occur in separate processes, the accelerated codec need not be available on all nodes of the cluster for compression/decompression to occur correctly. This KIP does not affect the message format and is purely opportunistic: compression and decompression will continue to operate normally even if no accelerators are present.
Due to the nature of the ServiceLoader API, third-party service providers can be provided and loaded at runtime as long as they are on the classpath. Due to security and/or compatibility concerns, it is possible to limit this feature to loading only approved service providers (e.g. those distributed with Kafka).
Test Plan
Tests must prove compatibility between an existing codec and a corresponding accelerated codec.
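A minimal sketch of what such a compatibility test could assert, using the JDK gzip classes as a stand-in for both sides. In a real test one side would be the accelerated codec (e.g. the QAT-backed GzipCompressor) and the other the base codec, exercised in both directions; the class and method names here are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCompatTest {
    // Stand-in for one codec's compression path.
    static byte[] compress(byte[] input) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(input);
        }
        return bos.toByteArray();
    }

    // Stand-in for the other codec's decompression path.
    static byte[] decompress(byte[] input) throws Exception {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(input))) {
            return gz.readAllBytes();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = "kafka record batch payload".getBytes(StandardCharsets.UTF_8);
        // Compress with one implementation, decompress with the other:
        // the round trip must reproduce the original bytes exactly.
        byte[] roundTripped = decompress(compress(payload));
        System.out.println(Arrays.equals(payload, roundTripped)); // prints "true"
    }
}
```

The same round-trip assertion would be repeated with the compressor and decompressor roles swapped, and across compression levels, to cover all directions mentioned above.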