...
```java
public class WordCount {
    // Job definition
    public static void main(String[] args) throws Exception {
        // Some initialization code...

        // Source
        DataStream<String> text =
            env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");

        // Aggregate
        DataStream<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer())
                .name("tokenizer")
                .keyBy(value -> value.f0)
                // User code
                .flatMap(new Counter())
                .name("counter");

        // Sink
        counts.print().name("print-sink");

        env.execute("WordCount");
    }

    // Core processing user function
    public static final class Counter
            extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // Initialization omitted.
        private transient ValueState<Integer> wordCounter;

        // Synchronous state API usage
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            Integer val = wordCounter.value();
            int updated = (val == null ? 1 : val + 1);
            wordCounter.update(updated);
            out.collect(Tuple2.of(value.f0, updated));
        }

        // Asynchronous state API usage (shown side by side with the synchronous
        // version above for comparison; only one of the two would exist in a real class)
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            AtomicInteger updated = new AtomicInteger(0);
            wordCounter.value().thenCompose(
                val -> {
                    if (val == null) {
                        updated.set(1);
                    } else {
                        updated.set(val + 1);
                    }
                    return wordCounter.update(updated.get());
                }
            ).thenAccept(
                empty -> out.collect(Tuple2.of(value.f0, updated.get()))
            );
        }
    }
}
```
...
DataStream users need to rewrite their stateful operators in a callback style, as demonstrated in the example. We plan to provide tools later to facilitate this migration.
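To illustrate the shape of such a rewrite outside of Flink, the sketch below contrasts a blocking read-update-emit sequence with its callback-chained equivalent. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the proposal's asynchronous state future; the class name `CallbackMigration` and the `value()`/`update()` helpers are hypothetical and only mimic the `thenCompose`/`thenAccept` chaining pattern, not Flink's actual API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackMigration {
    // Stand-in for an asynchronous state handle (hypothetical; Flink's real
    // state API differs). Starts out "empty", i.e. holding null.
    static CompletableFuture<Integer> stored = CompletableFuture.completedFuture(null);

    static CompletableFuture<Integer> value() {
        return stored;
    }

    static CompletableFuture<Void> update(int v) {
        stored = CompletableFuture.completedFuture(v);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        AtomicInteger emitted = new AtomicInteger();

        // Synchronous style would be: read, compute, write, emit, all blocking.
        // Callback style chains each step on the previous future instead:
        value()
            .thenCompose(val -> {
                int updated = (val == null) ? 1 : val + 1;
                // Write back, then carry the computed value downstream.
                return update(updated).thenApply(ignored -> updated);
            })
            .thenAccept(emitted::set)
            .join();

        System.out.println(emitted.get()); // prints 1 for the first record
    }
}
```

The key change for migration is that every state access returns a future, so the logic that used to follow a state read must move into a continuation registered on that future.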
Rejected Alternatives
Tiered Storage: DFS as a complementary store when the local disk runs out of space. While initially appealing due to its intuitive nature and potential performance benefits within local capacity, the tiered storage solution with local disk overflow to remote storage ultimately proves unsuitable for disaggregated state management in Flink.
Here's why:
- Heavy Checkpointing Procedure: A considerable amount of files need to be uploaded during checkpointing.
- Limited Data Structure Flexibility: Confining local disk data to the SST format restricts potential performance gains from alternative caching structures.
- Inaccurate Warm/Cold Distinction: File-level classification of data as warm or cold inaccurately reflects actual access patterns, leading to suboptimal resource allocation.
- More Complicated File Management: In this architecture, both the local disk and the DFS serve as part of the primary storage, so their file management must be unified, which is complicated in some cases.
Appendix: How to run the PoC
...