...
Code Block | ||
---|---|---|
| ||
public interface ProductionExceptionHandler extends Configurable {
...
@Deprecated
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
...
}
/**
 * Decides whether processing should go on after a record could not be produced.
 *
 * <p>This default implementation forwards the call to the deprecated
 * {@code handle(context, record, exception)} method, converts the returned legacy value
 * with {@code Result.from(...)}, and wraps it in a {@link Response} that carries an
 * empty list of dead letter queue records.
 *
 * @param context
 *     The error handler context metadata.
 * @param record
 *     The record that failed to produce.
 * @param exception
 *     The exception that occurred during production.
 *
 * @return a {@link Response} object
 */
default Response handleError(final ErrorHandlerContext context,
                             final ProducerRecord<byte[], byte[]> record,
                             final Exception exception) {
    // Ask the legacy callback first, then translate its answer into the new enum.
    final ProductionExceptionHandlerResponse legacyResponse = handle(context, record, exception);
    final Result result = Result.from(legacyResponse);
    return new Response(result, Collections.emptyList());
}
@Deprecated
default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
final Exception exception) {
...
}
/**
 * Handles a serialization exception and determines whether the process should continue.
 * The default implementation is to fail the process.
 *
 * <p>This default implementation delegates to the deprecated
 * {@code handleSerializationException(context, record, exception, origin)} overload,
 * converts the returned legacy value with {@code Result.from(...)}, and wraps it in a
 * {@link Response} that carries an empty list of dead letter queue records.
 *
 * @param context
 *     The error handler context metadata.
 * @param record
 *     The record that failed to serialize.
 * @param exception
 *     The exception that occurred during serialization.
 * @param origin
 *     The origin of the serialization exception.
 *
 * @return a {@link Response} object
 */
default Response handleSerializationError(final ErrorHandlerContext context,
                                          final ProducerRecord record,
                                          final Exception exception,
                                          final SerializationExceptionOrigin origin) {
    // Translate the legacy answer into the new enum before building the response.
    final Result result = Result.from(handleSerializationException(context, record, exception, origin));
    final List<ProducerRecord<byte[], byte[]>> noDeadLetterQueueRecords = Collections.emptyList();
    return new Response(result, noDeadLetterQueueRecords);
}
@Deprecated
enum ProductionExceptionHandlerResponse {
...
}
/**
 * Enumeration of the outcomes a production exception handler can report.
 */
enum Result {
    /**
     * Resume processing.
     *
     * <p> With this outcome, output records which could not be written successfully are lost.
     * Choose it only if data loss is tolerable.
     */
    RESUME(0, "RESUME"),

    /**
     * Fail processing.
     *
     * <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail.
     * No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions
     * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed.
     */
    FAIL(1, "FAIL"),

    /**
     * Retry the failed operation.
     *
     * <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry
     * is started from the last committed offset.
     *
     * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
     * {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}.
     * If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}.
     */
    RETRY(2, "RETRY");

    /**
     * An English description for the used option. This is for debugging only and may change.
     */
    public final String name;

    /**
     * The permanent and immutable id for the used option. This can't change ever.
     */
    public final int id;

    Result(final int id, final String name) {
        this.id = id;
        this.name = name;
    }

    /**
     * Maps a value of the deprecated {@code ProductionExceptionHandlerResponse} enum onto
     * the matching {@code Result} constant ({@code CONTINUE} becomes {@link #RESUME}).
     *
     * @param value the old ProductionExceptionHandlerResponse enum value
     * @return a {@link ProductionExceptionHandler.Result} enum value
     * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result}
     */
    @Deprecated
    private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) {
        final ProductionExceptionHandler.Result converted;
        switch (value) {
            case CONTINUE:
                converted = Result.RESUME;
                break;
            case FAIL:
                converted = Result.FAIL;
                break;
            case RETRY:
                converted = Result.RETRY;
                break;
            default:
                throw new IllegalArgumentException("No Result enum found for old value: " + value);
        }
        return converted;
    }
}
/**
 * Represents the result of handling a production exception.
 * <p>
 * The {@code Response} class encapsulates a {@link ProductionExceptionHandler.Result},
 * indicating whether processing should resume, fail, or retry, along with an optional list of
 * {@link ProducerRecord} instances to be sent to a dead letter queue.
 * </p>
 * <p>
 * Instances are created through the static factory methods; both fields are assigned once
 * in the constructor and never reassigned.
 * </p>
 */
class Response {
    private final Result result;
    private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

    /**
     * Constructs a new {@code Response} object.
     *
     * @param result                 the result indicating whether processing should resume, fail, or retry;
     *                               must not be {@code null}.
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     */
    private Response(final Result result,
                     final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        this.result = result;
        this.deadLetterQueueRecords = deadLetterQueueRecords;
    }

    /**
     * Creates a {@code Response} indicating that processing should fail.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status.
     */
    public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.FAIL, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should fail, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#FAIL} status.
     */
    public static Response fail() {
        return fail(Collections.emptyList());
    }

