Discussion thread: https://lists.apache.org/thread/p27tj3kzyln1fjqyx2xmg4tt7thoh0sh
Vote thread: TBD
JIRA: TBD
Release: 4.4
Authors: Saurabh Singh & Abhi Sagar Khatri
Status:

Motivation

Many applications within enterprise data processing pipelines use SQS as a source. An SQS message can carry either the entire event or simply the URL of the event location that the pipeline needs to process. Depending on the use case, these pipelines process events either in batches or in a streaming fashion.

While migrating these pipelines to Flink, we identified the need for a Flink SQS connector that can reliably ingest events from an SQS queue. The connector will initially support only the Amazon SQS Standard queue type; support for FIFO queues will be added later as an extension. AWS SQS queue types reference: Standard and FIFO queues.

Additionally, this connector can be extended to support reading data from S3 files after extracting the URL from the SQS payload.

Public Interfaces

The connector's interfaces rely on the standard Source interfaces detailed in FLIP-27: Refactor Source Interface. Therefore, we will adhere to the public contract defined in that FLIP.

SQS Terminology

Visibility Timeout - Visibility timeout is a mechanism that prevents other consumers from processing a message while it is being worked on. Amazon SQS does not automatically delete the message; instead, the consumer must explicitly delete it using the DeleteMessage action after it has been successfully processed.

Message Retention Period - The retention period describes how long a message is retained in the queue before it is deleted automatically. It can be set between 1 minute and 14 days; once a message exceeds the retention period, SQS deletes it automatically. We typically set this to a sufficiently long duration, which helps in recovering from extended outages and in backfilling data.

Proposed Changes

The new SQS source connector will be based on the refactored Source interface (FLIP-27). A similar effort exists for an SQS sink (FLIP-438: Amazon SQS Sink Connector). We plan to use the same package, "flink-connector-sqs", under the "flink-connectors-aws" repository and create a "source" folder that will contain the code for the SQS-based source.

This connector will support both Bounded (Batch) and Unbounded (Streaming) cases. It will be implemented to support the DataStream API (now) and the Table API/SQL (future extension). The key concepts for this connector are outlined below:

  • A Split is an SQS queue URL.
  • The SplitEnumerator assigns the split to the SourceReaders.
  • The SourceReader reads messages from its assigned split via the SQS client's ReceiveMessage API and returns the deserialized records.

Rate Limiting Strategy:

We will add a customizable RateLimiterStrategy to the connector, which applies rate limiting to each source sub-task.

This will allow users to adjust and configure their SQS message ingestion according to their requirements; a sketch follows below.
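A minimal sketch of how this could be exposed on the builder. The setRateLimiterStrategy method name is an assumption; RateLimiterStrategy.perSecond(...) is the existing flink-core utility (also used by DataGeneratorSource).

import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

// Hypothetical builder option: cap ingestion at 100 messages per second
// per source sub-task.
SqsSource<String> rateLimitedSource = SqsSource.<String>builder()
        .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
        .setRateLimiterStrategy(RateLimiterStrategy.perSecond(100))
        .build();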

Fault Tolerance Guarantees:

We will support at-least-once fault-tolerance guarantees for the SQS source connector.

At-Least-Once (Checkpointing Disabled)

If checkpointing is disabled, the connector defaults to at-least-once semantics. In this mode, messages are deleted from the queue immediately after they are read.

At-Least-Once (Checkpointing Enabled)

If checkpointing is enabled, messages that have been read and processed are deleted upon receiving the notifyCheckpointComplete callback, which indicates the successful completion of the checkpoint cycle.

State Management

Depending on the configuration, receipt handles for the messages are stored in state; receipt handles are required to perform the delete operation on the messages later. Checkpoint completion frequency dictates the state size of the source operator: frequent successful checkpoints keep the operator's state small.
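A minimal sketch of this bookkeeping, assuming handles are grouped by checkpoint id (all class and method names here are illustrative, not part of the proposed API):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import software.amazon.awssdk.services.sqs.model.Message;

public class SqsReceiptHandleTracker {
    // checkpointId -> receipt handles of messages read but not yet deleted
    private final NavigableMap<Long, List<String>> handlesByCheckpoint = new TreeMap<>();
    private final List<String> handlesSinceLastCheckpoint = new ArrayList<>();

    void onMessageRead(Message message) {
        handlesSinceLastCheckpoint.add(message.receiptHandle());
    }

    void onSnapshot(long checkpointId) {
        // Tie everything read since the last snapshot to this checkpoint.
        handlesByCheckpoint.put(checkpointId, new ArrayList<>(handlesSinceLastCheckpoint));
        handlesSinceLastCheckpoint.clear();
    }

    List<String> onCheckpointComplete(long checkpointId) {
        // Handles up to and including this checkpoint are safe to delete.
        NavigableMap<Long, List<String>> completed = handlesByCheckpoint.headMap(checkpointId, true);
        List<String> toDelete = new ArrayList<>();
        completed.values().forEach(toDelete::addAll);
        completed.clear();
        return toDelete;
    }
}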

Sample code snippets:

Instantiate the SQS connector:

// env.fromSource takes the source, a WatermarkStrategy, and a source name.
final SingleOutputStreamOperator<T> sqsSourceStream = env.fromSource(
        SqsSource.<T>builder()
                .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
                .setSqsClientProperties(getSqsClientProperties())
                .build(),
        WatermarkStrategy.noWatermarks(),
        "SQS Source");
SQS client properties example
private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "testRoleArn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test-sqs-session");
    // Properties values must be strings.
    props.setProperty(SqsSourceBuilder.MAX_HTTP_CONNECTION_PROPERTY, "200");
    return props;
}

SQS Split:

Note: All readers are assigned splits that share the same split id, which is the single SQS queue URL; an SQS queue inherently allows multiple consumers to read messages concurrently.


SQS Split
public class SqsSplit implements SourceSplit {
    private final String queueUrl;

    public SqsSplit(final String queueUrl) {
        this.queueUrl = queueUrl;
    }

    @Override
    public String splitId() {
        return queueUrl;
    }
}
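The reader classes below also reference a SqsSplitState. Since the queue URL is the only split information, a minimal sketch (an assumption, not part of the snippets provided) could be:

public class SqsSplitState extends SqsSplit {
    public SqsSplitState(final SqsSplit split) {
        super(split.splitId());
    }

    public SqsSplit toSqsSplit() {
        return new SqsSplit(splitId());
    }
}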

SQS Split Reader:

SQS Split Reader
@Override
public RecordsWithSplitIds<Message> fetch() {
    RecordsBySplits.Builder<Message> recordsBuilder = new RecordsBySplits.Builder<>();
    if (assignedSplits.isEmpty()) {
        return recordsBuilder.build();
    }

    final SqsSplit split = assignedSplits.get(0);
    final List<Message> messages = receiveMessages(split);
    // Attribute the fetched messages to the split they were read from.
    recordsBuilder.addAll(split.splitId(), messages);

    return recordsBuilder.build();
}
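A sketch of the receiveMessages helper used above (the helper and the sqsClient field are assumptions; ReceiveMessageRequest is the standard AWS SDK v2 API):

private List<Message> receiveMessages(final SqsSplit split) {
    // Long-poll the queue; SQS returns at most 10 messages per call.
    final ReceiveMessageRequest request = ReceiveMessageRequest.builder()
            .queueUrl(split.splitId())
            .maxNumberOfMessages(10)
            .waitTimeSeconds(20)
            .build();
    return sqsClient.receiveMessage(request).messages();
}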

SQS Split Enumerator:

SQS Split Enumerator
@Override
public void addSplitsBack(List<SqsSplit> splits, int subtaskId) {
    LOG.info("SqsSourceEnumerator addSplitsBack, subtaskId={}, splits={}", subtaskId, splits);
    createPendingAssignmentForReader(subtaskId);

    // If the failed subtask has already restarted, we need to assign pending splits to it
    if (context.registeredReaders().containsKey(subtaskId)) {
        assignPendingSplits(Collections.singleton(subtaskId));
    }
}

@Override
public void addReader(int subtaskId) {
    LOG.info("Adding reader {} to SqsSourceEnumerator.", subtaskId);
    assignPendingSplits(Collections.singleton(subtaskId));
}
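For completeness, a sketch of what assignPendingSplits could look like (the pendingAssignments map is an assumption; SplitEnumeratorContext.assignSplit is the standard FLIP-27 API):

private void assignPendingSplits(final Set<Integer> readers) {
    for (final int subtaskId : readers) {
        // Every reader receives a split carrying the same queue URL, since an
        // SQS queue supports many concurrent consumers.
        final SqsSplit split = pendingAssignments.remove(subtaskId);
        if (split != null) {
            context.assignSplit(split, subtaskId);
        }
    }
}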

SQS Source Reader:

SQS Source Reader
public class SqsSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<Message, T, SqsSplit, SqsSplitState> {

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // If delete-on-checkpoint is enabled, delete the messages covered by
        // this checkpoint using their stored receipt handles.
        LOG.debug("Deleting messages for checkpoint {}", checkpointId);
    }
}
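The deletion itself could use the SQS DeleteMessageBatch API; a minimal sketch (the deleteMessages helper and the sqsClient field are assumptions):

private void deleteMessages(final String queueUrl, final List<String> receiptHandles) {
    // DeleteMessageBatch accepts at most 10 entries per call, so delete in chunks.
    for (int start = 0; start < receiptHandles.size(); start += 10) {
        final List<String> chunk =
                receiptHandles.subList(start, Math.min(start + 10, receiptHandles.size()));
        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
        for (int i = 0; i < chunk.size(); i++) {
            entries.add(DeleteMessageBatchRequestEntry.builder()
                    .id(Integer.toString(i))
                    .receiptHandle(chunk.get(i))
                    .build());
        }
        sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
                .queueUrl(queueUrl)
                .entries(entries)
                .build());
    }
}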


SQS Record Emitter:

SQS Record Emitter
public class SqsRecordEmitter<T> implements RecordEmitter<Message, T, SqsSplitState> {
    private final SqsMessageDeserializationSchema<T> deserializationSchema;
    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();

    public SqsRecordEmitter(SqsMessageDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(Message element, SourceOutput<T> output, SqsSplitState splitState) throws Exception {
        // Point the wrapper at the current output before deserializing into it.
        sourceOutputWrapper.setSourceOutput(output);
        deserializationSchema.deserialize(element, sourceOutputWrapper);
    }
}
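A sketch of the SourceOutputWrapper referenced above (an assumption, mirroring the pattern used by other AWS connectors such as the Kinesis source):

private static class SourceOutputWrapper<T> implements Collector<T> {
    private SourceOutput<T> sourceOutput;

    private void setSourceOutput(SourceOutput<T> sourceOutput) {
        this.sourceOutput = sourceOutput;
    }

    @Override
    public void collect(T record) {
        sourceOutput.collect(record);
    }

    @Override
    public void close() {}
}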


Observability

Metrics

We have reviewed FLIP-33: Standardize Connector Metrics and identified several metrics that will be beneficial for users of this connector. We plan to utilize the existing interfaces/implementations for commonly known metrics and will add support for any new metrics as needed.

Metric Name - Description

numRecordsIn (Counter, Records) - The total number of input records since the source started.

numRecordsInPerSecond (Meter, Records/sec) - The number of input records per second.

numRecordsInErrors (Counter) - The total number of records that failed to be consumed, processed, or emitted.

numBytesIn (Counter, Bytes) - The total number of input bytes since the source started.

numBytesInPerSecond (Meter, Bytes/sec) - The number of input bytes per second.

sourceIdleTime (Gauge, ms) - The time in milliseconds during which the source has not processed any record. sourceIdleTime = CurrentTime - LastRecordProcessTime.

currentFetchEventTimeLag (Gauge, ms) - The time in milliseconds from the record event timestamp to the timestamp at which Flink fetched the record. currentFetchEventTimeLag = FetchTime - EventTime. The per-minute average value will be reported.

currentEmitEventTimeLag (Gauge, ms) - The time in milliseconds from the record event timestamp to the timestamp at which the record leaves the source connector. currentEmitEventTimeLag = EmitTime - EventTime. The per-minute average value will be reported.

numSqsDeletionsFailed (Counter) - The total number of records that the application failed to delete after consumption from SQS.


Logs

We will incorporate the necessary logging to capture warnings and errors encountered by the AWS SQS source connector.

Compatibility, Deprecation, and Migration Plan

This connector is compatible with Amazon SQS. As this is a new addition to the Flink connectors set, there are no compatibility, deprecation, or migration requirements related to this FLIP.

Test Plan

We will be adding a wide variety of documentation and testing focused on:

  • Unit tests
  • Docker-based integration testing using Testcontainers (a sketch follows this list)
  • End-to-end integration tests that will poll a functioning live Amazon SQS service (these tests will require valid credentials to the SQS service as a prerequisite).
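A minimal sketch of a Testcontainers-based test that bootstraps an SQS-compatible endpoint with LocalStack (the image tag, class, and test names are illustrative):

@Testcontainers
class SqsSourceITCase {

    @Container
    private static final LocalStackContainer LOCALSTACK =
            new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.0"))
                    .withServices(LocalStackContainer.Service.SQS);

    @Test
    void readsMessagesFromQueue() {
        // Point an AWS SDK v2 client at the LocalStack endpoint.
        final SqsClient sqsClient = SqsClient.builder()
                .endpointOverride(LOCALSTACK.getEndpoint())
                .region(Region.of(LOCALSTACK.getRegion()))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
                        LOCALSTACK.getAccessKey(), LOCALSTACK.getSecretKey())))
                .build();
        // Create a queue, send test messages, then run a job with the SQS
        // source configured against the same endpoint and assert the output.
    }
}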

Future Extension


This FLIP is a work in progress toward broad feature support for SQS users. We plan to extend it in the future with the following feature set:

  • Support for the AWS SQS FIFO queue type.
  • Support for exactly-once semantics.
  • Support for the Table API/SQL.
  • Support for use cases where S3 files containing data in various formats are downloaded based on the SQS message payload.

Future enhancements will cover the above areas and more.
