...

```java
public class WordCount {

    // Job definition
    public static void main(String[] args) throws Exception {
        // Initialization omitted: creation of the StreamExecutionEnvironment `env`
        // and of the file source `builder`.

        // Source
        DataStream<String> text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");

        // Aggregate
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .name("tokenizer")
                        .keyBy(value -> value.f0)
                        // User code
                        .flatMap(new Counter())
                        .name("counter");

        // Sink
        counts.print().name("print-sink");

        env.execute("WordCount");
    }

    // Core processing user function.
    // The two flatMap() bodies below are alternatives shown side by side for comparison:
    // a real class defines only one of them, and each needs the matching (synchronous
    // or asynchronous) ValueState type for `wordCounter`.
    public static final class Counter extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // State initialization omitted.
        private transient ValueState<Integer> wordCounter;

        // Synchronous state API usage: value() and update() block until the access completes.
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer val = wordCounter.value();
            int updated = (val == null ? 1 : val + 1);
            wordCounter.update(updated);
            out.collect(Tuple2.of(value.f0, updated));
        }

        // Asynchronous state API usage: value() and update() return futures, so the
        // read-modify-write and the emit are chained as callbacks.
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            AtomicInteger updated = new AtomicInteger(0);
            wordCounter.value().thenCompose(
                    val -> {
                        if (val == null) {
                            updated.set(1);
                        } else {
                            updated.set(val + 1);
                        }
                        return wordCounter.update(updated.get());
                    }
            ).thenAccept(
                    empty -> {
                        out.collect(Tuple2.of(value.f0, updated.get()));
                    }
            );
        }
    }
}
```

...

DataStream users need to rewrite their stateful operators in the callback style demonstrated in the example. We will try to provide tools later to ease the migration.
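As a minimal, self-contained sketch of the callback-style rewrite, the Java program below models the counter's read-modify-write with `java.util.concurrent.CompletableFuture`. The names `AsyncValueState`, `count` and `run` are hypothetical and chosen for illustration only: `AsyncValueState` is an in-memory stand-in whose futures complete immediately, not Flink's asynchronous state API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackCounterSketch {

    // Hypothetical stand-in for an asynchronous keyed ValueState (NOT Flink's API):
    // value() and update() return futures instead of blocking. Here they complete
    // immediately; a disaggregated backend would complete them after remote I/O.
    static final class AsyncValueState<T> {
        private final Map<String, T> store = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        CompletableFuture<T> value() {
            return CompletableFuture.completedFuture(store.get(currentKey));
        }

        CompletableFuture<Void> update(T newValue) {
            store.put(currentKey, newValue);
            return CompletableFuture.completedFuture(null);
        }
    }

    // Callback-style rewrite of the synchronous read-modify-write:
    // read -> compute -> write -> emit, each step chained on the previous future.
    static CompletableFuture<Void> count(AsyncValueState<Integer> counter, String word,
                                         BiConsumer<String, Integer> out) {
        return counter.value()
                .thenCompose(val -> {
                    int updated = (val == null ? 1 : val + 1);
                    // Pass the new count downstream instead of sharing it via an AtomicInteger.
                    return counter.update(updated).thenApply(ignored -> updated);
                })
                .thenAccept(updated -> out.accept(word, updated));
    }

    // Feeds words through the counter, emulating keyBy by switching the current key.
    static List<String> run(String... words) {
        AsyncValueState<Integer> counter = new AsyncValueState<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            counter.setCurrentKey(word);
            // join() only keeps this sketch sequential; an operator would not block here.
            count(counter, word, (w, n) -> emitted.add(w + "=" + n)).join();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b", "a")); // prints [a=1, b=1, a=2]
    }
}
```

Returning the new count through `thenApply` carries it to the emit step without the shared `AtomicInteger` used in the example above; both variants emit the same records.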

Rejected Alternatives

Tiered Storage: DFS as a complement when the local disk runs out of space. Although this approach is intuitive and can perform well while the state fits on the local disk, a tiered storage design in which the local disk overflows to remote storage is ultimately unsuitable for disaggregated state management in Flink.

Here's why:

  1. Heavy Checkpointing Procedure: Because the local disk holds part of the primary state, a large number of local files must be uploaded to DFS during checkpointing.
  2. Limited Data Structure Flexibility: Confining local-disk data to the SST file format rules out the performance gains that alternative caching structures could offer.
  3. Inaccurate Warm/Cold Distinction: Classifying data as warm or cold at file granularity does not reflect actual access patterns, which leads to suboptimal resource allocation.
  4. More Complicated File Management: In this architecture, both the local disk and DFS hold part of the primary storage, so file management must be unified across the two, which is complicated in cases
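The granularity problem in point 3 can be made concrete with a toy model. The Java sketch below is illustrative only: the file layout, key names and access counts are hypothetical data, not measurements. It picks the "warmest" SST file by total accesses, as a file-level policy would, and reports what fraction of that file's keys were actually accessed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WarmColdGranularitySketch {

    // Toy model (hypothetical data): each SST file holds a list of keys, and
    // accessCounts records how often each key was read recently.
    // Returns the fraction of keys in the hottest file (by total accesses) that
    // were accessed at all. Assumes at least one non-empty file.
    static double hotKeyFractionOfHottestFile(Map<String, List<String>> files,
                                              Map<String, Integer> accessCounts) {
        String hottest = null;
        long best = -1;
        for (Map.Entry<String, List<String>> e : files.entrySet()) {
            long total = 0;
            for (String key : e.getValue()) {
                total += accessCounts.getOrDefault(key, 0);
            }
            if (total > best) {
                best = total;
                hottest = e.getKey();
            }
        }
        List<String> keys = files.get(hottest);
        long hotKeys = keys.stream().filter(k -> accessCounts.getOrDefault(k, 0) > 0).count();
        return (double) hotKeys / keys.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("sst-1", List.of("k0", "k1", "k2", "k3"));
        files.put("sst-2", List.of("k4", "k5", "k6", "k7"));
        // One very hot key makes sst-1 the "warm" file, although 3 of its 4 keys are cold.
        Map<String, Integer> access = Map.of("k0", 100, "k4", 3, "k5", 3);
        System.out.println(hotKeyFractionOfHottestFile(files, access)); // prints 0.25
    }
}
```

In this toy case, keeping `sst-1` on the local disk as warm data also keeps three cold keys there for every hot one, which is the kind of misallocation point 3 describes.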

Appendix: How to run the PoC

...