    /**
     * Creates a {@code Response} indicating that processing should continue.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status.
     */
    public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.RESUME, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should continue, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RESUME} status.
     */
    public static Response resume() {
        return resume(Collections.emptyList());
    }

    /**
     * Creates a {@code Response} indicating that processing should retry. The response
     * carries an empty list of dead letter queue records.
     *
     * @return a {@code Response} with a {@link ProductionExceptionHandler.Result#RETRY} status.
     */
    public static Response retry() {
        return new Response(Result.RETRY, Collections.emptyList());
    }

    /**
     * Retrieves the production exception handler result.
     *
     * @return the {@link Result} indicating whether processing should continue, fail or retry.
     */
    public Result result() {
        return result;
    }

    /**
     * Retrieves an unmodifiable view of the records to be sent to the dead letter queue.
     * <p>
     * If the list is {@code null}, an empty list is returned.
     * </p>
     *
     * @return an unmodifiable list of {@link ProducerRecord} instances
     *         for the dead letter queue, or an empty list if no records are available.
     */
    public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
        if (deadLetterQueueRecords == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(deadLetterQueueRecords);
    }
}
}
|
...
Code Block | ||
---|---|---|
| ||
public interface DeserializationExceptionHandler extends Configurable {
...
@Deprecated
default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
...
}
/**
 * Inspects a record together with the exception raised while deserializing it.
 *
 * <p>This default implementation forwards the call to the deprecated
 * {@code handle(context, record, exception)} method, converts the returned legacy value
 * with {@code Result.from(...)}, and wraps it in a {@link Response} that carries an
 * empty list of dead letter queue records.
 *
 * @param context
 *     Error handler context.
 * @param record
 *     Record that failed deserialization.
 * @param exception
 *     The actual exception.
 *
 * @return a {@link Response} object
 */
default Response handleError(final ErrorHandlerContext context,
                             final ConsumerRecord<byte[], byte[]> record,
                             final Exception exception) {
    // Ask the legacy callback first, then translate its answer into the new enum.
    final DeserializationHandlerResponse legacyResponse = handle(context, record, exception);
    final Result result = Result.from(legacyResponse);
    return new Response(result, Collections.emptyList());
}
@Deprecated
enum DeserializationHandlerResponse {
...
}
/**
 * Enumeration of the outcomes a deserialization exception handler can report.
 */
enum Result {
    /** Continue processing. */
    RESUME(0, "RESUME"),

    /** Fail processing. */
    FAIL(1, "FAIL");

    /**
     * An English description for the used option. This is for debugging only and may change.
     */
    public final String name;

    /**
     * The permanent and immutable id for the used option. This can't change ever.
     */
    public final int id;

    Result(final int id, final String name) {
        this.id = id;
        this.name = name;
    }

    /**
     * Maps a value of the deprecated {@code DeserializationHandlerResponse} enum onto
     * the matching {@code Result} constant ({@code CONTINUE} becomes {@link #RESUME}).
     *
     * @param value the old DeserializationHandlerResponse enum value
     * @return a {@link Result} enum value
     * @throws IllegalArgumentException if the provided value does not map to a valid {@link Result}
     */
    @Deprecated
    private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) {
        final DeserializationExceptionHandler.Result converted;
        switch (value) {
            case CONTINUE:
                converted = Result.RESUME;
                break;
            case FAIL:
                converted = Result.FAIL;
                break;
            default:
                throw new IllegalArgumentException("No Result enum found for old value: " + value);
        }
        return converted;
    }
}
/**
 * Represents the result of handling a deserialization exception.
 * <p>
 * The {@code Response} class encapsulates a {@link DeserializationExceptionHandler.Result},
 * indicating whether processing should continue or fail, along with an optional list of
 * {@link ProducerRecord} instances to be sent to a dead letter queue.
 * </p>
 * <p>
 * Instances are created through the static factory methods; both fields are assigned once
 * in the constructor and never reassigned.
 * </p>
 */
class Response {
    private final Result result;
    private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

    /**
     * Constructs a new {@code Response} object.
     *
     * @param result                 the result indicating whether processing should continue or fail;
     *                               must not be {@code null}.
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     */
    private Response(final Result result,
                     final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        this.result = result;
        this.deadLetterQueueRecords = deadLetterQueueRecords;
    }

    /**
     * Creates a {@code Response} indicating that processing should fail.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
     */
    public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.FAIL, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should fail, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
     */
    public static Response fail() {
        return fail(Collections.emptyList());
    }

    /**
     * Creates a {@code Response} indicating that processing should continue.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
     */
    public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.RESUME, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should continue, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
     */
    public static Response resume() {
        return resume(Collections.emptyList());
    }

    /**
     * Retrieves the deserialization handler result.
     *
     * @return the {@link Result} indicating whether processing should continue or fail.
     */
    public Result result() {
        return result;
    }

