Motivation
There is demand within the community for an Amazon SQS Sink connector.
Public Interfaces
Initially, we will add an SQS sink built on the Async Sink framework, with support for the SQS batch API (SendMessageBatch).
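The Async Sink framework forms batches itself from its `maxBatchSize` and `maxBatchSizeInBytes` settings. The sketch below only makes explicit the two SQS constraints those settings have to respect: a `SendMessageBatch` call carries at most 10 entries, and the combined payload must stay under the service's per-batch size limit (historically 256 KiB, i.e. 262,144 bytes; check the current SQS quotas). `SqsBatchPartitioner` and its byte-limit parameter are hypothetical, and it measures message bodies only, ignoring message attributes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups message bodies into SendMessageBatch-sized chunks. */
final class SqsBatchPartitioner {
    /** SendMessageBatch accepts at most 10 entries per call. */
    static final int MAX_ENTRIES_PER_BATCH = 10;

    private SqsBatchPartitioner() {}

    /**
     * Splits bodies into consecutive batches holding at most MAX_ENTRIES_PER_BATCH entries
     * and at most maxBatchBytes UTF-8 bytes of message body in total.
     */
    static List<List<String>> partition(List<String> bodies, long maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (String body : bodies) {
            long size = body.getBytes(StandardCharsets.UTF_8).length;
            if (size > maxBatchBytes) {
                // A single oversized message can never be sent, whatever the batching
                throw new IllegalArgumentException("Message of " + size + " bytes exceeds the batch limit");
            }
            boolean full = current.size() == MAX_ENTRIES_PER_BATCH
                    || currentBytes + size > maxBatchBytes;
            if (full) {
                // Close the current batch and start a new one with this message
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(body);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

For example, 25 short messages produce three batches of 10, 10 and 5 entries.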
Proposed Changes
The new sink connector will be based on the Async Sink (FLIP-171).
We will add a module called "flink-connector-sqs" to the "flink-connector-aws" repository.
A sample using the connector is shown below:
stream.sinkTo(SqsSink.<T>builder()
        .setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
        .setFailOnError(false)
        .setSqsClientProperties(getSqsClientProperties())
        .setSerializationSchema(serializationSchema)
        // SendMessageBatch accepts at most 10 entries, so batchSize should not exceed 10
        .setMaxBatchSize(batchSize)
        .setMaxBufferedRequests(bufferedRequests)
        .build());
private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "testRoleArn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test-sqs-session");
    // Properties values must be Strings, so the connection limit is passed as "200"
    props.setProperty(SqsSinkBuilder.MAX_HTTP_CONNECTION_PROPERTY, "200");
    return props;
}
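Two settings in the sample above are easy to get wrong: `Properties.setProperty` only accepts String values, so numeric settings such as the HTTP connection limit have to be passed as strings and parsed when read, and a `maxBatchSize` above 10 can never be satisfied by `SendMessageBatch`. The sketch below shows a pre-build check for both; `SqsSinkConfigCheck` and the property key used in the usage example are hypothetical, not part of the proposed builder.

```java
import java.util.Properties;

/** Hypothetical pre-build validation for the SQS sink settings. */
final class SqsSinkConfigCheck {
    /** SendMessageBatch accepts at most 10 entries per call. */
    static final int SQS_MAX_BATCH_ENTRIES = 10;

    private SqsSinkConfigCheck() {}

    /** Rejects a maxBatchSize that SendMessageBatch could never accept. */
    static int checkMaxBatchSize(int maxBatchSize) {
        if (maxBatchSize < 1 || maxBatchSize > SQS_MAX_BATCH_ENTRIES) {
            throw new IllegalArgumentException(
                    "maxBatchSize must be between 1 and " + SQS_MAX_BATCH_ENTRIES + ", got " + maxBatchSize);
        }
        return maxBatchSize;
    }

    /** Reads an integer setting; Properties stores Strings, so the value is parsed here. */
    static int readIntProperty(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }
}
```

Failing at build time turns a misconfiguration into an immediate error instead of a rejected request at runtime.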
// SqsSinkWriter.java
@Override
protected void submitRequestEntries(
        @NonNull final List<SendMessageBatchRequestEntry> requestEntries,
        @NonNull final Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
    // One SendMessageBatch call per Async Sink batch
    final SendMessageBatchRequest batchRequest =
            SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build();
    try {
        final CompletableFuture<SendMessageBatchResponse> future =
                sqsAsyncClient.sendMessageBatch(batchRequest);
        future.whenComplete(
                (response, err) -> {
                    if (err != null) {
                        // The whole call failed (response is null here): retry every entry
                        log.warn("sendMessageBatch failed for " + requestEntries.size() + " entries", err);
                        handleFullyFailedRequest(err, requestEntries, requestResult);
                    } else if (response.failed() != null && !response.failed().isEmpty()) {
                        // Some entries were rejected: retry only those
                        log.warn("sendMessageBatch partially failed: " + response.failed().size()
                                + " of " + requestEntries.size() + " entries will be retried");
                        handlePartiallyFailedRequest(response, requestEntries, requestResult);
                    } else {
                        // Every entry was accepted: nothing to retry
                        requestResult.accept(Collections.emptyList());
                    }
                });
    } catch (final UnsupportedOperationException e) {
        // requestResult must always be completed, otherwise the writer waits on this batch forever
        log.error("Error while calling the sendMessageBatch API", e);
        handleFullyFailedRequest(e, requestEntries, requestResult);
    }
}
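In the AWS SDK for Java v2, `SendMessageBatchResponse.failed()` returns `BatchResultErrorEntry` objects, each carrying the `id` of the entry that was rejected. A partial-failure handler therefore has to map those ids back to the original request entries and hand exactly those to the retry callback. The stdlib-only sketch below models that mapping; `BatchEntry` is a hypothetical stand-in for `SendMessageBatchRequestEntry`, and `PartialFailureSketch` is not the connector's actual `handlePartiallyFailedRequest`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for SendMessageBatchRequestEntry (id and body only). */
final class BatchEntry {
    final String id;
    final String body;

    BatchEntry(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

final class PartialFailureSketch {
    private PartialFailureSketch() {}

    /**
     * Returns the sent entries whose ids appear in the response's failed ids,
     * preserving the original order; these are the entries to pass back for retry.
     */
    static List<BatchEntry> entriesToRetry(List<BatchEntry> sent, List<String> failedIds) {
        Set<String> failed = new HashSet<>(failedIds);
        List<BatchEntry> retry = new ArrayList<>();
        for (BatchEntry entry : sent) {
            if (failed.contains(entry.id)) {
                retry.add(entry);
            }
        }
        return retry;
    }
}
```

`BatchResultErrorEntry` also exposes `senderFault()`; an implementation may want to treat sender faults (for example an invalid message body) as non-retryable, since resending the same entry would fail again.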
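// SqsSinkElementConverter.java
@NonNull
@Override
public SendMessageBatchRequestEntry apply(@NonNull final T element, @NonNull final SinkWriter.Context context) {
    checkOpened();
    final byte[] serializedBytes = serializationSchema.serialize(element);
    // SQS rejects a missing or empty message body, so fail here rather than send an invalid entry
    if (serializedBytes == null || serializedBytes.length == 0) {
        throw new IllegalArgumentException("SerializationSchema produced no bytes for element: " + element);
    }
    // Base64 keeps arbitrary bytes within the character set SQS accepts for message bodies
    final String messageBody = Base64.getEncoder().encodeToString(serializedBytes);
    return SendMessageBatchRequestEntry.builder()
            // The id only has to be unique within one batch; a random UUID satisfies that
            .id(UUID.randomUUID().toString())
            .messageBody(messageBody)
            .build();
}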
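The converter above Base64-encodes the serialized bytes, so consumers must Base64-decode the message body before deserializing it, and every payload grows to 4 * ceil(n / 3) characters (about 33% larger), which counts against the SQS size limit. Batch entry ids must consist of alphanumerics, hyphens and underscores and be at most 80 characters long; a UUID string (36 characters) fits. The helper class `SqsBodyEncoding` below is a hypothetical sketch of these rules, not part of the connector.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;
import java.util.regex.Pattern;

/** Hypothetical helpers mirroring the converter's encoding and id rules. */
final class SqsBodyEncoding {
    /** Batch entry ids: alphanumerics, hyphens and underscores, 1 to 80 characters. */
    private static final Pattern VALID_ENTRY_ID = Pattern.compile("[A-Za-z0-9_-]{1,80}");

    private SqsBodyEncoding() {}

    /** Producer side: what the converter puts into the message body. */
    static String encode(byte[] serialized) {
        return Base64.getEncoder().encodeToString(serialized);
    }

    /** Consumer side: recover the serialized bytes from a received message body. */
    static byte[] decode(String messageBody) {
        return Base64.getDecoder().decode(messageBody);
    }

    /** Padded Base64 length for n input bytes: 4 * ceil(n / 3). */
    static long encodedLength(long n) {
        return 4 * ((n + 2) / 3);
    }

    static boolean isValidEntryId(String id) {
        return VALID_ENTRY_ID.matcher(id).matches();
    }

    static String newEntryId() {
        return UUID.randomUUID().toString();
    }
}
```

For example, a 300-byte serialized record becomes a 400-character message body.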
Versioning Strategy
The flink-connector-sqs version will be independent of the Flink version. We will follow the same versioning strategy as Flink in terms of feature freeze windows, release candidates and branching/tagging. We will publish a Flink support matrix in the connector README and also update the Flink documentation to reference supported connectors. The initial release of flink-connector-sqs will be 1.0.0 and will support Flink 1.17.x and above.
FAQs
1. Are we planning to support the Table API & SQL as well?
Yes.
2. Are we going to use the existing AWS client providers to handle authentication and async client creation, as the Kinesis, Firehose and DynamoDB connectors do?
Yes.
3. Could you elaborate on the fault-tolerance guarantees that flink-connector-sqs will provide?
At-least-once.
4. What is the minimal configuration required to instantiate the sink?
SqsSink.<T>builder()
        .setSqsUrl(sqsUrl)
        .setSqsClientProperties(getSqsClientProperties())
        .setSerializationSchema(serializationSchema)
        .build();

private Properties getSqsClientProperties() {
    final Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
    props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "any-assume-role-arn");
    props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "test-session-name");
    return props;
}
5. Amazon SQS offers various data types. Could you outline which SQS data types the sink plans to support?
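As we understand the Async Sink design, buffered and in-flight requests are flushed or stored in state when a checkpoint is taken, so nothing covered by a completed checkpoint is lost; records sent after that checkpoint are, however, sent again when the job restarts from it. The toy model below (plain Java, not Flink or SQS code; `AtLeastOnceToyModel` is hypothetical) illustrates why the fault-tolerance answer is at-least-once: consumers may see duplicates after recovery and should process messages idempotently.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint, failure and replay (not Flink code). */
final class AtLeastOnceToyModel {
    private AtLeastOnceToyModel() {}

    /**
     * Records 1..totalRecords are emitted. A checkpoint completes after record
     * checkpointedUpTo has been flushed, the job fails after record sentBeforeFailure
     * has been sent, and on restore every record after the checkpoint is replayed.
     * Returns the queue contents in arrival order.
     */
    static List<Integer> deliveredMessages(int totalRecords, int checkpointedUpTo, int sentBeforeFailure) {
        if (checkpointedUpTo > sentBeforeFailure || sentBeforeFailure > totalRecords) {
            throw new IllegalArgumentException("expected checkpointedUpTo <= sentBeforeFailure <= totalRecords");
        }
        List<Integer> queue = new ArrayList<>(); // stands in for the SQS queue
        // First attempt: everything up to the failure point reached the queue
        for (int record = 1; record <= sentBeforeFailure; record++) {
            queue.add(record);
        }
        // Recovery: resume from the checkpoint, so records after it are sent again
        for (int record = checkpointedUpTo + 1; record <= totalRecords; record++) {
            queue.add(record);
        }
        return queue;
    }
}
```

With 5 records, a checkpoint after record 3 and a failure after record 5, the queue receives 1, 2, 3, 4, 5, 4, 5: no record is lost, but records 4 and 5 arrive twice.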
Compatibility, Deprecation, and Migration Plan
The connector is compatible with Amazon SQS. With respect to Flink, this is a new feature, so no compatibility, deprecation, or migration plan is needed.
Test Plan
We will add the following tests:
- Unit tests
- Docker-based integration tests using Testcontainers
- End-to-end integration tests that hit the real Amazon SQS service. These tests will be enabled only when credentials are defined.
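One way to enable the end-to-end tests only when credentials are defined is to check the standard AWS SDK environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The sketch below assumes that convention; `RealSqsTestGate` is hypothetical, and it would not detect credentials supplied through profiles or instance roles.

```java
import java.util.Map;

/** Hypothetical gate deciding whether the real-SQS tests should run. */
final class RealSqsTestGate {
    private RealSqsTestGate() {}

    /** True when both standard AWS credential environment variables are non-blank. */
    static boolean credentialsDefined(Map<String, String> env) {
        return isSet(env.get("AWS_ACCESS_KEY_ID")) && isSet(env.get("AWS_SECRET_ACCESS_KEY"));
    }

    private static boolean isSet(String value) {
        return value != null && !value.isBlank();
    }
}
```

In a JUnit 5 test this could be used as `Assumptions.assumeTrue(RealSqsTestGate.credentialsDefined(System.getenv()))`, which skips rather than fails the test; `@EnabledIfEnvironmentVariable` is a declarative alternative.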