Comment: update deprecation verbiage, remove extraneous TableJoined static methods

...

The following KTable interfaces will be added. They are analogous to existing methods that accept Named, except with Named replaced by TableJoined. The existing methods which accept Named will be marked for deprecation (to be removed in the next major release, likely 4.0).

Code Block
    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
     * using the {@link TableJoined} instance for configuration of the {@link StreamPartitioner this table's
     * partitioner} and {@link StreamPartitioner the other table's partitioner}.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final TableJoined<K, KO> tableJoined);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join,
     * using the {@link TableJoined} instance for configuration of the {@link StreamPartitioner this table's
     * partitioner} and {@link StreamPartitioner the other table's partitioner}.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final TableJoined<K, KO> tableJoined,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
     * using the {@link TableJoined} instance for configuration of the {@link StreamPartitioner this table's
     * partitioner} and {@link StreamPartitioner the other table's partitioner}.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final TableJoined<K, KO> tableJoined);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join,
     * using the {@link TableJoined} instance for configuration of the {@link StreamPartitioner this table's
     * partitioner} and {@link StreamPartitioner the other table's partitioner}.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final TableJoined<K, KO> tableJoined,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
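The join semantics described in the Javadoc above can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions: the class `ForeignKeyJoinSketch` and the method `fkJoin` are hypothetical names, plain `Map`s stand in for tables, and `BiFunction` stands in for `ValueJoiner`. It does not reflect how Kafka Streams executes the join internally, and it does not involve `TableJoined` or partitioners at all. It only shows how the foreign key extractor selects the matching record, that a null extracted key is skipped, and how inner and left joins differ when no match exists.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForeignKeyJoinSketch {

    // Snapshot-style model of a foreign key join over two in-memory "tables".
    // All names here are illustrative; this is not the Kafka Streams API.
    public static <K, V, KO, VO, VR> Map<K, VR> fkJoin(final Map<K, V> left,
                                                       final Map<KO, VO> right,
                                                       final Function<V, KO> foreignKeyExtractor,
                                                       final BiFunction<V, VO, VR> joiner,
                                                       final boolean leftJoin) {
        final Map<K, VR> result = new LinkedHashMap<>();
        for (final Map.Entry<K, V> entry : left.entrySet()) {
            final KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                continue; // a null extracted key is ignored as invalid
            }
            final VO otherValue = right.get(foreignKey);
            // An inner join requires a match; a left join passes null for a missing match.
            if (otherValue != null || leftJoin) {
                result.put(entry.getKey(), joiner.apply(entry.getValue(), otherValue));
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> orders = new LinkedHashMap<>();
        orders.put("order-1", "cust-A"); // value doubles as the foreign key
        orders.put("order-2", "cust-B"); // no matching customer
        final Map<String, String> customers = Map.of("cust-A", "Alice");

        final BiFunction<String, String, String> joiner =
            (customerId, customerName) -> customerId + "/" + customerName;

        System.out.println(fkJoin(orders, customers, Function.identity(), joiner, false));
        System.out.println(fkJoin(orders, customers, Function.identity(), joiner, true));
    }
}
```

In this model the inner join yields only `order-1`, while the left join also yields `order-2` joined against a null right-hand value.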

...

Code Block
/**
 * The {@code TableJoined} class represents optional params that can be passed to
 * {@link KTable#join(KTable, Function, ValueJoiner, TableJoined) KTable#join(KTable,Function,...)} and
 * {@link KTable#leftJoin(KTable, Function, ValueJoiner, TableJoined) KTable#leftJoin(KTable,Function,...)}
 * operations, for foreign key joins.
 * @param <K>   this key type ; key type for the left (primary) table
 * @param <KO>  other key type ; key type for the right (foreign key) table
 */
public class TableJoined<K, KO> implements NamedOperation<TableJoined<K, KO>> {

	[...]

    /**
     * Create an instance of {@code TableJoined} with partitioner and otherPartitioner {@link StreamPartitioner} instances.
     * {@code null} values are accepted and will result in the default partitioner being used.
     *
     * @param partitioner      a {@link StreamPartitioner} that specifies the partitioning strategy for the left (primary)
     *                         table of the foreign key join. The partitioning strategy must depend only on the message key
     *                         and not the message value. If {@code null} the default partitioner will be used.
     * @param otherPartitioner a {@link StreamPartitioner} that specifies the partitioning strategy for the right (foreign
     *                         key) table of the foreign key join. The partitioning strategy must depend only on the message
     *                         key and not the message value. If {@code null} the default partitioner will be used.
     * @param <K>              this key type ; key type for the left (primary) table
     * @param <KO>             other key type ; key type for the right (foreign key) table
     * @return new {@code TableJoined} instance with the provided partitioners
     */
    public static <K, KO> TableJoined<K, KO> with(final StreamPartitioner<K, Void> partitioner,
                                                  final StreamPartitioner<KO, Void> otherPartitioner) {
        return new TableJoined<>(partitioner, otherPartitioner, null);
    }

    /**
     * Create an instance of {@code TableJoined} with partitioner and otherPartitioner {@link StreamPartitioner} instances,
     * and a base name for all components of the join, including internal topics created to complete the join.
     * {@code null} values are accepted and will result in the default partitioner being used.
     *
     * @param partitioner      a {@link StreamPartitioner} that specifies the partitioning strategy for the left (primary)
     *                         table of the foreign key join. The partitioning strategy must depend only on the message key
     *                         and not the message value. If {@code null} the default partitioner will be used.
     * @param otherPartitioner a {@link StreamPartitioner} that specifies the partitioning strategy for the right (foreign
     *                         key) table of the foreign key join. The partitioning strategy must depend only on the message
     *                         key and not the message value. If {@code null} the default partitioner will be used.
     * @param name             the name used as the base for naming components of the join including internal topics
     * @param <K>              this key type ; key type for the left (primary) table
     * @param <KO>             other key type ; key type for the right (foreign key) table
     * @return new {@code TableJoined} instance with the provided partitioners and name
     */
    public static <K, KO> TableJoined<K, KO> with(final StreamPartitioner<K, Void> partitioner,
                                                  final StreamPartitioner<KO, Void> otherPartitioner,
                                                  final String name) {
        return new TableJoined<>(partitioner, otherPartitioner, name);
    }

    /**
     * Create an instance of {@code TableJoined} with a custom {@link StreamPartitioner}.
     * {@code null} values are accepted and will result in the default partitioner being used.
     *
     * @param partitioner the {@link StreamPartitioner} that specifies the partitioning strategy for the left (primary)
     *                    table of the foreign key join. The partitioning strategy must depend only on the message key
     *                    and not the message value. If {@code null} the default partitioner will be used.
     * @param <K>         this key type ; key type for the left (primary) table
     * @param <KO>        other key type ; key type for the right (foreign key) table
     * @return new {@code TableJoined} instance configured with the partitioner
     */
    public static <K, KO> TableJoined<K, KO> partitioner(final StreamPartitioner<K, Void> partitioner) {
        return new TableJoined<>(partitioner, null, null);
    }

    /**
     * Create an instance of {@code TableJoined} with a custom other {@link StreamPartitioner}.
     * {@code null} values are accepted and will result in the default partitioner being used.
     *
     * @param otherPartitioner the {@link StreamPartitioner} that specifies the partitioning strategy for the right (foreign
     *                         key) table of the foreign key join. The partitioning strategy must depend only on the message
     *                         key and not the message value. If {@code null} the default partitioner will be used.
     * @param <K>              this key type ; key type for the left (primary) table
     * @param <KO>             other key type ; key type for the right (foreign key) table
     * @return new {@code TableJoined} instance configured with the otherPartitioner
     */
    public static <K, KO> TableJoined<K, KO> otherPartitioner(final StreamPartitioner<KO, Void> otherPartitioner) {
        return new TableJoined<>(null, otherPartitioner, null);
    }

    /**
     * Create an instance of {@code TableJoined} with base name for all components of the join, including internal topics
     * created to complete the join.
     *
     * @param name the name used as the base for naming components of the join including internal topics
     * @param <K>  this key type ; key type for the left (primary) table
     * @param <KO> other key type ; key type for the right (foreign key) table
     * @return new {@code TableJoined} instance configured with the name
     *
     */
    public static <K, KO> TableJoined<K, KO> as(final String name) {
        return new TableJoined<>(null, null, name);
    }

    /**
     * Set the custom {@link StreamPartitioner} to be used. Null values are accepted and will result in the
     * default partitioner being used.
     *
     * @param partitioner the {@link StreamPartitioner} that specifies the partitioning strategy for the left (primary)
     *                    table of the foreign key join. The partitioning strategy must depend only on the message key
     *                    and not the message value. If {@code null} the default partitioner will be used.
     * @return new {@code TableJoined} instance configured with the {@code partitioner}
     */
    public TableJoined<K, KO> withPartitioner(final StreamPartitioner<K, Void> partitioner) {
        return new TableJoined<>(partitioner, otherPartitioner, name);
    }

    /**
     * Set the custom other {@link StreamPartitioner} to be used. Null values are accepted and will result
     * in the default partitioner being used.
     *
     * @param otherPartitioner the {@link StreamPartitioner} that specifies the partitioning strategy for the right (foreign
     *                         key) table of the foreign key join. The partitioning strategy must depend only on the message
     *                         key and not the message value. If {@code null} the default partitioner will be used.
     * @return new {@code TableJoined} instance configured with the {@code otherPartitioner}
     */
    public TableJoined<K, KO> withOtherPartitioner(final StreamPartitioner<KO, Void> otherPartitioner) {
        return new TableJoined<>(partitioner, otherPartitioner, name);
    }

    /**
     * Set the base name used for all components of the join, including internal topics
     * created to complete the join.
     *
     * @param name the name used as the base for naming components of the join including internal topics
     * @return new {@code TableJoined} instance configured with the {@code name}
     */
    @Override
    public TableJoined<K, KO> withName(final String name) {
        return new TableJoined<>(partitioner, otherPartitioner, name);
    }
}
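Two properties of the class above may be easier to see in a stripped-down model: the `with...` methods return a new instance rather than mutating the receiver, and a partitioner is expected to depend on the key alone. The following is a minimal sketch, not the proposed implementation. `TableJoinedSketch`, `KeyPartitioner`, `Config`, and `byKeyHash` are hypothetical stand-ins, and `KeyPartitioner` deliberately uses a simpler signature than Kafka's `StreamPartitioner`.

```java
public class TableJoinedSketch {

    // Hypothetical stand-in for StreamPartitioner with a simplified, key-only signature.
    public interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Simplified stand-in for TableJoined: immutable, and null means "use the default".
    public static final class Config<K, KO> {
        public final KeyPartitioner<K> partitioner;
        public final KeyPartitioner<KO> otherPartitioner;
        public final String name;

        private Config(final KeyPartitioner<K> partitioner,
                       final KeyPartitioner<KO> otherPartitioner,
                       final String name) {
            this.partitioner = partitioner;
            this.otherPartitioner = otherPartitioner;
            this.name = name;
        }

        public static <K, KO> Config<K, KO> with(final KeyPartitioner<K> partitioner,
                                                 final KeyPartitioner<KO> otherPartitioner) {
            return new Config<>(partitioner, otherPartitioner, null);
        }

        // Returns a copy with the new name; the receiver is left unchanged.
        public Config<K, KO> withName(final String name) {
            return new Config<>(partitioner, otherPartitioner, name);
        }
    }

    // A partitioning strategy that looks only at the key, never at the value.
    public static int byKeyHash(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(final String[] args) {
        final Config<String, String> base =
            Config.<String, String>with(TableJoinedSketch::byKeyHash, null);
        final Config<String, String> named = base.withName("orders-customers");

        System.out.println(base.name == null);                       // the original is untouched
        System.out.println(named.name);                              // the copy carries the name
        System.out.println(named.partitioner == base.partitioner);   // the partitioner is carried over
        System.out.println(named.otherPartitioner == null);          // null stands for the default
    }
}
```

Because every setter copies the other fields, calls can be chained in any order without one setting silently discarding another.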

...

Existing FK join interfaces (without custom partitioners) will continue to use the default partitioner for subscription and response topics. Existing methods which accept Named will be marked for deprecation (to be removed in the next major release, likely 4.0).

Rejected Alternatives

N/A