    /**
     * Retrieves an unmodifiable view of the records to be sent to the dead letter queue.
     * <p>
     * If the list is {@code null}, an empty list is returned.
     * </p>
     *
     * @return an unmodifiable list of {@link ProducerRecord} instances
     *         for the dead letter queue, or an empty list if no records are available.
     */
    public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
        if (deadLetterQueueRecords == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(deadLetterQueueRecords);
    }
}
}
|
...
Code Block | ||
---|---|---|
| ||
public interface ProcessingExceptionHandler extends Configurable {
...
@Deprecated
default ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception){
...
};
/**
 * Inspects a record together with the exception raised while processing it.
 *
 * <p>This default implementation forwards the call to the deprecated
 * {@code handle(context, record, exception)} method, converts the returned legacy value
 * with {@code Result.from(...)}, and wraps it in a {@link Response} that carries an
 * empty list of dead letter queue records.
 *
 * @param context
 *     Processing context metadata.
 * @param record
 *     Record where the exception occurred.
 * @param exception
 *     The actual exception.
 *
 * @return a {@link Response} object
 */
default Response handleError(final ErrorHandlerContext context,
                             final Record<?, ?> record,
                             final Exception exception) {
    // Ask the legacy callback first, then translate its answer into the new enum.
    final ProcessingHandlerResponse legacyResponse = handle(context, record, exception);
    final ProcessingExceptionHandler.Result result = ProcessingExceptionHandler.Result.from(legacyResponse);
    return new Response(result, Collections.emptyList());
}
@Deprecated
enum ProcessingHandlerResponse {
...
}
/**
 * Enumeration of the outcomes a processing exception handler can report.
 */
enum Result {
    /** Resume processing. */
    RESUME(1, "RESUME"),

    /** Fail processing. */
    FAIL(2, "FAIL");

    /**
     * An English description for the used option. This is for debugging only and may change.
     */
    public final String name;

    /**
     * The permanent and immutable id for the used option. This can't change ever.
     */
    public final int id;

    Result(final int id, final String name) {
        this.id = id;
        this.name = name;
    }

    /**
     * Maps a value of the deprecated {@code ProcessingHandlerResponse} enum onto
     * the matching {@code Result} constant ({@code CONTINUE} becomes {@link #RESUME}).
     *
     * @param value the old ProcessingHandlerResponse enum value
     * @return a {@link ProcessingExceptionHandler.Result} enum value
     * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProcessingExceptionHandler.Result}
     */
    @Deprecated
    private static ProcessingExceptionHandler.Result from(final ProcessingHandlerResponse value) {
        final ProcessingExceptionHandler.Result converted;
        switch (value) {
            case CONTINUE:
                converted = Result.RESUME;
                break;
            case FAIL:
                converted = Result.FAIL;
                break;
            default:
                throw new IllegalArgumentException("No Result enum found for old value: " + value);
        }
        return converted;
    }
}
/**
 * Represents the result of handling a processing exception.
 * <p>
 * The {@code Response} class encapsulates a {@link ProcessingExceptionHandler.Result},
 * indicating whether processing should continue or fail, along with an optional list of
 * {@link org.apache.kafka.clients.producer.ProducerRecord} instances to be sent to a dead letter queue.
 * </p>
 * <p>
 * Instances are created through the static factory methods; both fields are assigned once
 * in the constructor and never reassigned.
 * </p>
 */
class Response {
    private final Result result;
    private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;

    /**
     * Constructs a new {@code Response} object.
     *
     * @param result                 the result indicating whether processing should continue or fail;
     *                               must not be {@code null}.
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     */
    private Response(final Result result,
                     final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        this.result = result;
        this.deadLetterQueueRecords = deadLetterQueueRecords;
    }

    /**
     * Creates a {@code Response} indicating that processing should fail.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status.
     */
    public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.FAIL, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should fail, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#FAIL} status.
     */
    public static Response fail() {
        return fail(Collections.emptyList());
    }

    /**
     * Creates a {@code Response} indicating that processing should continue.
     *
     * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
     * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status.
     */
    public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
        return new Response(Result.RESUME, deadLetterQueueRecords);
    }

    /**
     * Creates a {@code Response} indicating that processing should continue, without any
     * dead letter queue records.
     *
     * @return a {@code Response} with a {@link ProcessingExceptionHandler.Result#RESUME} status.
     */
    public static Response resume() {
        return resume(Collections.emptyList());
    }

    /**
     * Retrieves the processing handler result.
     *
     * @return the {@link Result} indicating whether processing should continue or fail.
     */
    public Result result() {
        return result;
    }

    /**
     * Retrieves an unmodifiable view of the records to be sent to the dead letter queue.
     * <p>
     * If the list is {@code null}, an empty list is returned.
     * </p>
     *
     * @return an unmodifiable list of {@link ProducerRecord} instances
     *         for the dead letter queue, or an empty list if no records are available.
     */
    public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
        if (deadLetterQueueRecords == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(deadLetterQueueRecords);
    }
}
}
|
...