Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Example for the interface  Predicate




Code Block
languagejava
public interface Predicate<K, V> extends Named {

    /**
     * Test if the record with the given key and value satisfies the predicate.
     *
     * @param key   the key of the record
     * @param value the value of the record
     * @return {@code true} if the {@link KeyValue} pair satisfies the predicate&mdash;{@code false} otherwise
     */
    boolean test(final K key, final V value);

    /**
     * Creates a new operation with the specified name.
     *
     * @param name          the name to be used for generated processors applied to this stream.
     * @param predicate     the predicate.
     * @param <K>           the type of keys
     * @param <V>           the type of values
     * @return a new {@link Predicate} instance.
     */
    static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(K key, V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}




Stateless operations

Stateless operations like foreach(), map(), flatMap(), mapValues(), filter(),filterNot,  are translated into a single processor. The action interfaces depicted above will be used to directly change the processor name (i.e ForEachAction, ValueMapper, ValueMapperWithKey KeyValueMapper).

...

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();


builder.stream("topic-input", Consumed.withProcessorNamewith("STREAM-FROM-TOPIC-INPUT")
        .filter(Predicate.named("FILTER-NULL-VALUE", (k, v) -> true ))
        .map(KeyValueMapper.named("MAP-TO-UPPERCASE", (k, v) -> KeyValue.pair(k, v.toUpperCase()))
        .to("topic-output", Produced.with("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE

...