DUE TO SPAM, SIGN-UP IS DISABLED. Go to the Selfserve wiki signup page and request an account.
...
| Code Block |
|---|
// Reporter for records the task fails to process. Assigned in start(); it is null when
// DLQ is not enabled or when running on a Connect runtime earlier than 2.6.
private ErrantRecordReporter reporter;
@Override
public void start(Map<String, String> props) {
...
try {
reporter = context.failedRecordReportererrantRecordReporter(); // may be null if DLQ not enabled
} catch (NoSuchMethodException | NoClassDefFoundError e) {
// Will occur in Connect runtimes earlier than 2.6
reporter = null;
}
}
/**
 * Sends each record to the data sink, handing records that fail to the errant record reporter.
 *
 * <p>When a reporter is available, the failed record is reported and this method waits for the
 * report to complete before continuing with the next record. When no reporter is available,
 * the first failure is rethrown.
 *
 * @param records the records to process
 * @throws ConnectException if a record fails and there is no reporter, or if waiting for the
 *     report fails or is interrupted
 */
@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        try {
            // attempt to send record to data sink
            process(record);
        } catch (Exception e) {
            if (reporter == null) {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }
            // Send errant record to error reporter
            Future<Void> future = reporter.report(record, e);
            try {
                // Optionally wait till the failure's been recorded in Kafka
                future.get();
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag so the caller can still observe the interruption
                Thread.currentThread().interrupt();
                throw new ConnectException("Interrupted while reporting errant record", interrupted);
            } catch (java.util.concurrent.ExecutionException reportFailure) {
                // Future.get() wraps the reporting failure; keep it as the cause
                throw new ConnectException("Failed to report errant record", reportFailure);
            }
        }
    }
} |
...