...
Public Interfaces
The following new methods will be introduced:
Code Block |
---|
interface KStream<K, V> {
KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op);
KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op, Produced<K, V> produced);
} |
Note: UnaryOperator is java.util.function.UnaryOperator
...
- op cannot be UnaryOperator.identity, or an equivalent function that simply returns its argument unmodified. This would produce an infinite recursive loop, since there's no opportunity to refine the output to break out of the loop.
- op MUST "terminate"; that is, it must have some condition which eventually prevents further recursion of a record. In our example here, the terminating condition is the join, since the root node of our graph will have no parent, so the join will produce no output for the root node.
  - We can attempt to detect "definitely non-terminating" arguments by checking whether the process graph produced by the function contains no operation that can cause the stream to terminate (e.g. filter, join, flatMap, etc.).
  - We cannot guarantee that a function that includes terminating operations (filter, join, flatMap, etc.) actually terminates.
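The termination requirement can be illustrated with a minimal sketch that uses only the Java standard library. It does not use Kafka Streams: the recursively method below is a toy stand-in that feeds each output of op back into op, and the parentOf map, the maxSteps guard and the class name are assumptions made for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class RecursionTerminationSketch {

    /**
     * Toy model of recursion over a stream: each record is passed to op, and
     * whatever op produces is both emitted and fed back into op.
     * maxSteps is a safety guard for ops that never stop producing output.
     */
    public static <T> List<T> recursively(List<T> input,
                                          Function<T, Optional<T>> op,
                                          int maxSteps) {
        Deque<T> pending = new ArrayDeque<>(input);
        List<T> output = new ArrayList<>();
        int steps = 0;
        while (!pending.isEmpty()) {
            if (steps++ >= maxSteps) {
                throw new IllegalStateException(
                        "op did not terminate within " + maxSteps + " steps");
            }
            T record = pending.poll();
            // An empty result ends the recursion for this record.
            op.apply(record).ifPresent(r -> {
                output.add(r);
                pending.add(r);
            });
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical graph: c -> b -> a, where a is the root (no parent).
        Map<String, String> parentOf = Map.of("c", "b", "b", "a");

        // A join-like op: produces the parent, or nothing for the root.
        List<String> ancestors = recursively(
                List.of("c"), node -> Optional.ofNullable(parentOf.get(node)), 100);
        System.out.println(ancestors); // prints [b, a]

        // An identity-like op always produces output, so it never terminates.
        try {
            recursively(List.of("c"), Optional::of, 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the join-like op stops at the root because the lookup yields nothing, whereas the identity-like op only stops because of the artificial step limit.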
Automatic Repartitioning
If the op argument applies a key-changing operation (as it does in our example above), a repartition topic may be automatically created. The optional Produced argument can be provided to customize repartitioning behaviour. This argument is ignored if a repartition topic is not necessary.
- We use Produced instead of Repartitioned, because when operating recursively, it would be an error to modify the number of partitions, since the topic MUST have the same number of partitions as the current Task.
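A small sketch of why the partition count matters, under simplifying assumptions: partitionFor below is a hypothetical stand-in that uses hashCode(), whereas Kafka's default partitioner hashes the serialized key with murmur2. The partition counts 4 and 6 are arbitrary illustration values.

```java
public class PartitionCountSketch {

    // Simplified stand-in for a partitioner (assumption: hashCode() instead of
    // murmur2 over the serialized key). Maps a key to a partition index.
    public static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int key = 10;
        int taskPartitions = 4;     // partition count seen by the current task (assumed)
        int modifiedPartitions = 6; // a different count, as Repartitioned would allow (assumed)

        // Same key, different partition counts: the record fed back into the
        // stream would land on a different partition index.
        System.out.println(partitionFor(key, taskPartitions));     // prints 2
        System.out.println(partitionFor(key, modifiedPartitions)); // prints 4
    }
}
```

With a mismatched count, a record produced by op could be routed to a partition index that does not line up with the partitioning of the stream it is being fed back into, which is the situation the restriction to Produced avoids.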
...
...
Implementation
In KStreamImpl, implementation is fairly simple:
- We call op, passing our current KStream as its argument. This produces our output KStream.
- We determine if repartitioning is required on the op stream, and if it is, we automatically include a repartition node, equivalent to adding .repartition() to the end of the op stream.
- We wire up the graphNode from the output KStream as a parent of the current KStream. This takes care of the recursion.
- Finally, we return the output KStream. This enables users to operate on the records that are being recursively produced, as above.
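The four steps above can be sketched with a toy graph model. This is not the real KStreamImpl: the Node and Stream classes, the repartitionRequired flag and the node names are hypothetical simplifications, and returning the stream after the repartition node is one possible reading of the steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class RecursiveWiringSketch {

    /** Hypothetical, heavily simplified stand-in for a topology graph node. */
    public static final class Node {
        public final String name;
        public final List<Node> parents = new ArrayList<>();

        public Node(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a stream implementation; not the real class. */
    public static final class Stream {
        public final Node graphNode;
        public final boolean repartitionRequired;

        public Stream(Node graphNode, boolean repartitionRequired) {
            this.graphNode = graphNode;
            this.repartitionRequired = repartitionRequired;
        }

        /** Models a key-changing operation: flags that repartitioning is needed. */
        public Stream selectKey(String name) {
            return derive(name, true);
        }

        /** Models a key-preserving operation: inherits the flag. */
        public Stream filter(String name) {
            return derive(name, repartitionRequired);
        }

        /** Models .repartition(): adds a repartition node and clears the flag. */
        public Stream repartition() {
            return derive(graphNode.name + "-repartition", false);
        }

        private Stream derive(String name, boolean needsRepartition) {
            Node child = new Node(name);
            child.parents.add(graphNode);
            return new Stream(child, needsRepartition);
        }

        public Stream recursively(UnaryOperator<Stream> op) {
            Stream output = op.apply(this);          // 1. build the op stream
            if (output.repartitionRequired) {        // 2. repartition if needed
                output = output.repartition();
            }
            graphNode.parents.add(output.graphNode); // 3. close the loop
            return output;                           // 4. hand the output back
        }
    }

    public static void main(String[] args) {
        Stream source = new Stream(new Node("source"), false);
        Stream output = source.recursively(s -> s.selectKey("rekey").filter("non-root"));
        System.out.println(output.graphNode.name); // prints non-root-repartition
        System.out.println(source.graphNode.parents.contains(output.graphNode)); // prints true
    }
}
```

The cycle in the graph comes from the third step alone: the output node becomes a parent of the node that op started from, so records leaving op flow back into it.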
...
The following tests will be added:
- Streaming recursion:
  - No repartitioning required
  - Repartitioning handled by main stream.
  - Repartitioning handled by op argument.
- Counting descendants of graph nodes arriving in any order
- Counting descendants of graph nodes arriving in-order (as above)
Rejected Alternatives
It's currently possible to implement streaming recursion via explicit topics, albeit with a number of disadvantages:
...