Status
TBD
Motivation
When StreamsUncaughtExceptionHandler or ProcessingExceptionHandler returns SHUTDOWN or FAIL, Kafka Streams does not commit the messages that were already processed successfully. If the process restarts automatically, those messages are reprocessed and produced to the output topics a second time.
Proposed Change
Allow ProcessingExceptionHandler to return a new COMMIT_AND_FAIL option.
We propose a new exception handler behavior in which the StreamTask commits all previously successfully processed messages, but not the failing one, before it fails. After the process restarts, the failed message is the first one to be consumed.
The proposed ProcessingHandlerResponse would be:
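The difference between FAIL and the proposed COMMIT_AND_FAIL can be illustrated with a small toy model. This is only a sketch and not Kafka Streams code: the class CommitAndFailSketch, its run method, the failAt parameter, and the two-value Response enum are hypothetical names made up for illustration. It assumes that no commit happened between the start of the run and the failure, and that the cause of the failure is fixed before the restart.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a task; not Kafka Streams code. All names here are hypothetical. */
public class CommitAndFailSketch {

    /** Stand-in for the two handler responses being compared. */
    enum Response { FAIL, COMMIT_AND_FAIL }

    /** Offset of the next record to consume after a restart. */
    int committedOffset = 0;

    /** Everything written to the simulated output topic, across restarts. */
    final List<String> output = new ArrayList<>();

    /**
     * Processes input from the committed offset onward.
     * failAt is the offset whose processing throws, or -1 for no failure.
     * Returns true if the run finished, false if the task stopped on the failure.
     */
    boolean run(final List<String> input, final int failAt, final Response onError) {
        int position = committedOffset;
        for (; position < input.size(); position++) {
            if (position == failAt) {
                if (onError == Response.COMMIT_AND_FAIL) {
                    // Commit everything before the failing record, then stop.
                    committedOffset = position;
                }
                // With FAIL, nothing processed in this run is committed.
                return false;
            }
            output.add(input.get(position));
        }
        committedOffset = position;
        return true;
    }

    public static void main(final String[] args) {
        final List<String> input = List.of("a", "b", "c", "d");
        for (final Response response : Response.values()) {
            final CommitAndFailSketch task = new CommitAndFailSketch();
            task.run(input, 2, response);   // the record at offset 2 fails
            task.run(input, -1, response);  // restart after the cause is fixed
            System.out.println(response + " -> " + task.output);
        }
    }
}
```

Running main prints `FAIL -> [a, b, a, b, c, d]` and `COMMIT_AND_FAIL -> [a, b, c, d]`: with FAIL the first two records are produced twice, while with COMMIT_AND_FAIL the restart resumes at the failed record.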
public interface ProcessingExceptionHandler extends Configurable {

    ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);

    enum ProcessingHandlerResponse {
        /** Continue processing. */
        CONTINUE(1, "CONTINUE"),
        /** Fail processing. */
        FAIL(2, "FAIL"),
        /** Commit all previously successfully processed messages, then fail processing. */
        COMMIT_AND_FAIL(3, "COMMIT_AND_FAIL");

        /**
         * An English description for the used option. This is for debugging only and may change.
         */
        public final String name;

        /**
         * The permanent and immutable id for the used option. This can't change ever.
         */
        public final int id;

        ProcessingHandlerResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}
New or Changed Public Interfaces
The only public interface change is the new COMMIT_AND_FAIL value in ProcessingHandlerResponse. The goal of this KIP is to implement the necessary changes without changing any other API or interface.
Migration Plan and Compatibility
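The proposed change is fully backward compatible and does not impact existing applications: handlers that return CONTINUE or FAIL keep their current behavior, and the new behavior applies only when a handler returns COMMIT_AND_FAIL.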
Rejected Alternatives
TBD