You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current stateUnder Discussion

Discussion threadhttps://lists.apache.org/thread.html/13f306454761ef7318fb9a658b902fb1663a73e3dde542a2c2b29ab4@%3Cdev.kafka.apache.org%3E

JIRA KAFKA-8410 - Getting issue details... STATUS

POC: https://github.com/apache/kafka/pull/6856

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current Processor API is designed with flexibility in mind, but it overlooks an opportunity for type safety that is still compatible with the flexibility objective.

In particular, the Processor interface has `<K,V>` generic paramters, but these only bound the input  types of the processor. This is a very reasonable decision, since the `process` method is actually `void`, and any result from the processing is instead propagated via `ProcessorContext#forward`. In principle, any type can be forwarded, so the context doesn't bound the forward call at all (it's effectively `Object key, Object value`).

However, in practice, most processors will produce keys or values or both of a predictable type, and authors would benefit from the parity-check that type safety provides. This can be achieved by adding generic parameters to ProcessorContext, and also adding "output type" bounds to the Processor interface itself. This does not  restrict the range of use cases at all, though, since heterogeneous-output Processors, which wish to forward variable types, can still bound the "output types" to `<?,?>`.


Some examples of use cases that would benefit from this change:

  • Any processor can benefit. Imagine a pure user of the ProcessorAPI who has very complex processing logic. I have seen several processorimplementation that are hundreds of lines long and call `context.forward` in many different locations and branches. In such an implementation, it would be very easy to have a bug in a rarely used branch that forwards the wrong kind of value. This would structurally prevent that from happening.
  • Also, anyone who heavily uses the ProcessorAPI would likely have developed helper methods to wire together processors, just as we have in the DSL implementation. This change would enable them to ensure at compile time that they are actually wiring together compatible types. This was actually my original motivation, since I found it very difficult and time consuming to follow the Streams DSL internal builders.

Proposed Changes

The high-level approach is to bound the output  types of Processor, which we can enforce by bounding the types that ProcessorContext accepts for forward.

Fortunately, ProcessorContext does not have any generic parameters, so we can just add a new parameter list <K,V> without breaking any code compatibility. Un-parameterized usages will just start to get a "rawtypes" warning.

Unfortunately, Processor already has two generic parameters (the input key and value types), and there is no backward compatible way to add two more (the output key and value types), so we have to create a new Processor interface. The new interface of course needs a different fully qualified name, which can be achieved with:

  1. A new class name (like processor.Processor2 or processor.TypedProcessor)
  2. A new package name (like processor.api.Processor or processor2.Processor)

The current interface is processor.Processor, for reference. The class name seems clunkier, since even after we deprecate and remove Processor, the new names would continue to be visible in source code. This wouldn't be bad if there were some obvious good name for the new interface, but unfortunately Processor seems like the perfect name.

The package name has the drawback that, in Java, you can't import a package, so any references to the new interfaces would need to be fully qualified as long as they co-exist with the old interfaces in source code. On the plus side, once we ultimately remove the old interface, we can then just import org.apache.kafka.streams.processor.api.Processor, and then get back to source code that references only Processor. A relevant precedent would be the nio package in the standard library, which is the "new" version of the io package.

These changes will help users write safer code in their custom processors, and get a proof from the type system that they're forwarding only the types they think they are.

This change also positions us to make other public API improvements, like KAFKA-8396, to collapse transformer types that become redundant after this KIP.

Public Interfaces

(deprecation) org.apache.kafka.streams.processor.{Processor, ProcessorSupplier}

  • these classes are deprecated, which would be propagated to any public APIs that reference them.

(new class) org.apache.kafka.streams.processor.api.Processor

  • Similar to org.apache.kafka.streams.processor.Processor, but adds output generic type parameters
  • Bounds to the forwarding types allowed on the ProcessorContext
  • Add init and close are defaulted to no-op for convenience
  • Javadocs are similar to existing Processor interface
public interface Processor<KIn, VIn, KOut, VOut> {
  default void init(ProcessorContext<KOut, VOut> context) {}
  void process(KIn key, VIn value);
  default void close() {}
}

(new class) org.apache.kafka.streams.processor.api.ProcessorSupplier

  • Just a Supplier for the new Processor type
  • could potentially be replaced with `Supplier<Processor<KIn, VIn, KOut, VOut>` 
public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
  Processor<KIn, VIn, KOut, VOut> get();
}

(change) org.apache.kafka.streams.processor.ProcessorContext

  • Added generic parameters <K, V>
  • Adjusted bounds on forward methods (only showing first diff for brevity
  • These changes are fully backward compatible, since the type bounds have not changed for a raw-type usage of ProcessorContext. Existing implementations will start to get a warning, since they are now using a "raw type" of a generically typed class.
public interface ProcessorContext<K, V> {
  ...
- <K,            V>            void forward(final K  key, final V  value);
+ <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value);
. <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final To to);

// the following are deprecated. We'll update the generic bounds, and keep them deprecated
. @Deprecated
. <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final int childIndex);
. @Deprecated
. <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
}


(new method) org.apache.kafka.streams.StreamsBuilder

  • These changes are fully backward compatible
public synchronized <KIn, VIn, KOut, VOut> StreamsBuilder addGlobalStore(
  final StoreBuilder storeBuilder,
  final String topic,
  final Consumed consumed,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier
);


(new method) org.apache.kafka.streams.Topology

  • These changes are fully backward compatible
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(
  final String name,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
  final String... parentNames
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);

public synchronized <KIn, VIn, KOut, VOut> Topology addGlobalStore(
  final StoreBuilder storeBuilder,
  final String sourceName,
  final TimestampExtractor timestampExtractor,
  final Deserializer<KIn> keyDeserializer,
  final Deserializer<VIn> valueDeserializer,
  final String topic,
  final String processorName,
  final processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> stateUpdateSupplier,
);

(new method) org.apache.kafka.streams.kstream.KStream

  • These changes are fully backward compatible
void process(
  final processor.api.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier, 
  final String... stateStoreNames
)

void process(
  final processor.api.ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
  final Named named,
  final String... stateStoreNames
)

(unchanged) org.apache.kafka.streams.kstream.Transformer

Just explicitly stating that the Transformer interface would not be changed at all. The generics case for Transformer is a little more complicated, and I'd like to give it the consideration it really deserves within the scope of https://issues.apache.org/jira/browse/KAFKA-8396 .  This means that, as an intermediate state, there would be a `rawtypes` warning on the ProcessorContext parameter to `Transformer.init`.

(modification) org.apache.kafka.streams.kstream.{ValueTransformer, ValueTransformerWithKey}

  • These changes are fully backward compatible. Existing implementations will start to get a warning, since they are now using a "raw type" of a generically typed class.


public interface ValueTransformer*<K, V, VR> {
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Void, Void> context);
}

Compatibility, Deprecation, and Migration Plan

We'd deprecate the existing Processor class and any public APIs that depend on it. In future major releases, we'll discuss whether it's been deprecated long enough to be removed.

The most impacted usages would be usages of the Processor API itself. Code would need to be migrated to the new interfaces. Since all the methods are the same, this is a simple process of swapping out the package name and then adding the new generic type arguments.

There are two usage types in the Streams DSL that need to be migrated:

  • KStream#process : would swap out the package and then add the <Void, Void> output type, which has the side benefit of notifying the user if they were pointlessly and erroneously calling context.forward.
  • ValueTransformer (and company): would add <Void, Void> bounds to their ProcessorContext, which add the side benefit of notifying the user if they were pointlessly and erroneously calling context.forward.

Rejected Alternatives

  • No labels