Comment: Updated to Sep 25th

...

Motivation

Many applications within enterprise data processing pipelines use SQS as a source. An SQS message can carry the entire event or simply the URL of the location of the event that the pipeline needs to process. Depending on the use case, these pipelines process events either in batches or in a streaming fashion.

While migrating these pipelines to Flink, we identified the need for a Flink SQS Connector that can reliably ingest events from the SQS queue. This connector will currently support the Standard queue type only; support for FIFO queues will be added later as an extension. AWS SQS queue types reference: Standard and FIFO queues.

Additionally, this connector can be extended to support reading data from S3 files after extracting the URL from the SQS payload.

Public Interfaces

The connector's interfaces rely on the standard Source interfaces detailed in FLIP-27: Refactor Source Interface. Therefore, we will adhere to the public contract listed in that FLIP.

SQS Terminology

Visibility Timeout - Visibility timeout is a mechanism that prevents other consumers from processing the same message while it is being worked on. Amazon SQS does not automatically delete the message; instead, the consumer must explicitly delete the message using the DeleteMessage action after it has been successfully processed.

Message Retention Period - The retention period describes how long a message is retained in the queue before it is deleted automatically.
The retention period can be set between 1 minute and 14 days. Once a message reaches the end of the retention period, SQS deletes it automatically.
We typically set this to a sufficiently long duration, which helps when recovering from extended outages and backfilling data.

Proposed Changes

The new SQS source connector will be based on the Refactored Source Interface (FLIP-27). There is a similar effort to add an SQS sink, FLIP-438: Amazon SQS Sink Connector. We plan to use the same package, "flink-connector-sqs", under the "flink-connectors-aws" repository and create a "source" folder that will contain the code for the SQS-based source.

This connector will support both Bounded (Batch) and Unbounded (Streaming) cases. It will be implemented to support the DataStream API (now) and the Table API/SQL (future extension). The key concepts for this connector are outlined below:

  • A Split is an SQS queue URL.
  • The SplitEnumerator registers the split with the SourceReader.
  • The SourceReader reads messages from the assigned splits by calling the ReceiveMessage AWS API through the SQS client and returns the deserialized records.

Rate Limiting Strategy:

We will be adding a customisable RateLimiterStrategy in our connector, which applies rate-limiting to a source sub-task. 

This will allow customers to adjust/configure their SQS message ingestion as per their requirements.
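To illustrate what rate limiting applied to a source sub-task could look like, here is a minimal sketch. It assumes a simple fixed one-second window and an even split of an overall budget across sub-tasks. The class `SubtaskRateLimiter` and its methods are hypothetical names used for illustration only; they are not the connector's API and not Flink's RateLimiterStrategy interface.

```java
import java.util.function.LongSupplier;

/** Hypothetical fixed-window limiter: at most maxPerSecond permits per one-second window. */
class SubtaskRateLimiter {
    private final int maxPerSecond;
    private final LongSupplier clockMillis; // injectable clock so the behavior can be tested
    private long windowStart;
    private int usedInWindow;

    SubtaskRateLimiter(int totalPerSecond, int parallelism, LongSupplier clockMillis) {
        // Assumption: the overall budget is split evenly, with at least 1 permit per sub-task.
        this.maxPerSecond = Math.max(1, totalPerSecond / parallelism);
        this.clockMillis = clockMillis;
        this.windowStart = clockMillis.getAsLong();
    }

    /** Returns true if one more message may be fetched in the current window. */
    boolean tryAcquire() {
        long now = clockMillis.getAsLong();
        if (now - windowStart >= 1000) {
            // A new one-second window begins: reset the counter.
            windowStart = now;
            usedInWindow = 0;
        }
        if (usedInWindow < maxPerSecond) {
            usedInWindow++;
            return true;
        }
        return false;
    }

    int maxPerSecond() {
        return maxPerSecond;
    }
}
```

A reader loop would call `tryAcquire()` before each fetch and back off when it returns false. A fixed window is only one possible choice; a token bucket would smooth bursts at window boundaries.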

Fault Tolerance Guarantees:

We will support both Exactly-Once and At-Least-Once fault tolerance guarantees for the SQS source connector.

...

At-Least-Once (Checkpoint Disabled)

If checkpointing is disabled, the connector defaults to at-least-once semantics. In this mode, messages are deleted from the queue immediately after they are read.

At-Least-Once (Checkpoint Enabled)

If checkpointing is enabled, messages that have been read and processed are deleted upon receiving the notifyCheckpointComplete callback, which indicates the successful completion of the checkpoint cycle.

State Management

Depending on the configuration, receipt handles for the messages are stored in the state. Receipt handles are necessary for performing the delete operation on the messages later. The frequency of checkpoint completion dictates the state size of the source operator: frequent successful checkpoints keep the operator's state small.

Based on a boolean flag, the connector can disable exactly-once semantics even if checkpointing is enabled.
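The interplay between stored receipt handles and checkpoint completion can be sketched as follows. This is a simplified model under stated assumptions, not the connector's implementation: `ReceiptHandleTracker` and its methods are hypothetical, and the `deleter` callback stands in for whatever SQS delete call the connector ends up using.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical helper that tracks receipt handles until their checkpoint completes. */
class ReceiptHandleTracker {
    private final List<String> sinceLastCheckpoint = new ArrayList<>();
    private final SortedMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    private final Consumer<List<String>> deleter; // stand-in for the SQS delete operation

    ReceiptHandleTracker(Consumer<List<String>> deleter) {
        this.deleter = deleter;
    }

    /** Remember the receipt handle of a message that was read and emitted. */
    void onMessageRead(String receiptHandle) {
        sinceLastCheckpoint.add(receiptHandle);
    }

    /** Called when a checkpoint is taken; returns all handles that must be kept in state. */
    List<String> snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(sinceLastCheckpoint));
        sinceLastCheckpoint.clear();
        List<String> all = new ArrayList<>();
        pendingByCheckpoint.values().forEach(all::addAll);
        return all;
    }

    /** Called once a checkpoint has completed; deletes everything covered by it. */
    void notifyCheckpointComplete(long checkpointId) {
        // headMap is a live view of all checkpoints with an id <= checkpointId.
        SortedMap<Long, List<String>> done = pendingByCheckpoint.headMap(checkpointId + 1);
        for (List<String> handles : done.values()) {
            if (!handles.isEmpty()) {
                deleter.accept(handles);
            }
        }
        done.clear(); // clearing the view removes the entries from the backing map
    }

    /** Number of handles currently held, a rough proxy for the operator state size. */
    int pendingCount() {
        int count = sinceLastCheckpoint.size();
        for (List<String> handles : pendingByCheckpoint.values()) {
            count += handles.size();
        }
        return count;
    }
}
```

In this model the number of retained handles grows with every message read and shrinks only when a checkpoint completes, which is why frequent successful checkpoints keep the state small.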

Sample code snapshots:

Instantiate the SQS connector:

SingleOutputStreamOperator sqsSourceStream = env.fromSource(
        SqsSource.<T>builder()
                // URL of the SQS queue to read from
                .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
                .setFailOnError(false)
                // AWS client settings, see the properties example
                .setSqsClientProperties(getSqsClientProperties())
                .setWatermark(WatermarkStrategy.noWatermarks()));

...

Instantiate the SQS connector properties example:

private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "testRoleArn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test=sqs=session");
    // Properties.setProperty only accepts String values, so the number is passed as a string.
    props.setProperty(SqsSourceBuilder.MAX_HTTP_CONNECTION_PROPERTY, "200");
    return props;
}


SQS Split:

...

We have reviewed FLIP-33: Standardize Connector Metrics and identified several metrics that will be beneficial for users of this connector. We plan to use the existing interfaces and implementations for commonly known metrics and will add support for any new metrics as needed.

| Metric Name | Description |
| --- | --- |
| numRecordsIn (Counter) (Records) | The total number of input records since the source started. |
| numRecordsInPerSecond (Meter) (Records/sec) | The input records per second. |
| numRecordsInErrors (Counter) | The total number of records that failed to be consumed, processed, or emitted. |
| numBytesIn (Counter) (Bytes) | The total number of input bytes since the source started. |
| numBytesInPerSecond (Meter) (Bytes/sec) | The input bytes per second. |
| sourceIdleTime (Gauge) (ms) | The time in milliseconds that the source has not processed any record. sourceIdleTime = CurrentTime - LastRecordProcessTime |
| currentFetchEventTimeLag (Gauge) (ms) | The time in milliseconds from the record event timestamp to the timestamp at which Flink fetched the record. currentFetchEventTimeLag = FetchTime - EventTime. The per-minute average value will be reported. |
| currentEmitEventTimeLag (Gauge) (ms) | The time in milliseconds from the record event timestamp to the timestamp at which the record is emitted by (leaves) the source connector. currentEmitEventTimeLag = EmitTime - EventTime. The per-minute average value will be reported. |
| numSqsDelectionsFailed (Counter) | The total number of records that the application failed to delete from SQS after consumption. |
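As a small worked illustration of the idle-time and lag formulas listed above, the following sketch computes each value from millisecond timestamps and averages a set of lag samples. The class `SourceLagMetrics` and its method names are hypothetical, and how samples are collected within a minute is an assumption made for this example.

```java
/** Hypothetical helpers mirroring the metric formulas; all timestamps are in milliseconds. */
class SourceLagMetrics {

    /** sourceIdleTime = CurrentTime - LastRecordProcessTime */
    static long sourceIdleTimeMs(long currentTimeMs, long lastRecordProcessTimeMs) {
        return currentTimeMs - lastRecordProcessTimeMs;
    }

    /** currentFetchEventTimeLag = FetchTime - EventTime */
    static long fetchEventTimeLagMs(long fetchTimeMs, long eventTimeMs) {
        return fetchTimeMs - eventTimeMs;
    }

    /** currentEmitEventTimeLag = EmitTime - EventTime */
    static long emitEventTimeLagMs(long emitTimeMs, long eventTimeMs) {
        return emitTimeMs - eventTimeMs;
    }

    /** Average of the lag samples gathered during one minute; 0.0 when there are none. */
    static double perMinuteAverage(long[] lagSamplesMs) {
        if (lagSamplesMs.length == 0) {
            return 0.0;
        }
        long sum = 0;
        for (long sample : lagSamplesMs) {
            sum += sample;
        }
        return (double) sum / lagSamplesMs.length;
    }
}
```

For example, a record with event time 1000 ms that is fetched at 1500 ms and emitted at 1800 ms has a fetch lag of 500 ms and an emit lag of 800 ms.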


Logs

We will be incorporating the necessary logging into the system to capture warnings and errors encountered by the AWS SQS source connector.

Compatibility, Deprecation, and Migration Plan

This connector is compatible with Amazon SQS. As this is a new addition to the Flink connectors set, there are no compatibility, deprecation, or migration requirements related to this FLIP.

Test Plan

We will be adding a wide variety of documentation and testing focused on:

  • Unit tests
  • Docker-based integration testing using Testcontainers
  • End-to-end integration tests that will poll a functioning live Amazon SQS service (these tests will require valid credentials to the SQS service as a prerequisite).

Rejected Alternatives

n/a

Future Extension

This FLIP is a work in progress toward broader feature support for SQS customers. We plan to extend it in the future with support for the following feature set:

  • Support for AWS SQS FIFO queues.
  • Support for exactly-once semantics.
  • Support for Table APIs.
  • Support for use cases where a resource, such as S3 files containing data in various formats, is downloaded based on the SQS message.

Future enhancements will cover the above areas and more.