Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Although compression is not a new problem, it has continued to be an important research topic.

The integration and testing of new compression algorithms into Kafka currently requires significant code changes and rebuilding of the distribution package for Kafka.

This KIP gives users the ability to use different compressors without requiring further changes in Kafka.

This proposal suggests modifying the message format by requesting 4 bits in the attributes for compression plugin selection, as well as adding a new codec value (in compression bits 0~2) that selects the compression pluggable interface.

This KIP does not supersede KIP-390 and KIP-780. It can build on top of KIP-780, as the scope of this KIP goes beyond that of KIP-390/KIP-780.

Public Interfaces

Current compression is NOT changed. The existing codecs are not affected because their code paths are untouched.

The following public packages are affected: 

  • org/apache/kafka/common
  • org/apache/kafka/clients/producer
  • org/apache/kafka/clients/consumer
  • org/apache/kafka/storage/internals
  • org/apache/kafka/tools


Proposed Changes

We propose an update to the message format to support the compression plugin interface.

Kafka internal topics are used to hold plugin information. Each Kafka language client has its own internal topic. 

Our initial assumption is that all plugins are installed manually across the cluster. Installation here means copying the necessary files (e.g. JARs, modules, libraries) onto the classpath.

A producer can select a compression plugin through configuration the same way it chooses a built-in codec. When the codec is not a built-in, the producer will consume the plugin list from the topic and retrieve the plugin information. 

The producer will use a pluginID to coordinate with the broker and consumer. 

Coordinating a plugin across clients and the cluster involves two main phases: the plugin registration phase and the plugin selection phase.

Registering plugins

We propose the creation of an admin tool called Kafka-plugin-tool.sh that takes a list of plugins, either interactively from users or from a file, and stores it in a Kafka topic in the cluster.

For each language supported by Kafka, a separate topic will be used to store plugin information for that particular language-client. This ensures support for non-Java clients.

The admin tool launches a special producer that produces a key/value pair where the key is the plugin alias and the value is a JSON-formatted structure of plugin information.

The plugin information schema currently includes the following fields:

  1. pluginID: an automatically generated integer value that represents the plugin and is shared across all the language clients. It is used to communicate the selected plugin to the broker rather than using the alias.
  2. pluginAlias: a string value holding the name of the plugin, shared across all the language clients. It is the value used for the plugin in compression.type configurations.
  3. pluginClassName: a string value holding the class name to be instantiated when the pluggable interface is used. The name here is Java-centric, but the idea is that for any other language, this is where the code for using the plugin will live.
  4. pluginVersion: a string value holding the version number of the plugin.

The fields listed above describe a minimum set but can be expanded as needed. 
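
To make pluginClassName concrete, here is a minimal sketch of what a Java plugin class might look like. The interface name and method signatures below are illustrative assumptions only, not part of this KIP's committed public API; the actual pluggable interface would be defined by the implementation of this KIP.

// --- CompressionPlugin.java (hypothetical interface; names/signatures are assumptions) ---
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface CompressionPlugin {
    OutputStream wrapForCompression(OutputStream out) throws IOException;
    InputStream wrapForDecompression(InputStream in) throws IOException;
}

// --- SnappyStreamPlugin.java (example class that pluginClassName would point to) ---
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class SnappyStreamPlugin implements CompressionPlugin {
    @Override
    public OutputStream wrapForCompression(OutputStream out) throws IOException {
        // Delegate to a third-party compressor shipped on the classpath.
        return new org.xerial.snappy.SnappyOutputStream(out);
    }

    @Override
    public InputStream wrapForDecompression(InputStream in) throws IOException {
        return new org.xerial.snappy.SnappyInputStream(in);
    }
}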

An example plugin registration would be: 

Insert the following snappy plugin record into plugin-java-topic:

Record Key: snappyPlugin
Record Value:
{
  "pluginID": 1,
  "pluginAlias": "snappyPlugin",
  "pluginClassName": "Java.classname",
  "pluginVersion": "v1.0"
}


The corresponding Python plugin for the Python client can be entered in the plugin-python-topic:

Record Key: snappyPlugin
Record Value:
{
  "pluginID": 1,
  "pluginAlias": "snappyPlugin",
  "pluginClassName": "python.classname",
  "pluginVersion": "v1.0"
}
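
For illustration, a minimal sketch of the kind of special producer the admin tool might launch to write the Java record above. The topic name plugin-java-topic and bootstrap address are assumptions for this example; the tool's exact command-line interface is not fixed here.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RegisterSnappyPlugin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Key is the plugin alias; value is the JSON plugin information shown above.
        String key = "snappyPlugin";
        String value = "{\"pluginID\":1,\"pluginAlias\":\"snappyPlugin\","
                + "\"pluginClassName\":\"Java.classname\",\"pluginVersion\":\"v1.0\"}";

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("plugin-java-topic", key, value));
        }
    }
}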



Topics

Kafka topics are used to register a plugin. Each language-client has its own topic in the cluster. This ensures the following:

  1. Support for all the language-clients.
  2. The plugin list is persisted throughout the lifetime of the cluster.

Constraints for registering plugins and ensuring compatibility:

  1. All entries for the different language-clients must be installed upfront, ensuring all clients are compatible.
  2. Non-Java plugins may not be registered without the Java equivalent, because the broker needs to be able to decompress the data.
  3. All plugins with the same alias must be compatible.
  4. An update to an existing plugin that is not compatible must be registered as a new plugin.

Message Format Update

We request 4 bits in the next available byte of the attributes so we can store a pluginID.

The 4 bits support up to 16 plugins per cluster; the remaining bits are left unused.

The position of the 4 requested bits within the unused bits of the attributes int16 is arbitrary from the perspective of this proposal.

We leave the last bit of the first byte unused to allow further expansion.
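
As an illustration of the requested bits, here is a minimal sketch of packing and reading a pluginID in the int16 batch attributes. It assumes, purely for the example, that the 4 bits occupy bits 8 to 11; as noted above, the actual position is left open by this proposal.

// Illustrative only: assumes the 4 pluginID bits sit at bits 8-11 of the
// int16 record batch attributes. The real bit positions are not fixed here.
public final class PluginAttributes {
    private static final int PLUGIN_ID_SHIFT = 8;
    private static final int PLUGIN_ID_MASK = 0x0F << PLUGIN_ID_SHIFT;   // 4 bits -> up to 16 plugins

    // Proposed codec value (compression bits 0-2) that selects the pluggable interface.
    public static final int PLUGGABLE_CODEC = 5;

    public static short withPluginId(short attributes, int pluginId) {
        if (pluginId < 0 || pluginId > 15)
            throw new IllegalArgumentException("pluginID must fit in 4 bits: " + pluginId);
        return (short) ((attributes & ~PLUGIN_ID_MASK) | (pluginId << PLUGIN_ID_SHIFT));
    }

    public static int pluginId(short attributes) {
        return (attributes & PLUGIN_ID_MASK) >> PLUGIN_ID_SHIFT;
    }

    private PluginAttributes() { }
}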

Selection of the plugins

Once a plugin is registered in a Kafka topic, a producer/broker/consumer can consume the list of plugins by querying the topic corresponding to that language-client.

The assumption at the moment is that the plugin is manually installed. Automatic plugin installation is a stretch goal at this time.

Selecting plugins from Producer

  1. A Kafka producer uses the compression.type configuration to select a plugin by its pluginAlias (see the configuration sketch after this list).
  2. When the compression.type value is not a built-in codec, the producer will consume the plugin list from the relevant plugin topic and match the pluginAlias against the plugin list.
  3. If the producer selects a plugin which is not in the plugin list, it will receive an "Unknown compression name" illegal argument exception.
  4. The producer instantiates a plugin compressionType object and compresses the records.
  5. The producer updates the following record batch header attributes related to compression: codecID and pluginID.
  6. codecID represents the ENUM value for the pluggable interface as described by the proposed change (value of 5).
  7. pluginID represents the plugin identification number as described in the plugin information schema.
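
For illustration, a minimal sketch of the producer-side selection under this proposal, reusing the hypothetical alias "snappyPlugin" registered earlier. The plugin-list lookup, class loading, and attribute stamping happen inside the client and are not shown; this describes the proposed behavior, not how current Kafka behaves today.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceWithPlugin {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Not a built-in codec: under this proposal the producer would consume
        // plugin-java-topic, match the alias, instantiate pluginClassName, and
        // stamp codecID=5 plus the pluginID into the batch attributes.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappyPlugin");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}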

Selecting plugins from Broker

  1. A Kafka broker decodes the batch headers; a codecID of 5 signals that a plugin was used (see the sketch after this list).
  2. The broker then decodes the pluginID.
  3. The broker consumes the plugin list from the plugin-java-topic and matches the pluginID against the plugin list.
  4. If the broker selects a plugin which is not in the plugin list, it will receive an "Unknown compression name" illegal argument exception.
  5. The broker instantiates a plugin compressionType object, then validates and analyzes the offsets.
  6. On recompression, the broker will consume the plugin list and match the pluginAlias against the plugin list.
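
For illustration, a minimal sketch of the broker-side check, reusing the hypothetical PluginAttributes helper from the message format section; the plugin list here is assumed to have already been consumed from plugin-java-topic and indexed by pluginID.

import java.util.Map;

public class BrokerPluginResolution {
    // Illustrative only: detect the pluggable codec and resolve the pluginID.
    static String resolvePluginInfo(short batchAttributes, Map<Integer, String> pluginInfoById) {
        if ((batchAttributes & 0x07) != PluginAttributes.PLUGGABLE_CODEC)   // compression bits 0-2
            return null;   // built-in codec: the existing code path is unchanged

        int pluginId = PluginAttributes.pluginId(batchAttributes);
        String pluginInfoJson = pluginInfoById.get(pluginId);
        if (pluginInfoJson == null)
            throw new IllegalArgumentException("Unknown compression name: pluginID " + pluginId);

        // The broker would instantiate pluginClassName from this JSON by reflection,
        // then validate and analyze the batch offsets.
        return pluginInfoJson;
    }
}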

Selecting plugins from Consumer

  1. A Kafka consumer decodes the batch headers; a codecID of 5 signals that a plugin was used.
  2. The consumer then decodes the pluginID.
  3. The consumer launches a special consumer to consume the plugin list from the relevant language-client topic and matches the pluginID against the plugin list (see the sketch after this list).
  4. If the consumer selects a plugin which is not in the plugin list, it will receive an "Unknown compression name" illegal argument exception.
  5. The consumer instantiates a plugin compressionType and decompresses the records.
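
For illustration, a minimal sketch of the special consumer that reads the plugin list from the language-client topic. The topic name plugin-java-topic, group id, and bootstrap address are assumptions for this example; a real implementation would parse the JSON values and index the entries by pluginID as well as by alias.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PluginListReader {
    public static Map<String, String> readPluginList(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "plugin-list-reader");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Map<String, String> pluginInfoByAlias = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("plugin-java-topic"));
            // Key = pluginAlias, value = JSON plugin information (including pluginID).
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5)))
                pluginInfoByAlias.put(record.key(), record.value());
        }
        return pluginInfoByAlias;
    }
}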

Compatibility, Deprecation, and Migration Plan

  1. The current proposal does not have any impact on existing users.
  2. We are not changing the behavior of current compression. We are simply adding a new compression interface.
  3. Regression testing shows that the pluggable interface did not affect performance relative to the default when using Snappy as a plugin.

The table below shows 3 consecutive producer performance tests using built-in snappy:

Snappy Built-in | records sent | record/sec | MB/s | 50th % (ms) | Mean Latency (ms) | 95th % (ms) | 99th % (ms) | 99.9th % (ms) | Max Latency (ms)
run1 | 1,000,000.00 | 9,999.70 | 9.86 | 3.00 | 3.42 | 6.00 | 7.00 | 27.00 | 352.00
run2 | 1,000,000.00 | 9,999.60 | 9.86 | 4.00 | 3.93 | 7.00 | 7.00 | 14.00 | 214.00
run3 | 1,000,000.00 | 9,999.90 | 9.86 | 3.00 | 3.41 | 6.00 | 7.00 | 7.00 | 226.00

and the table below shows 3 consecutive producer performance tests using the snappy plugin:

Snappy Plugin | records sent | record/sec | MB/s | 50th % (ms) | Mean Latency (ms) | 95th % (ms) | 99th % (ms) | 99.9th % (ms) | Max Latency (ms)
run1 | 1,000,000.00 | 9,999.80 | 9.86 | 3.00 | 3.54 | 6.00 | 7.00 | 31.00 | 120.00
run2 | 1,000,000.00 | 9,999.80 | 9.86 | 3.00 | 3.40 | 6.00 | 7.00 | 9.00 | 41.00
run3 | 1,000,000.00 | 9,999.80 | 9.86 | 4.00 | 3.55 | 6.00 | 7.00 | 13.00 | 59.00

The pluggable compression interface performs virtually the same: there is no loss of performance when using a built-in algorithm as a plugin.

Rejected Alternatives

The first approach was to change the message format to use the compression bits in the attributes to add a new codec.

This new codec implements the pluggable interface by instantiating the compression class.

The class name is communicated to the broker by adding it as metadata in the record batch.


