...
Code Block | ||
---|---|---|
| ||
class MyPojo { Integer key; Integer value; Set<Long> timers; } class ReaderFunction extends KeyedStateReaderFunction<Integer, MyPojo> { ValueState<Integer> state; @Override public void open(Configuration parameters) { state = getRuntimeContext().getState(stateDescriptor); } @Override public void processKey( Integer key, Context ctx, Collector<MyPojo> out) throws Exception { MyPojo pojo = new MyPojo(); pojo.key = key; pojo.value = state.value(); pojo.timers = ctx.getEventTimeTimers(); out.collect(pojo); } } DataSet<MyPojo> keyedState = operatorsavepoint.readKeyedState("uid", new ReaderFunction()); |
...