...

2. Time-triggered processing: this function can be triggered whenever a specified time period has elapsed. It can be used for windowing computation, for example.

...

Composable Processing

Multiple processors should be able to be chained together to form a DAG (i.e., the processor topology) for complex processing logic.

Users can define such a processor topology in an exploratory, REPL-like manner: build an initial topology, deploy and run it, check the results and intermediate values, then pause the job and edit the topology on the fly.
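For illustration, a simple two-step DAG might be composed like this (a minimal sketch based on the TopologyBuilder API shown later in this document; FilterDef and CountDef are hypothetical ProcessorDef implementations standing in for user logic):

Code Block
languagejava
// Sketch only: chain two processors between a source and a sink.
TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "events")                       // read from the "events" topic
       .addProcessor("FILTER", new FilterDef(), "SOURCE")   // first processing step, child of SOURCE
       .addProcessor("COUNT", new CountDef(), "FILTER")     // second processing step, child of FILTER
       .addSink("SINK", "counts", "COUNT");                 // write the results to the "counts" topic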

Local State Storage

Users can create state stores inside a processor, and these stores can be accessed locally.
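For example, a counting processor might keep its running totals in such a store. A minimal sketch based on the KeyValueStore used in the processor example below (the store name "counts" is only illustrative):

Code Block
languagejava
// Sketch only: a local key-value store created in init() and read / written in process().
private KeyValueStore<String, Integer> kvStore;

@Override
public void init(ProcessorContext context) {
    this.kvStore = new InMemoryKeyValueStore<>("counts", context);      // local store instance for this task
}

@Override
public void process(String key, Integer value) {
    Integer oldValue = this.kvStore.get(key);                           // local read, no remote call
    this.kvStore.put(key, oldValue == null ? value : oldValue + value); // local write
}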

...

Code Block
languagejava
public interface ProcessorContext {

    void send(String topic, Object key, Object value);  // send the key-value pair to a Kafka topic

    void schedule(long timestamp);                      // repeatedly schedule the punctuation function for the period

    void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

    String topic();                                     // return the topic of the Kafka record currently being processed

    int partition();                                    // return the partition id of the Kafka record currently being processed

    long offset();                                      // return the offset of the Kafka record currently being processed
}
 
public interface Processor<K, V>  {

    void init(ProcessorContext context);           // initialize the processor

    void process(K key, V value);                  // process a key-value pair

    void punctuate();                              // process when the scheduled time has been reached
 
    void close();                                  // close the processor
}
 
public interface ProcessorDef {

    Processor instance();                          // create a new instance of the processor from its definition
}

 

Users can then construct the processor topology using the TopologyBuilder as the following:

Code Block
languagejava
public class TopologyBuilder {

    public final TopologyBuilder addSource(String name, String... topics) { ... }                                    // add a source node to the topology which generates incoming traffic with the specified Kafka topics

    public final TopologyBuilder addSink(String name, String topic, String... parentNames) { ... }                   // add a sink node to the topology with the specified parent nodes that sends out-going traffic to the specified Kafka topics

    public final TopologyBuilder addProcessor(String name, ProcessorDef definition, String... parentNames) { ... }   // add a processor node to the topology with the specified parent nodes
}

 

And users can create their processing job with the created processor topology as the following:

Code Block
languagejava
public class ProcessorJob {

    private static class MyProcessorDef implements ProcessorDef {

        @Override
        public Processor<String, Integer> instance() {
            return new Processor<String, Integer>() {
                private ProcessorContext context;
                private KeyValueStore<String, Integer> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.context.schedule(this, 1000);
                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
                }

                @Override
                public void process(String key, Integer value) {
                    Integer oldValue = this.kvStore.get(key);
                    if (oldValue == null) {
                        this.kvStore.put(key, value);
                    } else {
                        int newValue = oldValue + value;
                        this.kvStore.put(key, newValue);
                    }

                    context.commit();
                }

                @Override
                public void punctuate(long streamTime) {
                    KeyValueIterator<String, Integer> iter = this.kvStore.all();

                    while (iter.hasNext()) {
                        Entry<String, Integer> entry = iter.next();

                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");

                        context.forward(entry.key(), entry.value());
                    }
                }

                @Override
                public void close() {
                    this.kvStore.close();
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(new Properties());

        // build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "topic-source")
               .addProcessor("PROCESS", new MyProcessorDef(), "SOURCE")
               .addSink("SINK", "topic-sink", "PROCESS");

        // start process
        KafkaStreaming streaming = new KafkaStreaming(builder, config);
        streaming.start();
    }
}

 

This example API demonstrates the abstraction of the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.
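For comparison, the hand-written poll / process / send loop that such a job would otherwise require might look roughly like this (a simplified sketch using the plain consumer / producer clients; consumerProps, producerProps and the running flag are assumed, and error handling, state and rebalancing are omitted):

Code Block
languagejava
// Sketch only: the low-level client plumbing that the processor client manages internally.
KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps);
consumer.subscribe(Arrays.asList("topic-source"));

while (running) {
    ConsumerRecords<String, Integer> records = consumer.poll(100);
    for (ConsumerRecord<String, Integer> record : records) {
        // user processing logic goes here
        producer.send(new ProducerRecord<>("topic-sink", record.key(), record.value()));
    }
    producer.flush();        // make sure the downstream data is sent
    consumer.commitSync();   // then commit the consumed offsets
}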

 

High-level Stream DSL

In addition to the processor API, we would also like to introduce a higher-level stream DSL for users that covers most common processor implementations.


Code Block
languagejava
public interface KStream<K, V> {

    /**
     * Creates a new stream consisting of all elements of this stream which satisfy a predicate
     */
    KStream<K, V> filter(Predicate<K, V> predicate);

    /**
     * Creates a new stream by transforming key-value pairs by a mapper to all elements of this stream
     */
    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, K1, V1> mapper);

    /**
     * Creates a new stream by transforming values by a mapper to all values of this stream
     */
    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all elements of this stream
     */
    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, K1, ? extends Iterable<V1>> mapper);

    /**
     * Creates a new stream by applying a flat-mapper to all values of this stream
     */
    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, ? extends Iterable<V1>> processor);

    /**
     * Creates a new windowed stream using a specified window instance.
     */
    KStreamWindowed<K, V> with(Window<K, V> window);

    /**
     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
     * the supplied predicates in the same order.
     */
    KStream<K, V>[] branch(Predicate<K, V>... predicates);

    /**
     * Sends key-value to a topic.
     */
    void sendTo(String topic);

    /**
     * Sends key-value to a topic, also creates a new stream from the topic.
     * This is mostly used for repartitioning and is equivalent to calling sendTo(topic) and from(topic).
     */
    KStream<K, V> through(String topic);

    /**
     * Processes all elements in this stream by applying a processor.
     */
    <K1, V1> KStream<K1, V1> process(KafkaProcessor<K, V, K1, V1> processor);

    // .. more operators
}

public interface KStreamWindowed<K, V> extends KStream<K, V> {

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);

    /**
     * Creates a new stream by joining this windowed stream with the other windowed stream.
     * Each element arriving from either of the streams is joined with elements with the same key in the other stream
     * if the element from the other stream has an older timestamp.
     * The resulting values are computed by applying a joiner.
     */
    <V1, V2> KStream<K, V2> joinPrior(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
}

 

With this high-level interface, the user's program can be simplified as follows (using Java 8 lambda expressions):


Code Block
languagejava
public class KStreamJob {

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(props);

        // build the topology
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> stream1 = builder.from("topic1");

        KStream<String, Integer> stream2 =
            stream1.map((key, value) -> new KeyValue<>(key, new Integer(value)))
                   .filter((key, value) -> true);

        KStream<String, Integer>[] streams = stream2
            .branch((key, value) -> value > 10,
                    (key, value) -> value <= 10);
 
        streams[0].sendTo("topic2");
        streams[1].sendTo("topic3");

        // start the process
        KafkaStreaming kstream = new KafkaStreaming(builder, config);
        kstream.start();
    }
}

 

Architecture Design

We summarize some key architecture design points in the following sub-sections.

 

[Architecture diagram]

Partition Distribution

As shown in the diagram above, each KStream process could have multiple threads (#.threads configurable in the properties), with each thread having a separate consumer and producer. So the first question is how we can distribute the partitions of the subscribed topics in the source processor among all the processes / threads.
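As an illustration of what such a distribution could look like, the following sketch shows a hypothetical default grouping that simply assigns one partition-group (and hence one task) per topic-partition; the function name and return type are assumptions, not the proposed API:

Code Block
languagejava
// Sketch only: one partition-group per assigned topic-partition.
Map<Integer, Set<TopicPartition>> defaultGrouping(List<TopicPartition> assignedPartitions) {
    Map<Integer, Set<TopicPartition>> groups = new HashMap<>();
    int groupId = 0;
    for (TopicPartition partition : assignedPartitions) {
        groups.put(groupId++, Collections.singleton(partition));   // group id -> its single partition
    }
    return groups;
}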

...

    b. All user-specified local states will also be created during the initialization process. This will require the corresponding changelog to be created for the local state stores, such that:

...

(we will talk about this in later sections).

    c. Create the record queue for each of the task's associated partition-group's partitions, so that when the consumer fetches new messages, it will put them into the corresponding queue.

...

Users can create one or more state stores in their processing logic, and each task will have a state manager that keeps an instance of each specified store inside the task.

A store instance will be created for each of the specified partition groups. Since a single store instance will not be shared across multiple partition groups, and each partition group will only be processed by a single thread, this guarantees that no store will be accessed concurrently by multiple threads at any given time.

...

Each state store will be backed up by a different Kafka change log topic, and each instance of the store correlates to one partition of the topic, such that:

Code Block
#. tasks == #. partition groups == #. store instances for each state store == #. partitions of the change log for each state store


For example, if a processor instance consumes from upstream Kafka topic "topic-A" with 4 partitions, and creates two stores, namely store1 and store2, and the user groups the 4 partitions into {topic-A-p1, topic-A-p2} and {topic-A-p3, topic-A-p4}; then two change log topics, namely "topic-store1-changelog" and "topic-store2-changelog", need to be created beforehand, each with two partitions.

...

    b. Otherwise, do not load the previously flushed state but replay the change log from the beginning up to the log-end-offset (a sketch of such a replay loop is shown below).
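A rough sketch of such a replay, assuming a dedicated restore consumer and a hypothetical helper that looks up the log-end-offset (none of these names are part of the proposed API):

Code Block
languagejava
// Sketch only: restore one store instance by replaying its changelog partition.
TopicPartition changelog = new TopicPartition("topic-store1-changelog", partitionId);
restoreConsumer.assign(Collections.singletonList(changelog));
restoreConsumer.seek(changelog, 0L);                            // no usable flushed state: start from offset 0

long endOffset = getLogEndOffset(restoreConsumer, changelog);   // hypothetical helper returning the log-end-offset
while (restoreConsumer.position(changelog) < endOffset) {
    for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100)) {
        store.put(record.key(), record.value());                // re-apply the change to the local store instance
    }
}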

 

Workflow Summary

We summarize the workflow of a Kafka streaming process following the above architecture design.

Startup

Upon the user calling KafkaStreaming.start(), the process instance creates the worker threads given the user-specified #.threads. In each worker thread:

1. Construct the producer and consumer clients, and extract the subscription topic names from the topology.

2. Let the consumer subscribe to the topics and get the assigned partitions.

3. Trigger the grouping function with the assigned partitions to get the returned list of partition-groups (hence tasks) with associated ids.


4. Initialize each task by:

    a. Create a record queue for buffering the fetched records for each partition.

    b. Initialize a topology instance for the task from the builder with a newly created processor context.

    c. Initialize the state manager of the task and construct / resume user-defined local states.

5. Run the loop at its own pace until notified to shut down; there is no synchronization between these threads. In each iteration of the loop (a rough sketch follows this list):

     a. Thread checks if the record queues are empty / low, and if yes calls consumer.poll(timeout) / consumer.poll(0) to re-fill the buffer.

     b. Choose one record from the queues and process it through the processor topology.

     c. Check if some of the processors' punctuate functions need to be triggered, and if yes, execute the function.

     d. Check if the user has called commit() during the processing of this record; if yes, commit the offset / flush the local state / flush the producer.
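A rough sketch of one iteration of this per-thread loop (all helper names here are hypothetical, chosen only to mirror the steps above):

Code Block
languagejava
// Sketch only: the per-thread run loop described in step 5.
while (running) {
    if (buffersAreEmptyOrLow()) {
        addToRecordQueues(consumer.poll(buffersAreEmpty() ? pollTimeoutMs : 0));   // a. re-fill the per-partition queues
    }

    StreamTask task = chooseTaskWithNextRecord();   // b. pick one buffered record (e.g. by timestamp)
    task.process();                                 //    and push it through the processor topology

    punctuateTasksIfScheduled();                    // c. trigger punctuate() where the schedule has elapsed

    if (commitRequested()) {                        // d. user called commit() while processing
        producer.flush();                           //    flush the produced records
        flushLocalState();                          //    flush the local state stores
        consumer.commitSync();                      //    then commit the consumed offsets
    }
}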

 

Shutdown

Upon user calling KafkaStreaming.shutdown(), the following steps are executed:

1. Commit / flush each partition-group's current processing state as described in the local state management section.

2. Close the embedded producer and consumer clients.

 

Packaging Design

It would be best to package Processor / KStream as a separate jar, since it introduces extra external dependencies, such as RocksDB, etc. Under this model:

1. We will let users create their own MyKStream.java class that depends on the kafka-stream.jar.

2. We will let users write their own main function as the entry point for starting their process instance.

 

Current class / package names can be found in this PR. A general summary:

1. All classes are defined in the "streams" folder.

2. Low-level Processor interface is under the "o.a.k.streams.processor" package; high-level KStream interface is under the "o.a.k.streams.kstream" package.

3. Some example classes can be found in o.a.k.streams.examples.

 

4. Important internal classes include:

Code Block
PartitionGroup: a set of partitions along with their queuing buffers and timestamp extraction logic.
 
ProcessorStateManager: the manager of the local states within a task.
 
ProcessorTopology: the instance of the topology generated by the TopologyBuilder.
 
StreamTask: the task of the processing unit, which includes a ProcessorTopology, a ProcessorStateManager and a PartitionGroup.
 
StreamThread: contains multiple StreamTasks, a Consumer and a Producer client.
 
KStreamFilter/Map/Branch/...: implementations of high-level KStream topology builder operators.

...