Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There is demand within the community for an Amazon DynamoDB connector. A Jira ticket [1] was created in March 2020 to capture interest. The Async Sink framework was introduced in Flink 1.15 under FLIP-171 [2], and a DynamoDB sink was implemented but was not merged before the feature freeze [3].

[1] https://issues.apache.org/jira/browse/FLINK-16504
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[3] https://issues.apache.org/jira/browse/FLINK-24229

Public Interfaces

Initially, we will add a DynamoDB sink built on the Async Sink framework, with support for batch and streaming execution and for the Table API. Source support can be added at a later date.

Proposed Changes

The new sink connector will be based on the Async Sink (FLIP-171). It will support both bounded (batch) and unbounded (streaming) execution, through both the DataStream API and the Table API/SQL.
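To illustrate the batching idea behind an Async Sink based connector, the following is a minimal sketch of splitting buffered write requests into batches before they are sent. It is not the connector's implementation: the class name BatchSplitSketch and the method toBatches are hypothetical, and the batch size of 25 is assumed from the DynamoDB BatchWriteItem limit of 25 put or delete requests per call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of flink-connector-dynamodb.
class BatchSplitSketch {

    // Assumption: mirrors the BatchWriteItem limit of 25 requests per call.
    static final int MAX_BATCH_SIZE = 25;

    /** Splits buffered requests into consecutive batches of at most maxBatchSize elements. */
    static <T> List<List<T>> toBatches(List<T> buffered, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < buffered.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, buffered.size());
            // Copy the slice so each batch is independent of the buffer.
            batches.add(new ArrayList<>(buffered.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> buffered = new ArrayList<>();
        for (int i = 0; i < 60; i++) {
            buffered.add(i);
        }
        List<List<Integer>> batches = toBatches(buffered, MAX_BATCH_SIZE);
        // 60 buffered requests are split into batches of 25, 25 and 10.
        System.out.println("Number of batches: " + batches.size());
    }
}
```

In a real sink, the framework decides when to flush based on buffer size and time thresholds; the sketch covers only the splitting step.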

The Flink community will create a new connector repository, similar to the one for Elasticsearch [1]. There is a work-in-progress PR [2] from Flink 1.15 that will be ported to the new repository and finalised. The repository name will be:

  • flink-connector-dynamodb

We already have a DynamoDB Streams consumer [3] that currently lives within the Kinesis Data Streams connector [4] module. Since DynamoDB Streams uses the Kinesis Data Streams API, we will leave this source connector where it is to retain backwards compatibility. Once the Kinesis connector is migrated to the new source interfaces [5] (FLIP-27), we can consider moving the DynamoDB Streams source to a new repository.

A sample using the connector is shown below:

Properties clientProperties = new Properties();
// Supports auth methods and HTTP client configs from flink-connector-aws-base
clientProperties.setProperty(AWS_ACCESS_KEY_ID, ...);
clientProperties.setProperty(AWS_SECRET_ACCESS_KEY, ...);
clientProperties.setProperty(AWS_REGION, ...);

stream.sinkTo(
    DynamoDbSink.<T>builder()
        .setTableName(tableName)
        .setPartitionKey(partitionKey)
        .setSortKey(sortKey)
        .setDynamoDbClientProperties(clientProperties)
        .build());


[1] https://github.com/apache/flink-connector-elasticsearch
[2] https://github.com/apache/flink/pull/18518
[3] https://issues.apache.org/jira/browse/FLINK-4582
[4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
[5] https://issues.apache.org/jira/browse/FLINK-24438

Versioning Strategy

The flink-connector-dynamodb version will be independent of the Flink version. For feature freeze windows, release candidates and branching/tagging, we will follow the same strategy as Flink. We will publish a Flink support matrix in the connector README and also update the Flink documentation to reference supported connectors. The initial release of flink-connector-dynamodb will target 1.0.0 and support Flink 1.15.x and 1.16.x (if available).

Compatibility, Deprecation, and Migration Plan

The connector is compatible with Amazon DynamoDB. With respect to Flink, this is a new feature, so no compatibility, deprecation, or migration plan is required.

Test Plan

We will add the following tests:

  • Unit tests
  • Integration tests that run against DynamoDB in a LocalStack container
  • End-to-end tests that hit the real Amazon DynamoDB service. These tests will be enabled only when credentials are defined.
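The credential-gated behaviour of the end-to-end tests could be sketched as follows. This is only an illustration under stated assumptions: the class E2eTestGate and the method credentialsDefined are hypothetical, and the sketch assumes that credentials are supplied through the standard AWS environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. The actual tests may use a different mechanism.

```java
import java.util.Map;

// Hypothetical illustration only; not part of flink-connector-dynamodb.
class E2eTestGate {

    /** Returns true only when both credential variables are present and non-blank. */
    static boolean credentialsDefined(Map<String, String> env) {
        return isSet(env.get("AWS_ACCESS_KEY_ID"))
                && isSet(env.get("AWS_SECRET_ACCESS_KEY"));
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        if (credentialsDefined(System.getenv())) {
            System.out.println("Credentials found: running end-to-end tests against Amazon DynamoDB");
        } else {
            System.out.println("No credentials defined: skipping end-to-end tests");
        }
    }
}
```

Passing the environment in as a map keeps the check easy to unit test without changing the process environment.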

Rejected Alternatives

We considered creating the connector in a third-party repository. This approach was rejected because the native Apache Flink repositories provide the best experience for Flink users.
