Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
/**
 * Plain data holder for what was read for a single key: the key, the
 * integer value stored for it, and the set of timer timestamps.
 *
 * <p>All fields are mutable and remain {@code null} until assigned.
 *
 * <p>NOTE(review): the class and its fields are package-private; Flink's POJO
 * type rules presumably expect a public class with public (or
 * getter/setter-accessible) fields -- confirm whether that matters here.
 */
class MyPojo {
  /** The key this record belongs to. */
  Integer key;

  /** The value read for {@link #key}. */
  Integer value;

  /** Timer timestamps associated with {@link #key}. */
  Set<Long> timers;
}

/**
 * Keyed-state reader that turns every key into one {@link MyPojo}.
 *
 * <p>For each key the emitted record carries the key itself, the current
 * content of the {@code ValueState<Integer>} obtained in
 * {@link #open(Configuration)}, and the event-time timers reported by the
 * {@code Context}.
 */
class ReaderFunction extends KeyedStateReaderFunction<Integer, MyPojo> {
  /** Handle to the value state; assigned in {@link #open(Configuration)}. */
  ValueState<Integer> state;

  /**
   * Obtains the state handle from the runtime context.
   *
   * @param parameters configuration handed to this function; not read here
   */
  @Override
  public void open(Configuration parameters) {
    // NOTE(review): stateDescriptor is not declared in this excerpt -- it has
    // to be supplied elsewhere (e.g. as a field or constant); confirm.
    state = getRuntimeContext().getState(stateDescriptor);
  }

  /**
   * Builds one {@link MyPojo} for the given key and hands it to the collector.
   *
   * @param key the key currently being read
   * @param ctx context used to look up the event-time timers
   * @param out collector receiving the populated record
   * @throws Exception declared by the overridden signature; failures from the
   *     calls made here propagate unchanged
   */
  @Override
  public void processKey(Integer key, Context ctx, Collector<MyPojo> out) throws Exception {
    final MyPojo snapshot = new MyPojo();
    snapshot.key = key;
    snapshot.value = state.value();
    snapshot.timers = ctx.getEventTimeTimers();
    out.collect(snapshot);
  }
}

// Read the keyed state registered under "uid", converting each key into a
// MyPojo through a fresh ReaderFunction.
// NOTE(review): `operatorsavepoint` is not declared in this excerpt; the name
// may be fused diff text ("operator" + "savepoint") -- confirm the identifier.
DataSet<MyPojo> keyedState =
    operatorsavepoint.readKeyedState(
        "uid",
        new ReaderFunction());

...