Status
Current state: Partially Accepted (previously Under Discussion). Superseded by SEP-13.
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E
...
Code Block |
---|
/** * Example code to implement window-based counter with a re-partition stage */ public class PageViewCounterExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream(“myinput”); MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream(“myoutput”); pageViewEvents. partitionBy(m -> m.getMessage().memberId). window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). setEarlyTrigger(Triggers.repeat(Triggers.count(5))). setAccumulationMode(AccumulationMode.DISCARDING)). map(MyStreamOutput::new). sendTo(pageViewPerMemberCounters); } public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config); localRunner.run(new PageViewCounterExample()); } } |
Table 1: Repartition counter example using new fluent API
...