DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
/**
* Watermarks are the progress indicators in the data streams. A watermark signifies
* that no events with a timestamp smaller or equal to the watermark's time will occur after the
* water. A watermark with timestamp <i>T</i> indicates that the stream's event time has progressed
* to time <i>T</i>.
*
* <p>Watermarks are created at the sources and propagate through the streams and operators.
*
* <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp
* may still follow. In that case, it is up to the logic of the operators to decide what to do
* with the "late events". Operators can for example ignore these late events, route them to a
* different stream, or send update to their previously emitted results.
*
* <p>When a source reaches the end of the input, it emits a final watermark with timestamp
* {@code Long.MAX_VALUE}, indicating the "end of time".
*
* <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all records
* in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
*/
@Public
public final class Watermark implements Serializable {
private static final long serialVersionUID = 1L;
/** Thread local formatter for stringifying the timestamps. */
private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER = ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
// ------------------------------------------------------------------------
/** The watermark that signifies end-of-event-time. */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
// ------------------------------------------------------------------------
/** The timestamp of the watermark in milliseconds. */
private final long timestamp;
/**
* Creates a new watermark with the given timestamp in milliseconds.
*/
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
/**
* Returns the timestamp associated with this Watermark.
*/
public long getTimestamp() {
return timestamp;
}
/**
* Formats the timestamp of this watermark, assuming it is a millisecond timestamp.
* The returned format is "yyyy-MM-dd HH:mm:ss.SSS".
*/
public String getFormattedTimestamp() {
return TS_FORMATTER.get().format(new Date(timestamp));
}
// ------------------------------------------------------------------------
@Override
public boolean equals(Object o) {
return this == o ||
o != null &&
o.getClass() == Watermark.class &&
((Watermark) o).timestamp == this.timestamp;
}
@Override
public int hashCode() {
return Long.hashCode(timestamp);
}
@Override
public String toString() {
return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
}
} |
...