...

Currently in Kafka Streams, to enrich a Purchase with both Product and Customer information, we’d do something like:

 

Code Block
languagejava
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final KTable<Long, Customer> customers = builder.table("customer", "customer-store");
final KTable<Long, Product> products = builder.table("product", "product-store");

	
// re-key purchases stream on customerId
purchases.map((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase))
        // join to customers. This will create an intermediate topic to repartition
        // the purchases stream based on the customerId
        .leftJoin(customers, EnrichedPurchase::new)
        // re-key enrichedPurchase based on productId
        .map((key, enrichedPurchase) -> 
                   KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase))
        // join to products. This will create an intermediate topic to repartition
        // the previous intermediate topic based on productId
        .leftJoin(products, EnrichedPurchase::withProduct);

 

As described in the code above and shown in the diagram below, there are multiple repartitioning phases and intermediate topics. Also, we need to co-partition all of the topics involved in the joins.
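To make the co-partitioning requirement concrete, here is a minimal sketch in plain Java with no Kafka dependency. The `partitionFor` helper and the key values are hypothetical; the helper is a simple stand-in and not Kafka's actual partitioning algorithm. It only illustrates that a record keyed by purchase id and the matching customer record can land in different partitions, which is why the stream has to be re-keyed and repartitioned before each join.

```java
public class CoPartitioningSketch {
    // Hypothetical stand-in for a partitioner (NOT Kafka's real algorithm):
    // the partition is derived from the record key only.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        long purchaseId = 10L; // original key of the purchase record (made up)
        long customerId = 7L;  // key of the matching customer record (made up)

        // Keyed by purchaseId, the purchase sits in a different partition
        // than the customer it needs to be joined with.
        System.out.println(partitionFor(purchaseId, numPartitions)); // prints 2
        System.out.println(partitionFor(customerId, numPartitions)); // prints 3

        // After re-keying the purchase on customerId, both records map to the
        // same partition, provided both topics have the same partition count.
        System.out.println(
            partitionFor(customerId, numPartitions) == partitionFor(7L, numPartitions)); // prints true
    }
}
```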



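With the introduction of global tables to Kafka Streams, we eliminate the intermediate topics used for repartitioning. Because a global table is fully replicated on each KafkaStreams instance, the join can look up any key locally, so the stream does not need to be re-keyed or co-partitioned with the table.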

 

Code Block
languagejava
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final GlobalKTable<Long, Customer> customers = builder.globalTable("customer", "customer-store");
final GlobalKTable<Long, Product> products = builder.globalTable("product", "product-store");

// join to the customers table by providing a mapping to the customerId
purchases.leftJoin(customers,
                   (key, purchase) -> purchase.customerId(),
                   EnrichedPurchase::new)
         // join to the products table by providing a mapping to the productId
         .leftJoin(products,
                   (key, enrichedPurchase) -> enrichedPurchase.productId(),
                   EnrichedPurchase::withProduct);
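The lookup semantics of such a join can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed implementation: a `Map` stands in for the locally replicated global table, `BiFunction` stands in for both `KeyValueMapper` and `ValueJoiner`, and the `Purchase`, `Enriched`, `leftJoin` and `enrich` names are hypothetical helpers invented for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class GlobalJoinSketch {
    // Hypothetical record types used only for this illustration.
    static final class Purchase {
        final long customerId;
        final long productId;
        Purchase(long customerId, long productId) {
            this.customerId = customerId;
            this.productId = productId;
        }
    }

    static final class Enriched {
        final Purchase purchase;
        final String customer; // null when the customer lookup found nothing
        Enriched(Purchase purchase, String customer) {
            this.purchase = purchase;
            this.customer = customer;
        }
    }

    // Stand-in for a left join against a fully replicated table: map the
    // stream record to a table key, look it up locally, then join the values.
    static <K, V, K1, V1, R> R leftJoin(K key, V value,
                                        Map<K1, V1> globalTable,
                                        BiFunction<K, V, K1> keyMapper,
                                        BiFunction<V, V1, R> joiner) {
        K1 lookupKey = keyMapper.apply(key, value);
        V1 other = globalTable.get(lookupKey); // null if absent (left join)
        return joiner.apply(value, other);
    }

    // Chains two lookups, mirroring the customers-then-products joins.
    static String enrich(Purchase purchase) {
        Map<Long, String> customers = new HashMap<>();
        customers.put(7L, "alice");
        Map<Long, String> products = new HashMap<>();
        products.put(42L, "widget");

        Long purchaseKey = 1L; // the stream key never changes: no re-keying
        Enriched withCustomer = leftJoin(purchaseKey, purchase, customers,
                (k, p) -> p.customerId, Enriched::new);
        return leftJoin(purchaseKey, withCustomer, products,
                (k, e) -> e.purchase.productId,
                (e, product) -> e.customer + "/" + product);
    }

    public static void main(String[] args) {
        System.out.println(enrich(new Purchase(7L, 42L)));  // prints alice/widget
        System.out.println(enrich(new Purchase(99L, 42L))); // prints null/widget
    }
}
```

Note that the key mapper returns only the key to look up in the table, while the stream record keeps its original key throughout, which is what removes the need for a repartition topic.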

 

...

 

Code Block
languagejava
// Represents a Table that is fully replicated on each KafkaStreams instance
public interface GlobalKTable<K, V> {
    <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                        final KeyValueMapper<K, V, K1> keyMapper,
                                        final ValueJoiner<V, V1, R> joiner,
                                        final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                            final KeyValueMapper<K, V, K1> keyMapper,
                                            final ValueJoiner<V, V1, R> joiner,
                                            final String queryableViewName);
}


 

KStream

Add overloaded methods for joining with GlobalKTable

Code Block
languagejava
/**
 * Perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> replicatedTable,
                                   final KeyValueMapper<K, V, K1> keyValueMapper,
                                   final ValueJoiner<V, V1, R> valueJoiner);

/**
 * Perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable
 */
<K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> table,
                                 final KeyValueMapper<K, V, K1> keyValueMapper,
                                 final ValueJoiner<V, V1, V2> joiner);


 

KTable

Add overloaded methods for joining with GlobalKTable 

 

 

Code Block
languagejava
/**
 * perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> join(final GlobalKTable<K1, V1> globalTable,
                              final KeyValueMapper<K, V, K1> keyMapper,
                              final ValueJoiner<V, V1, R> joiner);

/**
 * perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable
 */
<K1, V1, R> KTable<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                  final KeyValueMapper<K, V, K1> keyMapper,
                                  final ValueJoiner<V, V1, R> joiner);

 

 

KStreamBuilder

Code Block
languagejava
/**
 * Add a GlobalKTable to the topology using the provided Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                             final Serde<V> valSerde,
                                             final String topic,
                                             final String storeName);

/**
 * Add a GlobalKTable to the topology using default Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                             final String storeName);

TopologyBuilder

Code Block
// Add a Global Store that is backed by a source topic to the TopologyBuilder
public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                   final String sourceName,
                                                   final Deserializer keyDeserializer,
                                                   final Deserializer valueDeserializer,
                                                   final String topic,
                                                   final String processorName,
                                                   final ProcessorSupplier stateUpdateSupplier);

// Add a Global Store that is not backed by a source topic, i.e., the view
// that is the result of a join between two GlobalKTables
public synchronized TopologyBuilder addGlobalStore(final StateStore store);

// All Global State Stores will be part of a single ProcessorTopology;
// this provides an easy way to build that topology
public synchronized ProcessorTopology buildGlobalStateTopology();

// Retrieve the Global State Stores from the builder
public synchronized Map<String, StateStore> globalStateStores();
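As a rough mental model of a global store backed by a source topic, the following plain-Java sketch replays a sequence of records into an in-memory map. Everything here is hypothetical and independent of the real `StateStore` and `ProcessorSupplier` types: the `GlobalStoreSketch` class, its `update`, `get` and `replay` methods, and the sample records are made up for illustration. Treating a null value as a delete is an assumption borrowed from the usual table (changelog) semantics; the proposal text above does not spell it out.

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalStoreSketch {
    // Stand-in for the local state store holding the full table contents.
    private final Map<String, String> store = new HashMap<>();

    // Stand-in for the state-update processor: the latest value per key wins.
    // Assumption: a null value removes the key (tombstone-style delete).
    void update(String key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    String get(String key) {
        return store.get(key);
    }

    // Replays records (key, value pairs) in order, as if read from the topic.
    static GlobalStoreSketch replay(String[][] records) {
        GlobalStoreSketch sketch = new GlobalStoreSketch();
        for (String[] record : records) {
            sketch.update(record[0], record[1]);
        }
        return sketch;
    }

    public static void main(String[] args) {
        GlobalStoreSketch sketch = replay(new String[][] {
            {"a", "1"}, {"b", "2"}, {"a", "3"}, {"b", null}
        });
        System.out.println(sketch.get("a")); // prints 3
        System.out.println(sketch.get("b")); // prints null
    }
}
```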

 

 

...

  • None - this is a new feature and doesn’t impact any existing usages.

Test Plan

  • Unit tests to validate that all the individual components work as expected.
  • Integration and/or System tests to ensure that the feature works correctly end-to-end. 

Rejected Alternatives

  • Replicating per task: Each StreamTask would have its own complete replica of the table. Though this would provide the ability to do best-effort time synchronization, it would be too costly in terms of resource usage.

  • Replicating per thread: Doesn’t provide any benefit beyond replicating per instance, but adds additional replication overhead.

  • Introduce a broadcastJoin() API that uses the existing KTable interface and automatically converts it, internally, to a global table. The feeling is that this would muddy the API, i.e., it is better to be explicit with the type.
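The resource argument behind the first two alternatives comes down to simple multiplication, sketched below. The table size, thread count and task count are hypothetical numbers chosen for illustration, and `totalMb` is a made-up helper; the only point is that the number of full copies held by one instance grows from one (per instance) to one per thread or one per task.

```java
public class ReplicaCountSketch {
    // Local storage needed on one instance for a given number of full copies.
    static long totalMb(long tableMb, int copiesPerInstance) {
        return tableMb * copiesPerInstance;
    }

    public static void main(String[] args) {
        long tableMb = 500;         // hypothetical size of the table
        int threadsPerInstance = 4; // hypothetical number of stream threads
        int tasksPerInstance = 16;  // hypothetical number of stream tasks

        System.out.println("per instance: " + totalMb(tableMb, 1) + " MB");                  // prints per instance: 500 MB
        System.out.println("per thread:   " + totalMb(tableMb, threadsPerInstance) + " MB"); // prints per thread:   2000 MB
        System.out.println("per task:     " + totalMb(tableMb, tasksPerInstance) + " MB");   // prints per task:     8000 MB
    }
}
```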