...
...
```java
public class PulsarSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for PulsarSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source would not emit this record as well as the following records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping criteria specified by the {@link
     * #setBoundedStopCursor(StopCursor)} or the {@link #setUnboundedStopCursor(StopCursor)}.
     * The source stops consuming from a split when any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this PulsarSourceBuilder.
     */
    public PulsarSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
```
5) For SQL users, a new connector option 'scan.record.evaluator.class' is added to specify the custom RecordEvaluator class.
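
To make the intended semantics concrete, here is a minimal, self-contained sketch of what a custom evaluator might look like and how it cuts off a split. It does not use the Flink classes: the `RecordEvaluator` interface below is a local stand-in, its method name `isEndOfStream` is an assumption, and `EndMarkerEvaluator` and `emitUntilEof` are hypothetical names introduced only for illustration. A class such as `EndMarkerEvaluator` is the kind of class the 'scan.record.evaluator.class' option would presumably refer to.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the proposed RecordEvaluator interface.
// Assumption: a single method that reports whether a record marks end of stream.
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical user-defined evaluator: treats the literal record "END" as the EOF marker.
class EndMarkerEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        return "END".equals(record);
    }
}

class EofEvaluatorSketch {
    // Simulates reading one split: records are emitted in order until the
    // evaluator matches one. The matched record and all records after it
    // in the same split are not emitted.
    static <T> List<T> emitUntilEof(List<T> splitRecords, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : splitRecords) {
            if (evaluator.isEndOfStream(record)) {
                break; // split is considered finished here
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "END", "c");
        System.out.println(emitUntilEof(split, new EndMarkerEvaluator())); // prints [a, b]
    }
}
```

In the real source, the same check would run per split alongside the configured stop cursor, and whichever condition is met first ends consumption of that split.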
Proposed Changes
We expect users to specify the EOF-detection logic in a RecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofRecordEvaluator. KafkaSource then enforces the EOF-detection logic as follows:
...