You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussion

Discussion threadhere

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

kafka.common.MessageReader is a input argument of kafka-console-producer and we expect users can have their custom reader to produce custom records. Hence, MessageReader is a public interface and we should offer a java version to replace current scala code. Also, the new MessageReader should be placed at clients module. (kafka.common.MessageReader is in core module)

Public Interfaces

New org.apache.kafka.common.RecordReader interface


/**
 * Typical implementations of this interface convert data from an `InputStream` received via `configure` into a
 * `ProducerRecord` instance on each invocation of `readRecord`. Noted that the implementations to have a public
 * nullary constructor.
 *
 * This is used by the `kafka.tools.ConsoleProducer`.
 */
public interface RecordReader extends Closeable, Configurable {
    /**
     * read byte array from input stream and then generate a producer record
     * @param inputStream of message 
     * @return a producer record
     */
    ProducerRecord<byte[], byte[]> readRecord(InputStream inputStream);


    /**
     * Closes this reader
     */
    default void close() {}
}


Proposed Changes

Deprecate kafka.common.MessageReader


@deprecated("This class has been deprecated and will be removed in 4.0. Please use org.apache.kafka.common.RecordReader instead", "3.5.0")
trait MessageReader


Compatibility, Deprecation, and Migration Plan

  1. backward compatibility
    kafka.common.MessageReader implementations can keep working without recompilation. 
  2. deprecation
    1. kafka.common.MessageReader is deprecated
    2. the method init(InputStream inputStream, Properties props) is deprecated and replacement is configure(InputStream inputStream, Map<String, ?> configs)
  3. migration plan: users have address following changes to complete code migration
    1. change inheritance from kafka.common.MessageReader to org.apache.kafka.common.RecordReader
    2. change method signature from init(InputStream inputStream, Properties props) to configure(InputStream inputStream, Map<String, ?> configs)
    3. change method signature from readMessage to readRecord

Rejected Alternatives

  1. support the usage of Serializers (from Juma): this support could complicate the configs, since users have to define both MessageReader and serializer. It seems to me the mechanism of MessageReader should include serialization.
  • No labels