Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleProducer
    CompletionStage<RecordMetadata> send(SendTarget target, SendRecord<K, V> record);

  


Code Block
interface SendTarget {
  @Deprecated
  static TopicPartition Future<RecordMetadata> send(ProducerRecord<Kof(String topic, V>int recordpartition);

  static  @Deprecated
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);TopicTarget of(String topic);
}

class TopicTarget implements SendTarget

class TopicPartition implements SendTarget 



Code Block
languagejava
interface SendRecord<K, V> {

  String topic();

  Headers headers();

  K key();

  V value();

  Long timestamp();

  Integer partition();

  static class Builder<K, V> {
    
    Builder<K, V> topic(String value);
    Builder<K, V> headers(Iterable<Header> value);
    Builder<K, V> key(String value);
    Builder<K, V> value(String value);
    Builder<K, V> timestamp(Long value);
    Builder<K, V> partition(Integer value);

    SendRecord<K, V> build();
  }  
}
Code Block
languagejava

public class ProducerRecord<K, V> implements SendRecord<K, V> {

  @Deprecated
  public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

  @Deprecated
  public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)

  @Deprecated
  public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)

  @Deprecated
  public ProducerRecord(String topic, Integer partition, K key, V value) 

  @Deprecated
  public ProducerRecord(String topic, K key, V value)

  @Deprecated
  public ProducerRecord(String topic, V value)
}


Code Block
languagejava
titleCallback
package org.apache.kafka.clients.producer;

@Deprecated
public interface Callback {


Proposed Changes

  1. introduce new interface - SendTarget and SendRecord 
  2. introduce new method send(SendTarget, SendRecord) to producer
  3. deprecate all public constructors of ProducerRecord
  4. deprecate send(ProducerRecord)
  5. deprecate send(ProducerRecord, Callback)
  6. deprecate Callback
  7. remove FutureRecordMetadata
  8. introduce CompletableFuture to send flow

Compatibility, Deprecation, and Migration Plan

...

After we introduce new record class "SendRecord", we should NOT use ProducerRecord in our production code. There are some public interface (ProducerInterceptor, for example) still accept ProducerRecord and so we need other KIPs as follow-up to deprecate them. 

Rejected Alternatives

  • Return CompletableFuture from KafkaProducer.send
  • Change send() to return CompletableFuture. This is not a compatible change.
  • Return CompletionStage instead of CompletableFuture. The former doesn't extend the Future interface, so it's incompatible unless we introduce new methods.