Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When a user adds a global state store using StreamsBuilder#addGlobalStore, they must provide a custom processor to load the data into the store. Depending on how the processor is implemented, it can cause issues with the restore process. The issue occurs if the processor changes the format of the records stored in the table from the format of the records in the backing changelog topic. This was mitigated by KAFKA-7663: the restore is now done by reprocessing the records using an instance obtained from the custom processor supplier. However, this limits the flexibility of Streams, as users can no longer load the bytes directly into the store. The current solution also has lower performance, since it requires an extra deserialization step.

This KIP seeks to give users the option to provide a custom processor and still load the bytes directly. A user who is savvy enough will understand not to change the format of the records. One example is a custom processor that only collects metrics and otherwise writes the records to the store unchanged.
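As a concrete illustration, below is a minimal sketch of such a format-preserving processor (the class name, store name, and metric are illustrative, not part of this proposal): it counts updates but writes every record to the global store unchanged, so loading the changelog directly during restore yields the same state as reprocessing.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical example: the only custom behavior is a metric; the record format is untouched.
public class MetricsOnlyGlobalStoreUpdater implements Processor<String, String, Void, Void> {
    private final AtomicLong updateCount = new AtomicLong();  // illustrative metric
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("global-store");  // hypothetical store name
    }

    @Override
    public void process(final Record<String, String> record) {
        updateCount.incrementAndGet();            // collect the metric
        store.put(record.key(), record.value());  // store the record unchanged
    }
}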

Public Interfaces

StreamsBuilder.java

New Methods:
/**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an
     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
     * The supplier should always generate a new instance. Creating a single {@link Processor} object
     * and returning the same object reference in {@link ProcessorSupplier#get()} is a
     * violation of the supplier pattern and leads to runtime exceptions.
     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
     * <p>
     * It is not required to connect a global store to the {@link Processor Processors},
     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
     *
     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
     * @param topic                 the topic to source the data from
     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @param reprocessOnRestore    if {@code true}, restore the store by reprocessing the records with a processor obtained from {@code stateUpdateSupplier}; if {@code false}, load the records into the store byte-for-byte
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     */
    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                                 final String topic,
                                                                 final Consumed<KIn, VIn> consumed,
                                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
                                                                 final boolean reprocessOnRestore);

    /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     *
     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
     * @param topic                 the topic to source the data from
     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     */
    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                                 final String topic,
                                                                 final Consumed<KIn, VIn> consumed);
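For illustration, here is how the two new overloads might be used once adopted (topic and store names are examples; MetricsOnlyGlobalStoreUpdater is the hypothetical format-preserving processor sketched in the Motivation section):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.state.Stores;

final StreamsBuilder builder = new StreamsBuilder();

// Custom processor that does not change the record format: restore can skip
// reprocessing and load the records byte-for-byte.
builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("global-store"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(),  // global stores have no changelog of their own
    "global-topic",
    Consumed.with(Serdes.String(), Serdes.String()),
    MetricsOnlyGlobalStoreUpdater::new,
    false);                      // reprocessOnRestore

// No custom processor at all: the default source processor keeps the store
// up-to-date, so a byte-for-byte restore is always safe.
builder.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("other-global-store"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(),
    "other-global-topic",
    Consumed.with(Serdes.String(), Serdes.String()));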
Deprecate:

    /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an
     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
     * The supplier should always generate a new instance. Creating a single {@link Processor} object
     * and returning the same object reference in {@link ProcessorSupplier#get()} is a
     * violation of the supplier pattern and leads to runtime exceptions.
     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
     * <p>
     * It is not required to connect a global store to the {@link Processor Processors},
     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
     *
     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
     * @param topic                 the topic to source the data from
     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     * @deprecated Since 3.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier, boolean)} or {@link #addGlobalStore(StoreBuilder, String, Consumed)} instead.
     */
    @Deprecated
    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                                 final String topic,
                                                                 final Consumed<KIn, VIn> consumed,
                                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);


Topology.java
New Methods:
 /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
     *
     * @param storeBuilder          user defined state store builder
     * @param sourceName            name of the {@link SourceNode} that will be automatically added
     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
     * @param topic                 the topic to source the data from
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     */
    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                           final String sourceName,
                                                           final Deserializer<KIn> keyDeserializer,
                                                           final Deserializer<VIn> valueDeserializer,
                                                           final String topic);
 /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
     * records forwarded from the {@link SourceNode}.
     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
     *
     * @param storeBuilder          user defined state store builder
     * @param sourceName            name of the {@link SourceNode} that will be automatically added
     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
     * @param topic                 the topic to source the data from
     * @param processorName         the name of the {@link ProcessorSupplier}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @param reprocessOnRestore    if {@code true}, restore the store by reprocessing the records with a processor obtained from {@code stateUpdateSupplier}; if {@code false}, load the records into the store byte-for-byte
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     */
    public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                           final String sourceName,
                                                           final Deserializer<KIn> keyDeserializer,
                                                           final Deserializer<VIn> valueDeserializer,
                                                           final String topic,
                                                           final String processorName,
                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
                                                           final boolean reprocessOnRestore);


  /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
     * records forwarded from the {@link SourceNode}.
     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
     *
     * @param storeBuilder          user defined key value store builder
     * @param sourceName            name of the {@link SourceNode} that will be automatically added
     * @param timestampExtractor    the stateless timestamp extractor used for this source,
     *                              if not specified the default extractor defined in the configs will be used
     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
     * @param topic                 the topic to source the data from
     * @param processorName         the name of the {@link ProcessorSupplier}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @param reprocessOnRestore    if {@code true}, restore the store by reprocessing the records with a processor obtained from {@code stateUpdateSupplier}; if {@code false}, load the records into the store byte-for-byte
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     */       
     public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                           final String sourceName,
                                                           final TimestampExtractor timestampExtractor,
                                                           final Deserializer<KIn> keyDeserializer,
                                                           final Deserializer<VIn> valueDeserializer,
                                                           final String topic,
                                                           final String processorName,
                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
                                                           final boolean reprocessOnRestore);
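
Illustrative usage of the second new overload (names are examples, reusing the hypothetical processor from the Motivation section):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

final Topology topology = new Topology();
topology.addGlobalStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("global-store"),
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(),
    "global-source",                    // sourceName
    Serdes.String().deserializer(),     // keyDeserializer
    Serdes.String().deserializer(),     // valueDeserializer
    "global-topic",                     // topic
    "global-processor",                 // processorName
    MetricsOnlyGlobalStoreUpdater::new, // stateUpdateSupplier
    false);                             // reprocessOnRestore: load the store byte-for-byte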
 
Deprecate:

 /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
     * records forwarded from the {@link SourceNode}.
     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
     *
     * @param storeBuilder          user defined state store builder
     * @param sourceName            name of the {@link SourceNode} that will be automatically added
     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
     * @param topic                 the topic to source the data from
     * @param processorName         the name of the {@link ProcessorSupplier}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     * @deprecated Since 3.7.0; use {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier, boolean)}
     * or {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String)} instead.
     */
     @Deprecated
     public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                           final String sourceName,
                                                           final Deserializer<KIn> keyDeserializer,
                                                           final Deserializer<VIn> valueDeserializer,
                                                           final String topic,
                                                           final String processorName,
                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);

 /**
     * Adds a global {@link StateStore} to the topology.
     * The {@link StateStore} sources its data from all partitions of the provided input topic.
     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
     * <p>
     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
     * of the input topic.
     * <p>
     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
     * records forwarded from the {@link SourceNode}.
     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
     *
     * @param storeBuilder          user defined key value store builder
     * @param sourceName            name of the {@link SourceNode} that will be automatically added
     * @param timestampExtractor    the stateless timestamp extractor used for this source,
     *                              if not specified the default extractor defined in the configs will be used
     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
     * @param topic                 the topic to source the data from
     * @param processorName         the name of the {@link ProcessorSupplier}
     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
     * @return itself
     * @throws TopologyException if the processor of state is already registered
     * @deprecated Since 3.7.0; use {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier, boolean)}
     * or {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String)} instead.
     */
     @Deprecated
     public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                           final String sourceName,
                                                           final TimestampExtractor timestampExtractor,
                                                           final Deserializer<KIn> keyDeserializer,
                                                           final Deserializer<VIn> valueDeserializer,
                                                           final String topic,
                                                           final String processorName,
                                                           final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);


Proposed Changes

I propose adding the new variations of addGlobalStore shown above and deprecating the existing ones, in both the StreamsBuilder and Topology classes.

The new methods allow each proper use of a global store to be expressed without error. First, there is a method with an option to choose whether or not to reprocess during a restore. Second, there is a method with neither the reprocessing option nor a custom processor; instead it uses the same default source processor as the DSL, which simply copies the records into the global store. In that case we know the record format in the store cannot change, so we can load directly from the topic without concern. The default processor should serve the majority of users.

Compatibility, Deprecation, and Migration Plan

Existing users will have to migrate to one of the new methods, but there are no plans to fully remove the deprecated methods until 5.0. The upgrade should be simple, and no special migration plan is needed.

Test Plan

Testing will be done through unit tests of each option to ensure the proper restore behavior is followed. The change is small and will not require extensive test changes.

Rejected Alternatives

