Motivation
Many applications in enterprise data processing pipelines use SQS as a source. An SQS message can carry the entire event, or only the URL of the location that holds the event data the pipeline needs to process. Depending on the use case, these pipelines process events either in batches or as a stream.
While migrating these pipelines to Flink, we identified the need for a Flink SQS connector that can reliably ingest events from an SQS queue. Initially, the connector will support only Amazon SQS Standard queues; support for FIFO queues will be added later as an extension. AWS SQS Queue Types Reference - Standard and FIFO queues.
Additionally, this connector can be extended to support reading data from S3 files after extracting the URL from the SQS payload.
Public Interfaces
The connector builds on the standard Source interfaces defined in FLIP-27: Refactor Source Interface, and therefore adheres to the public contract specified in that FLIP.
SQS Terminology
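Visibility Timeout - When a consumer receives a message, SQS hides it from other consumers for the duration of the visibility timeout, so that they do not process the same message at the same time. Amazon SQS does not delete a received message automatically; the consumer must explicitly delete it with the DeleteMessage action after processing it successfully. If the message is not deleted before the visibility timeout expires, it becomes visible again and can be received by another consumer.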
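To make the receive/hide/delete cycle concrete, the sketch below models it in memory. It is a hypothetical illustration, not the AWS SDK: `VisibilityTimeoutQueue`, its methods, and the explicit millisecond clock are assumptions made so the behaviour is deterministic. Each receive issues a fresh receipt handle; in this model, a handle from an earlier receive stops working once the message has been received again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of SQS visibility-timeout semantics (not the AWS SDK). */
class VisibilityTimeoutQueue {
    private static final class Entry {
        final String body;
        long invisibleUntil;   // epoch millis; 0 means the message is visible
        String receiptHandle;  // handle from the most recent receive, if any
        Entry(String body) { this.body = body; }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final Map<String, Entry> byHandle = new HashMap<>();
    private final long visibilityTimeoutMs;

    VisibilityTimeoutQueue(long visibilityTimeoutMs) {
        this.visibilityTimeoutMs = visibilityTimeoutMs;
    }

    void send(String body) {
        entries.add(new Entry(body));
    }

    /** Returns visible messages as {receiptHandle, body} pairs and hides them for the timeout. */
    List<String[]> receive(long nowMs) {
        List<String[]> received = new ArrayList<>();
        for (Entry e : entries) {
            if (e.invisibleUntil <= nowMs) {
                if (e.receiptHandle != null) {
                    byHandle.remove(e.receiptHandle); // in this model, the old handle is superseded
                }
                e.receiptHandle = UUID.randomUUID().toString();
                e.invisibleUntil = nowMs + visibilityTimeoutMs;
                byHandle.put(e.receiptHandle, e);
                received.add(new String[] {e.receiptHandle, e.body});
            }
        }
        return received;
    }

    /** Deletes a message by receipt handle; returns false if the handle is unknown or superseded. */
    boolean delete(String receiptHandle) {
        Entry e = byHandle.remove(receiptHandle);
        return e != null && entries.remove(e);
    }

    int size() {
        return entries.size();
    }
}
```

A message received at time 0 with a 30-second timeout is hidden at 10 s, visible again at 30 s, and disappears only once a consumer deletes it with its current receipt handle.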
Message Retention Period - The retention period is how long a message remains in the queue before SQS deletes it automatically.
The retention period can be set to any value from 1 minute to 14 days (the default is 4 days). When a message reaches the end of its retention period, SQS deletes it.
We typically set this to a long duration, which helps when recovering from extended outages and when backfilling data.
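As a small worked example of the bounds above, the hypothetical helper below validates a configured retention period and checks whether an outage of a given length can be absorbed without messages expiring. `RetentionCheck` and its method names are illustrative assumptions; the 1-minute and 14-day limits come from the text.

```java
import java.time.Duration;

/** Hypothetical helper that checks retention settings against the SQS limits stated above. */
final class RetentionCheck {
    static final Duration MIN_RETENTION = Duration.ofMinutes(1);
    static final Duration MAX_RETENTION = Duration.ofDays(14); // 1,209,600 seconds

    static boolean isValid(Duration retention) {
        return retention.compareTo(MIN_RETENTION) >= 0 && retention.compareTo(MAX_RETENTION) <= 0;
    }

    /**
     * True if a message enqueued just before an outage would still be in the queue
     * when the outage ends, i.e. the outage is shorter than the retention period.
     */
    static boolean survivesOutage(Duration retention, Duration outage) {
        return isValid(retention) && outage.compareTo(retention) < 0;
    }
}
```

With the maximum retention of 14 days, a 3-day outage is recoverable; with a 1-day retention, a 2-day outage is not.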
Proposed Changes
The new SQS source connector will be based on the Refactored Source Interface (FLIP-27). A parallel effort adds an SQS sink (FLIP-438: Amazon SQS Sink Connector). We plan to use the same module, "flink-connector-sqs", in the "flink-connector-aws" repository, and to create a "source" folder that will contain the code for the SQS source.
This connector will support both bounded (batch) and unbounded (streaming) modes. It will initially support the DataStream API, with Table API/SQL support planned as a future extension. The key concepts for this connector are outlined below:
- A split represents an SQS queue URL.
- The SplitEnumerator assigns the split to each SourceReader.
- The SourceReader reads messages from its assigned split by calling the SQS ReceiveMessage API through the SQS client, and emits the deserialized records.
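The assignment scheme above can be modelled without Flink. In the hypothetical sketch below, `QueueUrlAssigner` stands in for the SplitEnumerator: every registered subtask gets a split whose ID is the same queue URL, and a failed subtask's split is reassigned when it registers again. The class and method names are assumptions; only the analogy to `addReader`/`addSplitsBack` comes from this FLIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model: every reader is assigned a split with the same queue URL. */
final class QueueUrlAssigner {
    private final String queueUrl;
    private final Map<Integer, List<String>> assignments = new HashMap<>();

    QueueUrlAssigner(String queueUrl) {
        this.queueUrl = queueUrl;
    }

    /** Analogous to SplitEnumerator#addReader: assign the shared split to a new reader. */
    void addReader(int subtaskId) {
        assignments.putIfAbsent(subtaskId, Collections.singletonList(queueUrl));
    }

    /** Analogous to addSplitsBack after a failure: forget the assignment until the reader returns. */
    void removeReader(int subtaskId) {
        assignments.remove(subtaskId);
    }

    List<String> splitsOf(int subtaskId) {
        return assignments.getOrDefault(subtaskId, Collections.emptyList());
    }
}
```

Sharing one split across readers works because SQS, not Flink, arbitrates which consumer receives each message.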
Rate Limiting Strategy:
We will add a customisable RateLimiterStrategy to the connector that applies rate limiting to each source subtask.
This lets users tune SQS message ingestion to their requirements.
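One common way to implement per-subtask rate limiting is a token bucket. The sketch below is a standalone illustration of that technique, not Flink's `RateLimiterStrategy` API; `TokenBucket`, its parameters, and the injected nanosecond clock are assumptions chosen to keep it deterministic.

```java
/** Hypothetical token-bucket limiter for one source subtask (illustration only). */
final class TokenBucket {
    private final double permitsPerSecond;
    private final double capacity;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double permitsPerSecond, double capacity, long startNanos) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = capacity;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = startNanos;
    }

    /** Takes {@code n} permits if enough have accumulated by {@code nowNanos}; never blocks. */
    boolean tryAcquire(int n, long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1e9;
        tokens = Math.min(capacity, tokens + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= n) {
            tokens -= n;
            return true;
        }
        return false;
    }
}
```

A reader could request as many permits as the batch it is about to fetch (a single ReceiveMessage call returns at most 10 messages) and skip the call when the bucket is empty. A job-wide limit can be spread across subtasks by dividing it by the source parallelism.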
Fault Tolerance Guarantees:
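We will support at-least-once fault tolerance guarantees for the SQS source connector.
At-Least-Once (Checkpointing Disabled)
If checkpointing is disabled, the connector defaults to at-least-once semantics. In this mode, messages are deleted from the queue immediately after they are read.
At-Least-Once (Checkpointing Enabled)
If checkpointing is enabled, messages that have been read and processed are deleted when the notifyCheckpointComplete callback signals that the checkpoint covering them has completed successfully. Until then, each message stays in the queue. If its visibility timeout expires first, SQS makes it visible again and it may be read a second time, which at-least-once semantics permit; the visibility timeout should therefore exceed the expected time between reading a message and completing the checkpoint that covers it.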
State Management
Depending on the configuration, the receipt handles of read messages are stored in operator state. Receipt handles are required to delete the messages later. The checkpoint completion frequency therefore determines the state size of the source operator: handles accumulate until the checkpoint that covers them completes, so frequent successful checkpoints keep the state small.
Sample code snippets:
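The bookkeeping behind delete-on-checkpoint can be sketched independently of Flink. In the hypothetical `PendingDeletes` class below, handles read since the last snapshot are grouped under the next checkpoint ID and released for deletion only when that checkpoint, or a later one, completes. Later completions also release earlier checkpoints because Flink does not guarantee a completion notification for every checkpoint. The class and method names are assumptions, not connector code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical receipt-handle bookkeeping for delete-on-checkpoint. */
final class PendingDeletes {
    private List<String> sinceLastSnapshot = new ArrayList<>();
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();

    /** Records the receipt handle of a message that has been emitted downstream. */
    void track(String receiptHandle) {
        sinceLastSnapshot.add(receiptHandle);
    }

    /** Called when a checkpoint is taken: handles read so far belong to it. */
    void snapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, sinceLastSnapshot);
        sinceLastSnapshot = new ArrayList<>();
    }

    /** Called on checkpoint completion: returns handles to delete, including earlier checkpoints. */
    List<String> onCheckpointComplete(long checkpointId) {
        List<String> toDelete = new ArrayList<>();
        Map<Long, List<String>> completed = byCheckpoint.headMap(checkpointId, true);
        completed.values().forEach(toDelete::addAll);
        completed.clear(); // clearing the view also removes these entries from byCheckpoint
        return toDelete;
    }

    /** Number of handles currently held, which drives the operator's state size. */
    int pendingCount() {
        int count = sinceLastSnapshot.size();
        for (List<String> handles : byCheckpoint.values()) {
            count += handles.size();
        }
        return count;
    }
}
```

The returned handles would then be passed to DeleteMessage (or its batch variant), and any failures counted in a metric such as numSqsDeletionsFailed.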
Instantiate the SQS connector:
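```java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

// SqsSource and SqsSourceBuilder are the API proposed by this FLIP.
DataStreamSource<T> sqsSourceStream =
        env.fromSource(
                SqsSource.<T>builder()
                        .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
                        .setSqsClientProperties(getSqsClientProperties())
                        .build(),
                WatermarkStrategy.noWatermarks(), // passed to fromSource, not the builder
                "SQS Source");

private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "testRoleArn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test=sqs=session");
    // Properties values are strings, so the connection limit is given as "200".
    props.setProperty(SqsSourceBuilder.MAX_HTTP_CONNECTION_PROPERTY, "200");
    return props;
}
```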
SQS Split:
Note: every reader is assigned a split with the same split ID, namely the single SQS queue URL. This is safe because an SQS queue natively supports multiple concurrent consumers.
```java
import org.apache.flink.api.connector.source.SourceSplit;

/** A split representing one SQS queue; the queue URL doubles as the split ID. */
public class SqsSplit implements SourceSplit {
    private final String queueUrl;

    public SqsSplit(final String queueUrl) {
        this.queueUrl = queueUrl;
    }

    @Override
    public String splitId() {
        return queueUrl;
    }
}
```
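FLIP-27 sources also need a serializer for their splits so they can be checkpointed and sent to readers. The Flink-free sketch below shows one possible byte format for an SqsSplit, the length-prefixed queue URL, with a version check on read. `SqsSplitCodec` is hypothetical; in the connector, equivalent logic would sit in an implementation of Flink's `SimpleVersionedSerializer<SqsSplit>`, whose framework stores the version alongside the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical byte format for an SqsSplit: the queue URL, written with writeUTF. */
final class SqsSplitCodec {
    static final int VERSION = 1;

    static byte[] serialize(String queueUrl) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(queueUrl); // length-prefixed modified UTF-8
        }
        return bytes.toByteArray();
    }

    static String deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported SqsSplit serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return in.readUTF();
        }
    }
}
```

Keeping an explicit version lets a future format (for example, one carrying FIFO-queue metadata) be added without breaking restores from older checkpoints.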
SQS Split Reader:
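```java
@Override
public RecordsWithSplitIds<Message> fetch() throws IOException {
    RecordsBySplits.Builder<Message> recordsBuilder = new RecordsBySplits.Builder<>();
    if (assignedSplits.isEmpty()) {
        return recordsBuilder.build();
    }
    // Each reader holds a single split: the queue URL.
    final SqsSplit split = assignedSplits.get(0);
    // receiveMessages wraps the SQS ReceiveMessage API call.
    List<Message> messages = receiveMessages(split);
    for (Message message : messages) {
        recordsBuilder.add(split.splitId(), message);
    }
    return recordsBuilder.build();
}
```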
SQS Split Enumerator:
```java
@Override
public void addSplitsBack(List<SqsSplit> splits, int subtaskId) {
    LOG.info("SqsSourceEnumerator addSplitsBack, subtaskId={}, splits={}", subtaskId, splits);
    createPendingAssignmentForReader(subtaskId);
    // If the failed subtask has already restarted, we need to assign pending splits to it
    if (context.registeredReaders().containsKey(subtaskId)) {
        assignPendingSplits(Collections.singleton(subtaskId));
    }
}

@Override
public void addReader(int subtaskId) {
    LOG.info("Adding reader {} to SqsSourceEnumerator.", subtaskId);
    assignPendingSplits(Collections.singleton(subtaskId));
}
```
SQS Source Reader:
```java
public class SqsSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<Message, T, SqsSplit, SqsSplitState> {

    // Constructor and the remaining SourceReaderBase overrides are omitted.

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // If delete-on-checkpoint is enabled, delete the messages whose receipt
        // handles were recorded up to and including this checkpoint.
        LOG.debug("Deleting messages for checkpoint {}", checkpointId);
    }
}
```
SQS Record Emitter:
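```java
public class SqsRecordEmitter<T> implements RecordEmitter<Message, T, SqsSplitState> {
    private final SqsMessageDeserializationSchema<T> deserializationSchema;
    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();

    public SqsRecordEmitter(SqsMessageDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(Message element, SourceOutput<T> output, SqsSplitState splitState)
            throws Exception {
        // Point the reusable wrapper at the current output before deserializing.
        sourceOutputWrapper.setSourceOutput(output);
        deserializationSchema.deserialize(element, sourceOutputWrapper);
    }

    /** Adapts SourceOutput to the Collector that the deserialization schema writes to. */
    private static class SourceOutputWrapper<T> implements Collector<T> {
        private SourceOutput<T> sourceOutput;

        @Override
        public void collect(T record) {
            sourceOutput.collect(record);
        }

        @Override
        public void close() {}

        private void setSourceOutput(SourceOutput<T> sourceOutput) {
            this.sourceOutput = sourceOutput;
        }
    }
}
```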
Observability
Metrics
We have reviewed FLIP-33: Standardize Connector Metrics and identified several metrics that will benefit users of this connector. We plan to reuse the existing interfaces and implementations for the standard metrics, and will add support for new metrics as needed.
| Metric Name | Type | Unit | Description |
|---|---|---|---|
| numRecordsIn | Counter | records | The total number of input records since the source started. |
| numRecordsInPerSecond | Meter | records/s | The number of input records per second. |
| numRecordsInErrors | Counter | records | The total number of records that failed to be consumed, processed, or emitted. |
| numBytesIn | Counter | bytes | The total number of input bytes since the source started. |
| numBytesInPerSecond | Meter | bytes/s | The number of input bytes per second. |
| sourceIdleTime | Gauge | ms | The time that the source has not processed any record. sourceIdleTime = CurrentTime - LastRecordProcessTime |
| currentFetchEventTimeLag | Gauge | ms | The time from the record's event timestamp to the time Flink fetched the record. currentFetchEventTimeLag = FetchTime - EventTime. The per-minute average is reported. |
| currentEmitEventTimeLag | Gauge | ms | The time from the record's event timestamp to the time the record is emitted by (leaves) the source connector. currentEmitEventTimeLag = EmitTime - EventTime. The per-minute average is reported. |
| numSqsDeletionsFailed | Counter | records | The total number of messages the connector failed to delete from SQS after consuming them. |
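The timing formulas in the table can be illustrated with plain arithmetic. The hypothetical `SourceTimingMetrics` class below is not Flink's metric API; it computes sourceIdleTime and an averaged fetch lag from explicit epoch-millisecond timestamps. Which SQS field supplies EventTime (for example, the SentTimestamp message attribute) is an assumption, as is reporting an idle time of 0 before the first record.

```java
/** Hypothetical, Flink-free illustration of the idle-time and lag formulas (epoch millis). */
final class SourceTimingMetrics {
    private long lastRecordProcessTime = -1;
    private long fetchLagSum;
    private long fetchLagCount;

    /** Accumulates FetchTime - EventTime for one fetched record. */
    void onFetch(long eventTime, long fetchTime) {
        fetchLagSum += fetchTime - eventTime;
        fetchLagCount++;
    }

    void onRecordProcessed(long processTime) {
        lastRecordProcessTime = processTime;
    }

    /** sourceIdleTime = CurrentTime - LastRecordProcessTime; 0 before any record (assumption). */
    long sourceIdleTime(long currentTime) {
        return lastRecordProcessTime < 0 ? 0 : currentTime - lastRecordProcessTime;
    }

    /** Average fetch lag over the current reporting window (e.g. one minute), then resets it. */
    long averageFetchLagAndReset() {
        long average = fetchLagCount == 0 ? 0 : fetchLagSum / fetchLagCount;
        fetchLagSum = 0;
        fetchLagCount = 0;
        return average;
    }
}
```

Two records fetched 500 ms and 300 ms after their event times give an average fetch lag of 400 ms for that window; currentEmitEventTimeLag follows the same pattern with EmitTime in place of FetchTime.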
Logs
We will add the logging needed to capture warnings and errors encountered by the Amazon SQS source connector.
Compatibility, Deprecation, and Migration Plan
This connector is compatible with Amazon SQS. As this is a new addition to the Flink connectors set, there are no compatibility, deprecation, or migration requirements related to this FLIP.
Test Plan
We will add documentation and tests covering:
- Unit tests
- Docker-based integration testing using Testcontainers
- End-to-end integration tests that run against a live Amazon SQS service (these tests require valid SQS credentials as a prerequisite).
Future Extension
This FLIP is the first step toward broader feature support for SQS users. We plan to extend it in the future with the following features:
- Support for AWS SQS FIFO queue.
- Support for exactly once semantics.
- Support for the Table API/SQL.
- Support for use cases in which SQS messages reference S3 files, in various data formats, that are downloaded and read.
Future enhancements will cover above areas and more.