...
Public Interfaces
The following new methods will be introduced:
| Code Block |
|---|
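interface KStream<K, V> {
    // Applies op to this stream; records produced by op are fed back in as input.
    KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op);
    // As above; produced customizes the repartition topic, if one is required.
    KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op, Produced<K, V> produced);
} |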
Note: UnaryOperator is java.util.function.UnaryOperator
...
- op cannot be UnaryOperator.identity, or an equivalent function that simply returns its argument unmodified. This would produce an infinite recursive loop, since there is no opportunity to refine the output to break out of the loop.
- op MUST "terminate"; that is, it must have some condition which eventually prevents further recursion of a record. In our example here, the terminating condition is the join, since the root node of our graph will have no parent, so the join will produce no output for the root node.
  - We can attempt to detect "definitely non-terminating" arguments by checking whether the process graph produced by the function contains no operations that can cause the stream to terminate (e.g. filter, join, flatMap, etc.).
  - We cannot guarantee that a function that includes terminating operations (filter, join, flatMap, etc.) actually terminates.
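The termination requirement can be illustrated without Kafka Streams. The following is a minimal sketch in plain Java, not the proposed implementation: a hypothetical recursively helper feeds the output of op back into op until op emits nothing. The maxRounds guard, the parentOf map, and all class and method names are assumptions made for illustration; the guard only makes the non-terminating case observable in a toy setting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class TerminationSketch {

    /**
     * Toy model of recursion: feed op's output back into op until it emits nothing.
     * Returns every record emitted by op. maxRounds is a hypothetical safety guard.
     */
    static <T> List<T> recursively(List<T> input, UnaryOperator<List<T>> op, int maxRounds) {
        List<T> emitted = new ArrayList<>();
        List<T> current = input;
        for (int round = 0; !current.isEmpty(); round++) {
            if (round >= maxRounds) {
                throw new IllegalStateException("op did not terminate within " + maxRounds + " rounds");
            }
            current = op.apply(current);
            emitted.addAll(current);
        }
        return emitted;
    }

    /** Stand-in for the join: a node with no parent (the root) produces no output. */
    static UnaryOperator<List<String>> joinWithParent(Map<String, String> parentOf) {
        return nodes -> {
            List<String> parents = new ArrayList<>();
            for (String node : nodes) {
                String parent = parentOf.get(node);
                if (parent != null) {
                    parents.add(parent);
                }
            }
            return parents;
        };
    }

    public static void main(String[] args) {
        // Hypothetical graph: c -> b -> a, where "a" is the root and has no parent.
        Map<String, String> parentOf = Map.of("c", "b", "b", "a");
        System.out.println(recursively(List.of("c"), joinWithParent(parentOf), 100));

        // The identity function never refines its input, so only the guard stops it.
        UnaryOperator<List<String>> identity = UnaryOperator.identity();
        try {
            recursively(List.of("c"), identity, 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the join-like operator stops on its own once the root is reached, whereas the identity operator only stops because of the guard, which mirrors why an identity-equivalent op is disallowed.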
Automatic Repartitioning
If the op argument applies a key-changing operation (as it does in our example above), a repartition topic may be automatically created. The optional Produced argument can be provided to customize repartitioning behaviour. This argument is ignored if a repartition topic is not necessary.
- We use Produced instead of Repartitioned, because when operating recursively, it would be an error to modify the number of partitions, since the topic MUST have the same number of partitions as the current Task.
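To make the partition-count constraint concrete, here is a rough sketch in plain Java. It assumes a simplified partitioner based on String.hashCode, which is only a stand-in for Kafka's default partitioner (that one hashes the serialized key with murmur2). The keys, partition counts, and all names are hypothetical.

```java
final class RepartitionSketch {

    /** Simplified stand-in for a key-based partitioner; not Kafka's actual hashing. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True when re-keying a record would route it to a different partition. */
    static boolean changesPartition(String oldKey, String newKey, int numPartitions) {
        return partitionFor(oldKey, numPartitions) != partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int sourcePartitions = 2; // hypothetical partition count of the stream being recursed on

        // Re-keying child "c" to its parent "b" moves the record to another partition,
        // which is why a key-changing op may need a repartition topic.
        System.out.println(changesPartition("c", "b", sourcePartitions));

        // With the same partition count, the re-keyed record maps to an existing partition.
        System.out.println(partitionFor("b", sourcePartitions) < sourcePartitions);

        // With a different count (4 here), it can map to a partition the source does not have.
        System.out.println(partitionFor("b", 4) < sourcePartitions);
    }
}
```

Under these assumptions, records fed back into the recursion only line up with the partitions being processed when the repartition topic keeps the same partition count, which is the customization that Repartitioned would allow and Produced does not.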
...
- .
...
Implementation
In KStreamImpl, implementation is fairly simple:
- We call op, passing our current KStream as its argument. This produces our output KStream.
- We determine if repartitioning is required on the op stream, and if it is, we automatically include a repartition node, equivalent to adding .repartition() to the end of the op stream.
- We wire up the graphNode from the output KStream as a parent of the current KStream. This takes care of the recursion.
- Finally, we return the output KStream. This enables users to operate on the records that are being recursively produced, as above.
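The steps above can be sketched with a toy graph model. This is not the KStreamImpl code: Node and Stream below are hypothetical stand-ins for the graph node and KStream types, and the repartitionRequired flag is an assumed simplification of how a key change might be tracked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class WiringSketch {

    /** Hypothetical stand-in for a topology graph node. */
    static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a KStream: a handle on a graph node. */
    static final class Stream {
        final Node node;
        final boolean repartitionRequired;

        Stream(Node node, boolean repartitionRequired) {
            this.node = node;
            this.repartitionRequired = repartitionRequired;
        }

        private Stream child(String name, boolean flag) {
            Node child = new Node(name);
            child.parents.add(node);
            return new Stream(child, flag);
        }

        Stream selectKey() { return child("selectKey", true); }                // key-changing
        Stream filter() { return child("filter", repartitionRequired); }       // key-preserving
        Stream repartition() { return child("repartition", false); }

        Stream recursively(UnaryOperator<Stream> op) {
            // 1. Call op with the current stream to obtain the output stream.
            Stream output = op.apply(this);
            // 2. Append a repartition node if the op stream changed the key.
            if (output.repartitionRequired) {
                output = output.repartition();
            }
            // 3. Wire the output node in as a parent of the current node (the cycle).
            node.parents.add(output.node);
            // 4. Return the output stream so callers can keep operating on it.
            return output;
        }
    }

    public static void main(String[] args) {
        Stream source = new Stream(new Node("source"), false);
        Stream output = source.recursively(s -> s.selectKey().filter());
        System.out.println(output.node.name);
        System.out.println(source.node.parents.contains(output.node));
    }
}
```

In this model the recursion is nothing more than the extra parent edge added in step 3; everything else is ordinary topology construction.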
...
The following tests will be added:
- Counting descendants of graph nodes arriving in-order (as above)
- Counting descendants of graph nodes arriving in any order
- Streaming recursion:
  - No repartitioning required
  - Repartitioning handled by main stream.
  - Repartitioning handled by op argument.
Rejected Alternatives
It's currently possible to implement streaming recursion via explicit topics, albeit with a number of disadvantages:
...