...
```java
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final KTable<Long, Customer> customers = builder.table("customer", "customer-store");
final KTable<Long, Product> products = builder.table("product", "product-store");

// re-key purchases stream on customerId
purchases.map((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase))
         // join to customers. This will create an intermediate topic to repartition
         // the purchases stream based on the customerId
         .leftJoin(customers, EnrichedPurchase::new)
         // re-key enrichedPurchase based on productId
         .map((key, enrichedPurchase) -> KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase))
         // join to products. This will create an intermediate topic to repartition
         // the previous intermediate topic based on productId
         .leftJoin(products, EnrichedPurchase::withProduct);
```
As the code above shows, and as the diagram below illustrates, this approach requires multiple repartitioning phases and intermediate topics. It also requires all of the topics involved in the joins to be co-partitioned.
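To make the repartitioning requirement concrete, here is a minimal plain-Java sketch (not Kafka Streams code) that checks which purchases would move to a different partition once re-keyed from a purchase id to a customerId. It assumes both topics share a partition count and a partitioner; `partitionFor` is a hypothetical stand-in using `Math.floorMod(key.hashCode(), numPartitions)`, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: shows why re-keying a stream forces a repartition.
// partitionFor() is a hypothetical stand-in for a partitioner; Kafka's default
// partitioner hashes the serialized key bytes with murmur2 instead.
final class RepartitionSketch {

    // Hypothetical partitioner: floorMod keeps the result in [0, numPartitions).
    static int partitionFor(Long key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A purchase record keyed by purchaseId that references a customerId.
    record Purchase(long purchaseId, long customerId) {}

    // Returns the purchases whose partition changes once re-keyed on customerId.
    // Assuming the customer table is partitioned the same way by customerId, the
    // matching customer row lives in partition `after`, while the purchase is read
    // by the task for partition `before`, so the two never meet without a repartition.
    static List<Purchase> misplacedAfterRekey(List<Purchase> purchases, int numPartitions) {
        List<Purchase> misplaced = new ArrayList<>();
        for (Purchase p : purchases) {
            int before = partitionFor(p.purchaseId(), numPartitions);
            int after = partitionFor(p.customerId(), numPartitions);
            if (before != after) {
                misplaced.add(p);
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        List<Purchase> purchases = List.of(
                new Purchase(1L, 7L),
                new Purchase(2L, 2L),
                new Purchase(3L, 10L));
        // Prints the purchases that would have to be moved to another partition.
        System.out.println(misplacedAfterRekey(purchases, 4));
    }
}
```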
With the introduction of global tables to Kafka Streams, we can eliminate the intermediate topics used for repartitioning.
```java
final KStream<Long, Purchase> purchases = builder.stream("purchase");
final GlobalKTable<Long, Customer> customers = builder.globalTable("customer", "customer-store");
final GlobalKTable<Long, Product> products = builder.globalTable("product", "product-store");

// join to the customers table by providing a mapping to the customerId;
// the mapper returns the GlobalKTable key to look up, the stream is not re-keyed
purchases.leftJoin(customers,
                   (key, purchase) -> purchase.customerId(),
                   EnrichedPurchase::new)
         // join to the products table by providing a mapping to the productId
         .leftJoin(products,
                   (key, enrichedPurchase) -> enrichedPurchase.productId(),
                   EnrichedPurchase::withProduct);
```
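The global-table join chain above can be simulated without a cluster. This sketch assumes in-memory maps as stand-ins for the fully replicated `customer-store` and `product-store` tables and uses `java.util.function.BiFunction` in place of `KeyValueMapper` and `ValueJoiner`; the `Purchase`, `Customer`, `Product`, and `EnrichedPurchase` records are hypothetical shapes inferred from the example. It shows that each join only maps a record to a table key, so the stream keeps its original key and nothing is re-keyed.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative sketch only: simulates the two GlobalKTable left joins above.
// In-memory maps stand in for fully replicated tables, and BiFunction stands in
// for KeyValueMapper and ValueJoiner; none of this is the Kafka Streams API.
final class GlobalJoinSketch {

    record Purchase(long customerId, long productId) {}
    record Customer(String name) {}
    record Product(String name) {}

    // Hypothetical mirror of EnrichedPurchase; customer and product may be null
    // because both joins are left joins.
    record EnrichedPurchase(Purchase purchase, Customer customer, Product product) {
        EnrichedPurchase(Purchase purchase, Customer customer) {
            this(purchase, customer, null);
        }

        EnrichedPurchase withProduct(Product p) {
            return new EnrichedPurchase(purchase, customer, p);
        }

        long productId() {
            return purchase.productId();
        }
    }

    // Entry that, unlike Map.entry, tolerates null values.
    static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Left join of each stream record against a fully replicated table. Only the
    // lookup key is mapped; the output keeps the stream's original key.
    static <K, V, K1, V1, R> List<Map.Entry<K, R>> leftJoin(
            List<Map.Entry<K, V>> stream,
            Map<K1, V1> table,
            BiFunction<K, V, K1> keyMapper,
            BiFunction<V, V1, R> joiner) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            V1 match = table.get(keyMapper.apply(rec.getKey(), rec.getValue())); // null if absent
            out.add(entry(rec.getKey(), joiner.apply(rec.getValue(), match)));
        }
        return out;
    }

    static List<Map.Entry<Long, EnrichedPurchase>> enrich(
            List<Map.Entry<Long, Purchase>> purchases,
            Map<Long, Customer> customers,
            Map<Long, Product> products) {
        // join to the customers table by mapping each purchase to its customerId
        List<Map.Entry<Long, EnrichedPurchase>> withCustomer = leftJoin(
                purchases, customers, (key, purchase) -> purchase.customerId(), EnrichedPurchase::new);
        // join to the products table by mapping each enriched purchase to its productId
        return leftJoin(withCustomer, products,
                (key, enriched) -> enriched.productId(), EnrichedPurchase::withProduct);
    }

    public static void main(String[] args) {
        Map<Long, Customer> customers = Map.of(1L, new Customer("alice"));
        Map<Long, Product> products = Map.of(10L, new Product("book"));
        List<Map.Entry<Long, Purchase>> purchases = List.of(
                entry(100L, new Purchase(1L, 10L)),
                entry(101L, new Purchase(2L, 10L))); // customer 2 is unknown
        enrich(purchases, customers, products).forEach(System.out::println);
    }
}
```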
...
```java
// Represents a Table that is fully replicated on each KafkaStreams instance
public interface GlobalKTable<K, V> {

    <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                        final KeyValueMapper<K, V, K1> keyMapper,
                                        final ValueJoiner<V, V1, R> joiner,
                                        final String queryableViewName);

    <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                            final KeyValueMapper<K, V, K1> keyMapper,
                                            final ValueJoiner<V, V1, R> joiner,
                                            final String queryableViewName);
}
```
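As a rough illustration of what the `join` and `leftJoin` methods above produce, here is a snapshot-based sketch that uses plain maps for both tables (an assumption; `joinView` is a hypothetical helper and `BiFunction` replaces `KeyValueMapper`/`ValueJoiner`). The returned map plays the role of the view registered under `queryableViewName`. It does not attempt to keep the view current as either table changes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative sketch only: computes, from snapshots of two fully replicated
// tables, the kind of view that GlobalKTable.join/leftJoin describes. Maps stand
// in for the tables; BiFunction stands in for KeyValueMapper and ValueJoiner.
// Keeping the view current as either table changes is out of scope here.
final class GlobalTableViewSketch {

    static <K, V, K1, V1, R> Map<K, R> joinView(
            Map<K, V> table,
            Map<K1, V1> other,
            BiFunction<K, V, K1> keyMapper,
            BiFunction<V, V1, R> joiner,
            boolean leftJoin) {
        Map<K, R> view = new HashMap<>();
        for (Map.Entry<K, V> row : table.entrySet()) {
            V1 match = other.get(keyMapper.apply(row.getKey(), row.getValue()));
            // join keeps only matched rows; leftJoin keeps every row of `table`
            if (match != null || leftJoin) {
                view.put(row.getKey(), joiner.apply(row.getValue(), match));
            }
        }
        return view;
    }

    public static void main(String[] args) {
        // orders: orderId -> customerId; customers: customerId -> name
        Map<Integer, String> orders = Map.of(1, "c1", 2, "c9");
        Map<String, String> customers = Map.of("c1", "alice");
        BiFunction<String, String, String> joiner = (customerId, name) -> name == null ? "unknown" : name;
        System.out.println(joinView(orders, customers, (orderId, customerId) -> customerId, joiner, false));
        System.out.println(joinView(orders, customers, (orderId, customerId) -> customerId, joiner, true));
    }
}
```

The result stays keyed by the left table's key `K`, matching the `GlobalKTable<K, R>` return type in the interface.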
KStream
Add overloaded methods for joining with GlobalKTable
```java
/**
 * Perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable.
 */
<K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> replicatedTable,
                                   final KeyValueMapper<K, V, K1> keyValueMapper,
                                   final ValueJoiner<V, V1, R> valueJoiner);

/**
 * Perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KStream to the key of the GlobalKTable.
 */
<K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> table,
                                 final KeyValueMapper<K, V, K1> keyValueMapper,
                                 final ValueJoiner<V, V1, V2> joiner);
```
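The difference between the two `KStream` overloads can be sketched as follows. `KeyValueMapper` and `ValueJoiner` here are local stand-ins shaped like the Kafka Streams interfaces of the same names, defined only so the example compiles on its own; `streamJoin` and `demo` are hypothetical helpers. Because the mapper receives both the key and the value, the table key can be derived from the record's value.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only. KeyValueMapper and ValueJoiner are local stand-ins
// shaped like the Kafka Streams interfaces of the same names so that this
// compiles without Kafka; streamJoin and demo are hypothetical helpers.
final class StreamGlobalJoinSketch {

    @FunctionalInterface
    interface KeyValueMapper<K, V, R> {
        R apply(K key, V value);
    }

    @FunctionalInterface
    interface ValueJoiner<V1, V2, R> {
        R apply(V1 value1, V2 value2);
    }

    // join (leftJoin == false) drops stream records whose mapped key is absent
    // from the replicated table; leftJoin keeps them and passes null to the
    // joiner. Output records keep the stream's key, matching KStream<K, R>.
    static <K, V, K1, V1, R> List<Map.Entry<K, R>> streamJoin(
            List<Map.Entry<K, V>> stream,
            Map<K1, V1> globalTable,
            KeyValueMapper<K, V, K1> keyValueMapper,
            ValueJoiner<V, V1, R> joiner,
            boolean leftJoin) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            V1 match = globalTable.get(keyValueMapper.apply(rec.getKey(), rec.getValue()));
            if (match == null && !leftJoin) {
                continue; // inner join: no matching table row, no output
            }
            out.add(new AbstractMap.SimpleImmutableEntry<>(rec.getKey(), joiner.apply(rec.getValue(), match)));
        }
        return out;
    }

    // Page views keyed by view id, with values of the form "userId:page". The
    // mapper derives the table key from the value, not from the stream's key.
    static List<Map.Entry<Long, String>> demo(boolean leftJoin) {
        List<Map.Entry<Long, String>> views = List.of(
                Map.entry(1L, "u1:/home"),
                Map.entry(2L, "u2:/cart"));
        Map<String, String> users = Map.of("u1", "alice");
        return streamJoin(
                views,
                users,
                (viewId, view) -> view.substring(0, view.indexOf(':')),
                (view, name) -> (name == null ? "anonymous" : name) + " viewed " + view.substring(view.indexOf(':') + 1),
                leftJoin);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // join: only the view by a known user
        System.out.println(demo(true));  // leftJoin: both views
    }
}
```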
KTable
Add overloaded methods for joining with GlobalKTable
```java
/**
 * Perform a join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable.
 */
<K1, V1, R> KTable<K, R> join(final GlobalKTable<K1, V1> globalTable,
                              final KeyValueMapper<K, V, K1> keyMapper,
                              final ValueJoiner<V, V1, R> joiner);

/**
 * Perform a left join with a GlobalKTable using the provided KeyValueMapper
 * to map from the (key, value) of the KTable to the key of the GlobalKTable.
 */
<K1, V1, R> KTable<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable,
                                  final KeyValueMapper<K, V, K1> keyMapper,
                                  final ValueJoiner<V, V1, R> joiner);
```
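For the `KTable` overloads the result is itself a table keyed by the `KTable`'s key. One plausible way to model this is that each update on the `KTable` side replaces or removes a single result row. The sketch below does that under two stated assumptions: a plain map stands in for the `GlobalKTable`, and updates to the global table are not propagated to rows that were already joined. All names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative sketch only: maintains the result of a KTable-to-GlobalKTable
// join as updates arrive on the KTable side. A map stands in for the replicated
// table and BiFunction stands in for KeyValueMapper and ValueJoiner.
// Assumption: changes to the global table are not pushed to existing results.
final class TableGlobalJoinSketch<K, V, K1, V1, R> {

    private final Map<K1, V1> globalTable;
    private final BiFunction<K, V, K1> keyMapper;
    private final BiFunction<V, V1, R> joiner;
    private final boolean leftJoin;
    private final Map<K, R> result = new HashMap<>();

    TableGlobalJoinSketch(Map<K1, V1> globalTable, BiFunction<K, V, K1> keyMapper,
                          BiFunction<V, V1, R> joiner, boolean leftJoin) {
        this.globalTable = globalTable;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    // Applies one KTable update. A null value is a tombstone that deletes the row.
    void onTableUpdate(K key, V value) {
        if (value == null) {
            result.remove(key);
            return;
        }
        V1 match = globalTable.get(keyMapper.apply(key, value));
        if (match == null && !leftJoin) {
            result.remove(key); // join: an unmatched row has no result row
        } else {
            result.put(key, joiner.apply(value, match));
        }
    }

    // Current join result, keyed by the KTable's key as in KTable<K, R>.
    Map<K, R> result() {
        return Collections.unmodifiableMap(result);
    }

    public static void main(String[] args) {
        // Cart lines keyed by line number, valued by product id, joined to prices.
        TableGlobalJoinSketch<Integer, String, String, Integer, String> join =
                new TableGlobalJoinSketch<>(Map.of("p1", 10),
                        (line, productId) -> productId,
                        (productId, price) -> productId + "@" + price,
                        false);
        join.onTableUpdate(1, "p1");
        join.onTableUpdate(2, "p9"); // unknown product: dropped by the inner join
        System.out.println(join.result());
        join.onTableUpdate(1, null); // tombstone removes line 1
        System.out.println(join.result());
    }
}
```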
KStreamBuilder
```java
/**
 * Add a GlobalKTable to the topology using the provided Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                             final Serde<V> valSerde,
                                             final String topic,
                                             final String storeName);

/**
 * Add a GlobalKTable to the topology using default Serdes
 */
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                             final String storeName);
```
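A global table's store holds the contents of every partition of its source topic, whereas a regular `KTable` task reads only its assigned partitions. The following sketch models filling such a store. It assumes each inner list represents one partition's records in offset order, treats a `null` value as a delete, and lets the latest value per key win; `bootstrap` and `entry` are hypothetical helpers, not part of `KStreamBuilder`.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: models filling a global table's store from every
// partition of its source topic. Each inner list stands in for one partition's
// records in offset order; this is not the Kafka Streams API.
final class GlobalStoreBootstrapSketch {

    // Entry that, unlike Map.entry, tolerates a null value (a tombstone).
    static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Replays all partitions into one store: the latest value per key wins and a
    // null value deletes the key. Ordering only matters within a partition,
    // assuming each key is always written to the same partition.
    static <K, V> Map<K, V> bootstrap(List<List<Map.Entry<K, V>>> partitions) {
        Map<K, V> store = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            for (Map.Entry<K, V> rec : partition) {
                if (rec.getValue() == null) {
                    store.remove(rec.getKey());
                } else {
                    store.put(rec.getKey(), rec.getValue());
                }
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> p0 = List.of(entry(1L, "alice"), entry(3L, "carol"));
        List<Map.Entry<Long, String>> p1 = List.of(entry(2L, "bob"), entry(2L, null));       // bob deleted
        List<Map.Entry<Long, String>> p2 = List.of(entry(5L, "erin"), entry(5L, "erin.v2")); // erin updated
        System.out.println(bootstrap(List.of(p0, p1, p2)));
    }
}
```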
TopologyBuilder
```java
// add a Global Store that is backed by a source topic to the TopologyBuilder
public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                   final String sourceName,
                                                   final Deserializer keyDeserializer,
                                                   final Deserializer valueDeserializer,
                                                   final String topic,
                                                   final String processorName,
                                                   final ProcessorSupplier stateUpdateSupplier);

// A method to add Global Stores that are not backed by a source topic,
// i.e., the view that is the result of a join between two GlobalKTables
public synchronized TopologyBuilder addGlobalStore(final StateStore store);

// All Global State Stores will be part of a single ProcessorTopology;
// this provides an easy way to build that topology
public synchronized ProcessorTopology buildGlobalStateTopology();

// Retrieve the Global State Stores from the builder.
public synchronized Map<String, StateStore> globalStateStores();
```
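The bookkeeping these `TopologyBuilder` methods imply can be sketched with a hypothetical `GlobalStoreRegistrySketch` class. It replaces `StateStore`, `Deserializer`, and `ProcessorSupplier` with plain `String` maps and a `StateUpdater` callback, so it only mirrors the shape of the proposed API: topic-backed stores are fed through an update function, topic-less stores (such as a join view) are registered by name, and `globalStateStores()` returns every store keyed by name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: a hypothetical, much-simplified registry mirroring
// the TopologyBuilder methods above. Each topic-backed global store is fed by
// one source topic through an update function (standing in for the processor
// that stateUpdateSupplier creates), and all global stores are driven by one
// shared "global state topology". Stores are plain String-to-String maps.
final class GlobalStoreRegistrySketch {

    @FunctionalInterface
    interface StateUpdater {
        void update(Map<String, String> store, String key, String value);
    }

    // Default updater: upsert, with a null value treated as a delete.
    static final StateUpdater UPSERT = (store, key, value) -> {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    };

    private final Map<String, Map<String, String>> stores = new LinkedHashMap<>();
    private final Map<String, String> topicToStore = new HashMap<>();
    private final Map<String, StateUpdater> updaters = new HashMap<>();

    // Analogue of addGlobalStore(store, ..., topic, ..., stateUpdateSupplier).
    GlobalStoreRegistrySketch addGlobalStore(String storeName, String topic, StateUpdater updater) {
        if (topicToStore.putIfAbsent(topic, storeName) != null) {
            throw new IllegalArgumentException("topic already backs a global store: " + topic);
        }
        stores.put(storeName, new HashMap<>());
        updaters.put(storeName, updater);
        return this;
    }

    // Analogue of addGlobalStore(store): a store with no source topic, e.g. a
    // join view that is written by other code rather than by a topic.
    GlobalStoreRegistrySketch addGlobalStore(String storeName) {
        stores.putIfAbsent(storeName, new HashMap<>());
        return this;
    }

    // One record read by the global state topology: route it to the update
    // function of the store its topic backs; other topics are ignored.
    void process(String topic, String key, String value) {
        String storeName = topicToStore.get(topic);
        if (storeName != null) {
            updaters.get(storeName).update(stores.get(storeName), key, value);
        }
    }

    // Analogue of globalStateStores(): all global stores by name (the returned
    // map is read-only; the stores themselves stay mutable in this sketch).
    Map<String, Map<String, String>> globalStateStores() {
        return Collections.unmodifiableMap(stores);
    }

    public static void main(String[] args) {
        GlobalStoreRegistrySketch builder = new GlobalStoreRegistrySketch()
                .addGlobalStore("customer-store", "customer", UPSERT)
                .addGlobalStore("product-store", "product", UPSERT)
                .addGlobalStore("customer-product-view"); // not backed by a topic
        builder.process("customer", "1", "alice");
        builder.process("product", "10", "book");
        builder.process("purchase", "100", "ignored"); // not a global source topic
        System.out.println(builder.globalStateStores());
    }
}
```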
...
- None: this is a new feature and doesn't impact any existing usage.
Test Plan
- Unit tests to validate that all the individual components work as expected.
- Integration and/or System tests to ensure that the feature works correctly end-to-end.
Rejected Alternatives
Replicating per task: Each StreamTask would have its own complete replica of the table. Though this would provide the ability to do best-effort time synchronization, it would be too costly in terms of resource usage.
Replicating per thread: This doesn't provide any benefit beyond replicating per instance, but adds replication overhead.
Introducing a broadcastJoin() API that uses the existing KTable interface and automatically converts it, internally, to a global table: the feeling is that this would muddy the API; it is better to be explicit about the type.
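The resource argument against the per-task and per-thread alternatives comes down to how many full copies of the table exist. This sketch counts them for each granularity. The deployment figures in `main` (3 instances, 4 threads each, 24 tasks, a 1 GiB table) are hypothetical, and every replica is assumed to be a complete copy populated from the whole source topic.

```java
// Illustrative sketch only: counts how many full copies of a replicated table an
// application would hold under each replication granularity discussed above.
// The deployment figures in main() are hypothetical.
final class ReplicationCostSketch {

    enum Granularity { PER_INSTANCE, PER_THREAD, PER_TASK }

    static long replicas(Granularity g, int instances, int threadsPerInstance, int totalTasks) {
        return switch (g) {
            case PER_INSTANCE -> instances;                          // proposed design
            case PER_THREAD -> (long) instances * threadsPerInstance;
            case PER_TASK -> totalTasks;                             // one copy per StreamTask
        };
    }

    // Total bytes held across the application, assuming every replica is a full copy.
    static long totalBytes(Granularity g, int instances, int threadsPerInstance,
                           int totalTasks, long tableBytes) {
        return replicas(g, instances, threadsPerInstance, totalTasks) * tableBytes;
    }

    public static void main(String[] args) {
        // Hypothetical deployment: 3 instances x 4 threads, 24 tasks, 1 GiB table.
        long gib = 1L << 30;
        for (Granularity g : Granularity.values()) {
            System.out.printf("%s: %d replicas, %d GiB%n", g,
                    replicas(g, 3, 4, 24), totalBytes(g, 3, 4, 24, gib) / gib);
        }
    }
}
```

With these figures, per-instance replication keeps 3 copies, per-thread replication 12, and per-task replication 24.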