Motivation
Many applications within enterprise Data Processing Pipelines use SQS as a source. SQS can capture the entire event or simply the URL of the event location that the pipeline needs to process. Depending on the use case, these pipelines process events either in batches or in a streaming fashion.
While migrating these pipelines to Flink, we identified the need for a Flink SQS Connector that can reliably ingest events from an SQS queue. This connector will initially support only the Standard queue type; support for FIFO queues will be added later as an extension (AWS SQS Queue Types Reference - Standard and FIFO queues).
Additionally, this connector can be extended to support reading data from S3 files after extracting the URL from the SQS payload.
Public Interfaces
The connector's interfaces rely on the standard Source interfaces detailed in FLIP-27: Refactor Source Interface. Therefore, we will adhere to the public contract defined in that FLIP.
SQS Terminology
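Visibility Timeout - The visibility timeout is the period during which a message that one consumer has received is hidden from other consumers, so that they do not process it while it is being worked on. Amazon SQS does not delete a received message automatically; the consumer must explicitly delete it using the DeleteMessage action after it has been processed successfully. If the message is not deleted before the timeout expires, it becomes visible again and can be received by another consumer.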
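To make the receive/delete contract concrete, here is a toy in-memory model of visibility-timeout behaviour. It is plain Java, not the AWS SDK, and `ToyQueue` with its methods is hypothetical. It mirrors two documented SQS behaviours: a received message stays hidden until the timeout expires, and every receive issues a fresh receipt handle, only the latest of which is guaranteed to delete the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of SQS visibility-timeout semantics (illustration only, not the AWS SDK).
final class ToyQueue {
    private static final class Entry {
        final String body;
        long invisibleUntilMillis; // 0 means the message is visible
        String receiptHandle;      // handle issued by the most recent receive
        Entry(String body) { this.body = body; }
    }

    private final Map<String, Entry> messages = new LinkedHashMap<>();
    private final long visibilityTimeoutMillis;
    private int nextId = 0;
    private int nextHandle = 0;

    ToyQueue(long visibilityTimeoutMillis) { this.visibilityTimeoutMillis = visibilityTimeoutMillis; }

    void send(String body) { messages.put("m" + nextId++, new Entry(body)); }

    /** Returns "receiptHandle:body" for every currently visible message and hides it for the timeout. */
    List<String> receive(long nowMillis) {
        List<String> out = new ArrayList<>();
        for (Entry e : messages.values()) {
            if (nowMillis >= e.invisibleUntilMillis) {
                e.invisibleUntilMillis = nowMillis + visibilityTimeoutMillis;
                e.receiptHandle = "h" + nextHandle++; // each receive issues a new handle
                out.add(e.receiptHandle + ":" + e.body);
            }
        }
        return out;
    }

    /** Deletes the message whose latest receipt handle matches; returns whether anything was removed. */
    boolean delete(String receiptHandle) {
        return messages.values().removeIf(e -> receiptHandle.equals(e.receiptHandle));
    }

    int size() { return messages.size(); }
}
```

A consumer that crashes between `receive` and `delete` simply lets the timeout lapse, after which the message is delivered again; this redelivery is what the at-least-once modes below rely on.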
Message Retention Period - The retention period is how long a message remains in the queue before SQS deletes it automatically.
The retention period can be set anywhere between 1 minute and 14 days. Once a message reaches the end of the retention period, SQS deletes it automatically.
We typically set this to a sufficiently long duration. This is beneficial for recovering from extended outages and backfilling data.
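The retention rules above can be captured in a small helper, sketched below. `RetentionCheck` is hypothetical and not part of the proposed connector; it encodes the documented 1-minute to 14-day range and the recovery rule of thumb that a consumer which stays down longer than the retention period will find older messages already gone.

```java
import java.time.Duration;

// Hypothetical helper: encodes the SQS retention range and the expiry rule described above.
final class RetentionCheck {
    static final Duration MIN_RETENTION = Duration.ofMinutes(1);
    static final Duration MAX_RETENTION = Duration.ofDays(14);

    /** True if the retention period lies within the documented 1 minute to 14 days range. */
    static boolean isValidRetention(Duration retention) {
        return retention.compareTo(MIN_RETENTION) >= 0 && retention.compareTo(MAX_RETENTION) <= 0;
    }

    /** True if a message sent at sentAtMillis has reached the retention period by nowMillis. */
    static boolean isExpired(long sentAtMillis, long nowMillis, Duration retention) {
        return nowMillis - sentAtMillis >= retention.toMillis();
    }
}
```

For example, with a 4-day retention a 5-day outage loses the oldest messages while a 3-day outage does not, which is why a long retention period helps with recovery and backfilling.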
Proposed Changes
The new SQS source connector will be based on the Refactored Source Interface (FLIP-27). We see a similar effort for adding support for an SQS Sink FLIP-438: Amazon SQS Sink Connector. We plan to use the same package, "flink-connector-sqs," under the "flink-connectors-aws" repository and create a “source” folder that will contain the code for the SQS-based source.
This connector will support both Bounded (Batch) and Unbounded (Streaming) cases. It will be implemented to support the DataStream API (now) and the Table API/SQL (future extension). The key concepts for this connector are outlined below:
- A Split is an SQS queue URL.
- The SplitEnumerator registers the split with the SourceReader.
- The SourceReader fetches messages from its assigned splits through the SQS client's ReceiveMessage API and emits the deserialized records.
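The reader's fetch step can be sketched as follows. This is a minimal illustration, not the connector's implementation: `MessageFetcher` is a hypothetical stand-in for the SQS client's ReceiveMessage call, and the deserializer is a plain `Function`. The cap of 10 messages per call reflects the documented maximum for ReceiveMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the SQS ReceiveMessage call; a real reader would use the AWS SDK client.
interface MessageFetcher {
    /** Returns up to maxMessages raw message bodies from the queue identified by queueUrl. */
    List<String> receive(String queueUrl, int maxMessages);
}

// Sketch of one fetch cycle of a reader that owns a single split (the queue URL).
final class SqsFetchLoop<T> {
    static final int MAX_MESSAGES_PER_CALL = 10; // ReceiveMessage returns at most 10 messages per call

    private final String queueUrl;
    private final MessageFetcher fetcher;
    private final Function<String, T> deserializer;

    SqsFetchLoop(String queueUrl, MessageFetcher fetcher, Function<String, T> deserializer) {
        this.queueUrl = queueUrl;
        this.fetcher = fetcher;
        this.deserializer = deserializer;
    }

    /** Fetches one batch and returns the deserialized records in arrival order. */
    List<T> fetchOnce() {
        List<T> records = new ArrayList<>();
        for (String body : fetcher.receive(queueUrl, MAX_MESSAGES_PER_CALL)) {
            records.add(deserializer.apply(body));
        }
        return records;
    }
}
```

Keeping the fetcher behind an interface makes the loop testable without a live queue, which fits the unit-test part of the test plan.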
Rate Limiting Strategy:
We will be adding a customisable RateLimiterStrategy in our connector, which applies rate-limiting to a source sub-task.
This will allow customers to adjust/configure their SQS message ingestion as per their requirements.
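The FLIP does not specify the rate-limiting algorithm, so the sketch below uses a token bucket as an illustrative choice; `TokenBucket` is hypothetical. (Flink core also ships a `RateLimiterStrategy` interface used by `DataGeneratorSource`; whether this connector reuses it is not stated here.) The idea is that before each ReceiveMessage call a sub-task asks for up to 10 permits and requests only as many messages as it was granted.

```java
// Illustrative token bucket limiting how many messages one source sub-task may fetch per second.
// Time is passed in explicitly so the behaviour is deterministic and easy to test.
final class TokenBucket {
    private final double permitsPerSecond;
    private final double capacity;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double permitsPerSecond, double capacity, long nowNanos) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = capacity;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = nowNanos;
    }

    /** Returns how many of the requested permits are granted right now (0..requested). */
    int tryAcquire(int requested, long nowNanos) {
        refill(nowNanos);
        int granted = (int) Math.min(requested, Math.floor(tokens));
        tokens -= granted;
        return granted;
    }

    /** Adds tokens for the elapsed time, never exceeding the bucket capacity. */
    private void refill(long nowNanos) {
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = nowNanos;
    }
}
```

The capacity bounds bursts (for example after an idle period), while `permitsPerSecond` bounds the sustained ingestion rate per sub-task.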
Fault Tolerance Guarantees:
We will initially support At-Least-Once fault tolerance guarantees for the SQS source connector; Exactly-Once support is planned as a future extension.
At-Least-Once (Checkpoint Disabled)
If checkpointing is disabled, the connector defaults to at-least-once semantics. In this mode, messages are deleted from the queue immediately after they are read.
At-Least-Once (Checkpoint Enabled)
If checkpointing is enabled, the connector tracks the receipt handles of the messages it has read, and messages that have been read and processed are deleted from the queue only upon receiving the notifyCheckpointComplete callback, which indicates the successful completion of the checkpoint cycle.
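The checkpoint-enabled mode can be sketched as a small bookkeeping class. `PendingDeletes` and its method names are hypothetical; the actual DeleteMessage call is left to the caller. Handles emitted since the last snapshot are attached to the next checkpoint ID, and a completion notification releases every handle up to and including that ID. The "up to and including" part matters because Flink does not guarantee a notification for every checkpoint, and a later completion subsumes earlier ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: defer SQS deletes until the checkpoint that covers the messages completes (at-least-once).
final class PendingDeletes {
    private final List<String> sinceLastCheckpoint = new ArrayList<>();
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();

    /** Called when a message has been read and emitted downstream. */
    void onMessageEmitted(String receiptHandle) {
        sinceLastCheckpoint.add(receiptHandle);
    }

    /** Called from snapshotState: handles emitted so far belong to this checkpoint. */
    void onSnapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(sinceLastCheckpoint));
        sinceLastCheckpoint.clear();
    }

    /** Called from notifyCheckpointComplete; returns every handle that is now safe to delete. */
    List<String> onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = byCheckpoint.headMap(checkpointId, true);
        List<String> toDelete = new ArrayList<>();
        for (List<String> handles : done.values()) {
            toDelete.addAll(handles);
        }
        done.clear(); // headMap is a view, so this also removes the entries from byCheckpoint
        return toDelete;
    }

    /** Number of handles still held; this is what drives the operator's state size. */
    int pendingCount() {
        int n = sinceLastCheckpoint.size();
        for (List<String> handles : byCheckpoint.values()) n += handles.size();
        return n;
    }
}
```

If the job fails after a checkpoint completes but before the deletes go through, the undeleted messages reappear once their visibility timeout expires and are read again, which is why this mode is at-least-once rather than exactly-once.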
State Management
Depending on the configuration, the receipt handles of the read messages are stored in the operator state. Receipt handles are necessary for deleting the messages from the queue later. The checkpoint completion frequency therefore dictates the state size of the source operator: handles accumulate until a checkpoint completes, so frequent successful checkpoints keep the state within limits.
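As a rough back-of-the-envelope estimate (an assumption-laden sketch, not a sizing guarantee), the pending state grows with the ingestion rate $r$ of a sub-task, the time $T$ between successful checkpoints, and the average serialized size $h$ of one receipt handle. The example values below, in particular $h = 500$ bytes, are hypothetical.

```latex
S_{\text{state}} \approx r \cdot T \cdot h
\qquad\text{e.g.}\qquad
r = 1000~\tfrac{\text{msg}}{\text{s}},\; T = 60~\text{s},\; h = 500~\text{B}
\;\Rightarrow\;
S_{\text{state}} \approx 1000 \cdot 60 \cdot 500~\text{B} = 3 \times 10^{7}~\text{B} \approx 30~\text{MB}
```

Halving the checkpoint interval halves $T$ and therefore roughly halves the pending state, provided checkpoints keep succeeding.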
Sample code snippets:
Instantiate the SQS connector:
SingleOutputStreamOperator<T> sqsSourceStream = env.fromSource(
        SqsSource.<T>builder()
                .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
                .setFailOnError(false)
                .setSqsClientProperties(getSqsClientProperties())
                .build(),
        WatermarkStrategy.noWatermarks(),
        "SQS Source");
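import java.util.Properties;
import org.apache.flink.connector.aws.config.AWSConfigConstants;

private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    // Obtain credentials for the SQS client by assuming an IAM role
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "testRoleArn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test=sqs=session");
    // Properties values must be Strings; MAX_HTTP_CONNECTION_PROPERTY is a constant proposed by this FLIP
    props.setProperty(SqsSourceBuilder.MAX_HTTP_CONNECTION_PROPERTY, "200");
    return props;
}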
SQS Split:
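Since a split is just an SQS queue URL, its shape can be very small. The sketch below is hypothetical: in the real connector the class would implement Flink's `SourceSplit` (whose single method is `splitId()`) and the serializer would implement `SimpleVersionedSerializer`; both interfaces are omitted here to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical split: the queue URL is the whole split and doubles as its ID.
final class SqsSourceSplit {
    private final String queueUrl;

    SqsSourceSplit(String queueUrl) {
        this.queueUrl = Objects.requireNonNull(queueUrl, "queueUrl");
    }

    /** Mirrors SourceSplit#splitId(): the queue URL uniquely identifies the split. */
    String splitId() { return queueUrl; }

    String getQueueUrl() { return queueUrl; }

    @Override public boolean equals(Object o) {
        return o instanceof SqsSourceSplit && ((SqsSourceSplit) o).queueUrl.equals(queueUrl);
    }

    @Override public int hashCode() { return queueUrl.hashCode(); }

    @Override public String toString() { return "SqsSourceSplit{" + queueUrl + "}"; }
}

// Hypothetical versioned serializer in the style of SimpleVersionedSerializer: the URL as UTF-8 bytes.
final class SqsSourceSplitSerializer {
    static final int VERSION = 1;

    int getVersion() { return VERSION; }

    byte[] serialize(SqsSourceSplit split) {
        return split.getQueueUrl().getBytes(StandardCharsets.UTF_8);
    }

    SqsSourceSplit deserialize(int version, byte[] bytes) {
        if (version != VERSION) throw new IllegalArgumentException("Unknown split version " + version);
        return new SqsSourceSplit(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Because the split carries no read position, recovery relies on SQS redelivering undeleted messages rather than on an offset stored in the split.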
Metrics
We have reviewed the FLIP-33: Standardize Connector Metrics and identified several metrics that will be beneficial for users of this connector. We plan to utilize the existing interfaces/implementations for commonly known metrics and will add support for any new metrics as needed.
| Metric Name | Type | Unit | Description |
|---|---|---|---|
| numRecordsIn | Counter | Records | The total number of input records since the source started. |
| numRecordsInPerSecond | Meter | Records/sec | The number of input records per second. |
| numRecordsInErrors | Counter | | The total number of records that failed to be consumed, processed, or emitted. |
| numBytesIn | Counter | Bytes | The total number of input bytes since the source started. |
| numBytesInPerSecond | Meter | Bytes/sec | The number of input bytes per second. |
| sourceIdleTime | Gauge | ms | The time in milliseconds that the source has not processed any record. sourceIdleTime = CurrentTime - LastRecordProcessTime |
| currentFetchEventTimeLag | Gauge | ms | The time in milliseconds from the record event timestamp to the timestamp at which Flink fetched the record. currentFetchEventTimeLag = FetchTime - EventTime. The per-minute average value will be reported. |
| currentEmitEventTimeLag | Gauge | ms | The time in milliseconds from the record event timestamp to the timestamp at which the record is emitted by (leaves) the source connector. currentEmitEventTimeLag = EmitTime - EventTime. The per-minute average value will be reported. |
| numSqsDeletionsFailed | Counter | | The total number of records that the application failed to delete from SQS after consumption. |
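The formulas for sourceIdleTime and the per-minute average of currentFetchEventTimeLag can be illustrated with a small tracker. This is a hypothetical sketch, not Flink's metric classes; it assumes the event time of an SQS message is taken from its `SentTimestamp` attribute (epoch milliseconds) and that fetch times arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative bookkeeping behind two of the metrics above; not Flink's metric implementation.
final class SqsLagTracker {
    private static final long WINDOW_MILLIS = 60_000; // "per minute average"

    private final Deque<long[]> samples = new ArrayDeque<>(); // each entry: {fetchTime, lag}
    private long lastRecordProcessTime = -1;                   // -1 until the first record

    /** Records one fetched message: lag = FetchTime - EventTime. */
    void onFetched(long eventTimeMillis, long fetchTimeMillis) {
        samples.addLast(new long[] {fetchTimeMillis, fetchTimeMillis - eventTimeMillis});
    }

    /** Marks that a record finished processing at the given time. */
    void onRecordProcessed(long processTimeMillis) {
        lastRecordProcessTime = processTimeMillis;
    }

    /** Average fetch lag over the last minute, or 0 if nothing was fetched in that window. */
    double averageFetchLag(long nowMillis) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMillis - WINDOW_MILLIS) {
            samples.removeFirst(); // drop samples older than the one-minute window
        }
        return samples.stream().mapToLong(s -> s[1]).average().orElse(0.0);
    }

    /** sourceIdleTime = CurrentTime - LastRecordProcessTime (reported as 0 before any record). */
    long sourceIdleTime(long nowMillis) {
        return lastRecordProcessTime < 0 ? 0 : nowMillis - lastRecordProcessTime;
    }
}
```

A sliding window keeps the reported average responsive to recent backlog, at the cost of retaining one sample per fetched record for up to a minute.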
Logs
We will be incorporating the necessary logging into the system to capture warnings and errors encountered by the AWS SQS source connector.
Compatibility, Deprecation, and Migration Plan
This connector is compatible with Amazon SQS. As this is a new addition to the Flink connectors set, there are no compatibility, deprecation, or migration requirements related to this FLIP.
Test Plan
We will be adding a wide variety of documentation and testing focused on:
- Unit tests
- Docker-based integration testing using Testcontainers
- End-to-end integration tests that will poll a functioning live Amazon SQS service (these tests will require valid credentials to the SQS service as a prerequisite).
Rejected Alternatives
n/a
Future Extension
This FLIP is a work in progress towards broad feature support for SQS customers. We plan to extend it in the future with support for the following feature set:
- Support for AWS SQS FIFO queues.
- Support for exactly-once semantics.
- Support for Table APIs.
- Support for use cases where a resource, such as S3 files containing data in various formats, is downloaded based on the SQS message.
Future enhancements will cover the above areas and more.