Status

TBD

Motivation

When StreamsUncaughtExceptionHandler or ProcessingExceptionHandler return SHUTDOWN or FAIL, the Streams does not commit previous successful processed messages and, if the process automatically restart, these records will be repeated produced in output topics.

Proposed Change

Allow ProcessingExceptionHandler return COMMIT_AND_FAIL option.

We propose a new exception handler behavior where the StreamTask will commit all previous successfully processed message except the failing one. After the process restart, the failed message will be the first to be consumed.

The proposed ProcessingHandlerResponse would be:

public interface ProcessingExceptionHandler extends Configurable {
    ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception);

    enum ProcessingHandlerResponse {
        /** Continue processing. */
        CONTINUE(1, "CONTINUE"),
        /** Fail processing. */
        FAIL(2, "FAIL"),
        COMMIT_AND_FAIL(3, "COMMIT_AND_FAIL");

        /**
         * An english description for the used option. This is for debugging only and may change.
         */
        public final String name;

        /**
         * The permanent and immutable id for the used option. This can't change ever.
         */
        public final int id;

        ProcessingHandlerResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}


New or Changed Public Interfaces

The goal of this KIP is to implement the necessary changes without changing the API or interfaces outside of ProcessingHandlerResponse

Migration Plan and Compatibility

The proposed change is fully backward compatible and does not impact existing applications.

Rejected Alternatives

TBD

  • No labels