Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Background and Motivation
Brief Introduction to Redshift
Redshift is a cloud-based data warehousing service provided by Amazon Web Services (AWS). It is designed for large-scale analytics workloads and can process petabytes of structured and semi-structured data using columnar storage and massively parallel processing. It supports SQL queries, so users can extract insights from their data with familiar tooling. Redshift can scale elastically, adding or removing compute nodes as the workload changes, and it integrates with other AWS services such as S3, Kinesis, and Lambda, which lets users build end-to-end data processing pipelines.
Why is a Flink connector for Redshift useful?
Apache Flink is a popular stream processing framework that enables businesses to analyze and act on data as it arrives in real-time. Amazon Redshift, on the other hand, is a cloud-based data warehousing service that provides fast and cost-effective analysis of large-scale data.
The Flink Redshift Connector will enable Flink users to seamlessly integrate Flink with Redshift, allowing them to perform real-time data analysis and write the results directly to Redshift. With the Flink Redshift Connector, Flink users can take advantage of the scalability, reliability, and cost-effectiveness of Redshift, while leveraging the real-time processing power of Flink.
The benefits of using the Flink Redshift Connector include:
- Real-time data analysis: With Flink, businesses can analyze data as it arrives, enabling them to respond quickly to changes and make data-driven decisions in real time.
- Scalability: The Flink Redshift Connector allows businesses to scale their data processing and analysis up or down as needed, depending on changes in workload.
- Easy integration: The Flink Redshift Connector will provide easy integration with existing Flink and Redshift workflows, enabling users to get up and running quickly.
- Community support: As an open-source solution, the Flink Redshift Connector benefits from a vibrant community of developers and users who contribute to its development and provide support.
Overall, the Flink Redshift Connector is a powerful tool for businesses looking to perform real-time data analysis and integrate their Flink and Redshift workflows. By using the connector, businesses can take advantage of the best of both worlds, leveraging the real-time processing power of Flink and the scalability and cost-effectiveness of Redshift.
We propose developing a submodule called "flink-connector-redshift" within the existing "flink-connector-aws" project. By integrating it into "flink-connector-aws", we can reuse the authentication and essential utilities already present in the AWS-specific modules, which will streamline development.
- Integrate with Flink Sink API (FLIP-171)
- Integrate with Table API (FLIP-95) for Sink. Build upon Flink's new DynamicTableSink and DynamicTableSinkFactory interfaces.
- Integrate with Flink's new Source API (FLIP-27)
- Integrate with Table API (FLIP-95) for Source. Build upon Flink's new DynamicTableSource and DynamicTableSourceFactory interfaces.
I) Transactional Guarantees
For general information on Redshift's transactional guarantees, please refer to the documentation on managing concurrent write operations and serializable isolation. According to the Redshift documentation, although any of the four transaction isolation levels can be used, Amazon Redshift processes all isolation levels as serializable. Amazon Redshift also supports a default automatic commit behaviour in which each separately executed SQL command commits individually. As a result, individual commands such as COPY and UNLOAD are atomic and transactional, while explicit BEGIN and END should only be necessary to enforce the atomicity of multiple commands/queries.
When reading from or writing to Redshift, this library reads and writes data in S3. Both Flink and Redshift produce partitioned output, which is stored in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually-consistent. Therefore, flink-connector-redshift needs to take special measures to avoid missing or incomplete data due to this source of eventual-consistency.
II) Guarantees When Reading From and Writing To Redshift
- Appending to an existing table: In the COPY command, this library will use copy-manifests to guard against certain eventually-consistent S3 operations. As a result, appending to existing tables has the same atomic and transactional properties as regular Redshift COPY commands.
- Inserting rows into an existing table: When inserting rows into Redshift, this library uses the COPY command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, flink-connector-redshift appends to existing tables have the same atomic and transactional properties as regular Redshift COPY commands.
- Creating a new table: Creating a new table is a two-step process, consisting of a CREATE TABLE command followed by a COPY command to append the initial set of rows. Both of these operations are performed in a single transaction.
- Overwriting an existing table: By default, this library uses transactions to perform overwrites, which are implemented by dropping the destination table, creating a new empty table, and appending rows to it. When the relevant options are configured in flink-connector-redshift, the connector will commit the DROP TABLE command before appending rows to the new table. This sacrifices the atomicity of the overwrite operation but reduces the amount of staging space that Redshift needs during the overwrite.
- Querying Redshift tables: Queries use Redshift's UNLOAD command to execute a query and save its results to S3. This library uses unload-manifests to guard against certain eventually-consistent S3 operations. As a result, queries from the Redshift data source for Flink should have the same consistency properties as regular Redshift queries.
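As a rough illustration of the manifest-based load described in the list above, the following Java sketch renders a COPY manifest and the matching COPY statement. The class and method names are hypothetical and not part of the proposed connector, and CSV is only an assumed staging format. The manifest layout (an "entries" array with "url" and "mandatory" fields) and the IAM_ROLE and MANIFEST keywords follow the Redshift COPY documentation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, for illustration only: not part of the proposed connector API. */
final class CopyManifestSketch {

    /**
     * Renders a COPY manifest that lists every staged S3 object as mandatory,
     * so COPY fails instead of silently skipping a file that is not yet visible.
     * Assumes the URLs contain no characters that need JSON escaping.
     */
    static String manifestJson(List<String> s3Urls) {
        StringBuilder sb = new StringBuilder("{\"entries\":[");
        for (int i = 0; i < s3Urls.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"url\":\"").append(s3Urls.get(i)).append("\",\"mandatory\":true}");
        }
        return sb.append("]}").toString();
    }

    /** Builds a COPY statement that loads the files named in the manifest. */
    static String copyStatement(String table, String manifestUri, String iamRoleArn) {
        return "COPY " + table
                + " FROM '" + manifestUri + "'"
                + " IAM_ROLE '" + iamRoleArn + "'"
                + " FORMAT AS CSV MANIFEST";
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "s3://bucket-name/key/temp/part-0.csv",
                "s3://bucket-name/key/temp/part-1.csv");
        System.out.println(manifestJson(files));
        System.out.println(copyStatement(
                "users",
                "s3://bucket-name/key/temp/manifest.json",
                "arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx"));
    }
}
```

Marking every entry as mandatory is what turns a missing file into a load error rather than a partial load.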
III) Column Encoding
To customize the compression encoding of specific columns when creating a table, the connector provides a way to set the `encoding` column metadata field. Please refer to the available encodings in the Amazon Redshift documentation.
Redshift allows descriptions to be attached to columns, which can be viewed in most query tools. You can specify a description for individual columns by setting the `description` column metadata field.
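To make the effect of the two metadata fields concrete, here is a minimal Java sketch of the DDL they could translate into. The `ColumnDdlSketch` class and its `Column` type are hypothetical and only illustrate the idea. The generated SQL relies on Redshift's `ENCODE` column attribute and the `COMMENT ON COLUMN` statement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, for illustration only: not part of the proposed connector API. */
final class ColumnDdlSketch {

    /** A column with optional encoding and description metadata (null means "not set"). */
    static final class Column {
        final String name;
        final String type;
        final String encoding;
        final String description;

        Column(String name, String type, String encoding, String description) {
            this.name = name;
            this.type = type;
            this.encoding = encoding;
            this.description = description;
        }
    }

    /** Builds CREATE TABLE, adding an ENCODE clause only where an encoding is set. */
    static String createTable(String table, List<Column> columns) {
        List<String> defs = new ArrayList<>();
        for (Column c : columns) {
            String def = c.name + " " + c.type;
            if (c.encoding != null) {
                def += " ENCODE " + c.encoding;
            }
            defs.add(def);
        }
        return "CREATE TABLE " + table + " (" + String.join(", ", defs) + ")";
    }

    /** Builds one COMMENT ON COLUMN statement per described column. */
    static List<String> columnComments(String table, List<Column> columns) {
        List<String> statements = new ArrayList<>();
        for (Column c : columns) {
            if (c.description != null) {
                // Double any single quote so the description stays one SQL string literal.
                String escaped = c.description.replace("'", "''");
                statements.add("COMMENT ON COLUMN " + table + "." + c.name + " IS '" + escaped + "'");
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        List<Column> columns = Arrays.asList(
                new Column("user_id", "BIGINT", "AZ64", null),
                new Column("language", "VARCHAR(32)", "ZSTD", "Preferred UI language"));
        System.out.println(createTable("users", columns));
        for (String comment : columnComments("users", columns)) {
            System.out.println(comment);
        }
    }
}
```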
IV) Datatype Mapping
|Flink Type||Redshift Type|
V) Source Design
The Flink Redshift connector will offer two modes to read from the source:
- Reading directly from Redshift using the JDBC driver.
- Using the UNLOAD command to execute a query, save its result to S3, and read it from S3.
In flink-connector-redshift, the connector source lets users configure different modes for data retrieval through the `scan.read.mode` option in the source configuration. This lets users adapt the connector's behaviour to their specific requirements.
The flink-connector-redshift module will build on existing connectors where possible. When a user specifies the read mode as "jdbc" in the source configuration, the module internally uses the Redshift JDBC driver to interact with Redshift over the JDBC protocol and retrieve data for further processing within Flink.
Alternatively, when the selected read mode is "unload", the flink-connector-redshift module takes a different approach. It first runs a preprocessing step that collects the required information from Redshift, such as the schema, the S3 path to unload the data to, and other metadata. It then uses the flink-filesystem module, specifically the S3 connector, to read the data from the unloaded path in the S3 bucket. This approach leverages the scalability and durability of S3, making it an efficient and reliable intermediary for data transfer between Redshift and Flink.
In summary, the flink-connector-redshift module acts as an intelligent mediator that dynamically selects and integrates with the appropriate connectors based on the specified read mode. This flexibility allows users to choose between a direct JDBC-based interaction with Redshift or an optimized data unload mechanism using the S3 connector. By leveraging the capabilities of these connectors, the flink-connector-redshift module effectively achieves the goal of seamless and efficient data transfer between Redshift and the Flink framework, providing users with a comprehensive and adaptable solution for their data processing needs.
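The mode selection described in this section can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the `ReadModeSketch` class, its `ReadMode` enum, and the `plan` method are hypothetical names, and the values "jdbc" and "unload" are taken from the prose above. The UNLOAD statement uses the documented `UNLOAD ('query') TO 's3://...' IAM_ROLE '...' MANIFEST` form, with single quotes inside the query doubled.

```java
import java.util.Locale;

/** Hypothetical helper, for illustration only: not part of the proposed connector API. */
final class ReadModeSketch {

    /** The two read modes named in the proposal. */
    enum ReadMode {
        JDBC,
        UNLOAD;

        /** Parses the option value case-insensitively; unknown values throw IllegalArgumentException. */
        static ReadMode parse(String value) {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    /** Wraps a query in UNLOAD; quotes inside the query are doubled as Redshift requires. */
    static String unloadStatement(String query, String s3Path, String iamRoleArn) {
        String escaped = query.replace("'", "''");
        return "UNLOAD ('" + escaped + "') TO '" + s3Path + "'"
                + " IAM_ROLE '" + iamRoleArn + "' MANIFEST";
    }

    /** Returns the SQL that would be sent to Redshift for the given mode. */
    static String plan(ReadMode mode, String table, String s3Path, String iamRoleArn) {
        String query = "SELECT * FROM " + table;
        switch (mode) {
            case JDBC:
                // Rows are fetched directly over the JDBC connection.
                return query;
            case UNLOAD:
                // Rows are exported to S3 first and read back from the unload path.
                return unloadStatement(query, s3Path, iamRoleArn);
            default:
                throw new IllegalArgumentException("Unsupported read mode: " + mode);
        }
    }

    public static void main(String[] args) {
        ReadMode mode = ReadMode.parse("unload");
        System.out.println(plan(mode, "users", "s3://bucket-name/key/temp/",
                "arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx"));
    }
}
```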
Source Connector Options
|Option||Required||Default||Type||Description|
|hostname||required||none||String||Redshift connection hostname|
|port||required||5439||Integer||Redshift connection port|
|username||required||none||String||Redshift user username|
|password||required||none||String||Redshift user password|
|database.name||required||dev||String||Redshift database to connect to|
|table.name||required||none||String||Redshift table name|
|source.batch.size||optional||1000||Integer||The maximum flush size; exceeding it triggers a flush.|
|source.flush.interval||optional||1s||Duration||The flush interval; once it elapses, asynchronous threads flush the data.|
|source.max.retries||optional||3||Integer||The maximum number of retries when writing records to the database fails.|
|scan.read.mode||required||false||Boolean||Whether to use the Redshift UNLOAD command.|
|unload.temp.s3.path||conditional required||none||String||If unload mode is enabled, the Redshift UNLOAD command requires an S3 URI.|
|iam-role-arn||conditional required||none||String||If unload mode is enabled, the Redshift UNLOAD command requires an IAM role. The role must have the necessary privileges and be attached to the Redshift cluster.|
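The "conditional required" options can be checked up front, before any connection is opened. The Java sketch below shows one way to do that. It is a hypothetical validator, not the connector's actual factory code. Because the option list declares `scan.read.mode` as a Boolean while the design text uses the values "jdbc" and "unload", the sketch assumes that either "unload" or "true" enables unload mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical validator, for illustration only: not part of the proposed connector API. */
final class SourceOptionsValidatorSketch {

    /** Options that are required and have no default in the option list. */
    private static final String[] ALWAYS_REQUIRED = {"hostname", "username", "password", "table.name"};

    /** Options that become required only when unload mode is enabled. */
    private static final String[] UNLOAD_REQUIRED = {"unload.temp.s3.path", "iam-role-arn"};

    /** Returns a list of problems; an empty list means the options are usable. */
    static List<String> validate(Map<String, String> options) {
        List<String> errors = new ArrayList<>();
        requireAll(options, ALWAYS_REQUIRED, errors);

        // Assumption: "unload" (as in the prose) or "true" (as in the option list) enables unload mode.
        String mode = options.get("scan.read.mode");
        boolean unload = "unload".equalsIgnoreCase(mode) || "true".equalsIgnoreCase(mode);
        if (unload) {
            requireAll(options, UNLOAD_REQUIRED, errors);
        }
        return errors;
    }

    private static void requireAll(Map<String, String> options, String[] keys, List<String> errors) {
        for (String key : keys) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                errors.add("missing required option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("hostname", "xxxx.redshift.amazonaws.com");
        options.put("username", "awsuser");
        options.put("password", "passwordxxxx");
        options.put("table.name", "users");
        options.put("scan.read.mode", "unload");
        // Reports the two unload-specific options that are still missing.
        for (String error : validate(options)) {
            System.out.println(error);
        }
    }
}
```

Collecting all problems in one pass, rather than failing on the first, lets a user fix a table definition in a single round trip.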
VI) Sink Design
The Flink Redshift connector will offer two modes to write to Redshift:
- Writing directly to Redshift using the JDBC driver.
- Writing the Flink stream to a specified S3 path in the format and schema accepted by Redshift, then using the COPY command to load the data into the Redshift table.
The Flink Redshift connector will let users choose the sink mode through the `sink.write.mode` option in the sink configuration, so the behaviour of the sink can be tailored to their needs. Internally, the flink-connector-redshift module will select and integrate with the appropriate connector based on the chosen write mode.
If the user specifies the write mode as "jdbc", flink-connector-redshift will use a custom Redshift JDBC driver to write to the Redshift database over the JDBC protocol.
If the write mode is file-based, the module will use flink-connector-filesystem to write data to an S3 bucket in the format and schema required by Redshift's COPY command, so that the staged data is compatible with what Redshift expects.
In both cases, flink-connector-redshift preprocesses the data and wraps these connectors behind the scenes to offer a unified sink interface for transferring data from Flink to Redshift.
Sink Connector Options
|Option||Required||Default||Type||Description|
|hostname||required||none||String||Redshift connection hostname|
|port||required||5439||Integer||Redshift connection port|
|username||required||none||String||Redshift user username|
|password||required||none||String||Redshift user password|
|database-name||required||dev||String||Redshift database to connect to|
|table-name||required||none||String||Redshift table name|
|sink.batch-size||optional||1000||Integer||The maximum flush size; exceeding it triggers a flush.|
|sink.flush-interval||optional||1s||Duration||The flush interval; once it elapses, asynchronous threads flush the data.|
|sink.max-retries||optional||3||Integer||The maximum number of retries when writing records to the database fails.|
|copy-mode||required||false||Boolean||Whether to use the Redshift COPY command to insert/upsert.|
|copy-temp-s3-uri||conditional required||none||String||If copy-mode=true, the Redshift COPY command requires an S3 URI.|
|iam-role-arn||conditional required||none||String||If copy-mode=true, the Redshift COPY command requires an IAM role. The role must have the necessary privileges and be attached to the Redshift cluster.|
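The interaction between `sink.batch-size` and `sink.max-retries` can be sketched as a small buffering class. This is a simplified, single-threaded illustration, not the proposed sink implementation: the `BatchBufferSketch` name is hypothetical, the timer behind `sink.flush-interval` is left out, and retries happen immediately without backoff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer, for illustration only: not part of the proposed connector API. */
final class BatchBufferSketch<T> {

    private final int batchSize;
    private final int maxRetries;
    private final Consumer<List<T>> writer;
    private final List<T> buffer = new ArrayList<>();

    /** The writer stands in for the JDBC or COPY write path and may throw on failure. */
    BatchBufferSketch(int batchSize, int maxRetries, Consumer<List<T>> writer) {
        if (batchSize < 1 || maxRetries < 0) {
            throw new IllegalArgumentException("batchSize must be >= 1 and maxRetries >= 0");
        }
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.writer = writer;
    }

    /** Buffers one record and flushes once the batch size is reached. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes the buffered records, retrying up to maxRetries times before giving up. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                writer.accept(batch);
                buffer.clear();
                return;
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        // Records stay buffered so the caller can decide how to recover.
        throw lastFailure;
    }

    public static void main(String[] args) {
        BatchBufferSketch<String> sink = new BatchBufferSketch<String>(
                2, 3, batch -> System.out.println("writing " + batch));
        sink.add("a");
        sink.add("b");
        sink.add("c");
        sink.flush();
    }
}
```

In a real sink the final `flush()` would also need to be tied to checkpointing; that is outside the scope of this sketch.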
Update/Delete Data Considerations: The data is updated and deleted by the primary key.
iii) Sample Source Code:
- Sample DataStream API Code:
- Sample Streaming Table Write:
Create and sink a table in COPY mode
-- register a Redshift table `t_user` in flink sql.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database.name' = 'tutorial',
    'table.name' = 'users',
    'sink.batch.size' = '500',
    'sink.flush.interval' = '1000',
    'sink.max.retries' = '3',
    'write.mode' = 'copy',
    'copy.temp.s3.uri' = 's3://bucket-name/key/temp',
    'iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
);
Create and sink a table in pure JDBC mode
-- register a Redshift table `t_user` in flink sql.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database.name' = 'tutorial',
    'table.name' = 'users',
    'sink.batch.size' = '500',
    'sink.flush.interval' = '1000',
    'sink.max.retries' = '3'
);
-- write data into the Redshift table from the table `T`
INSERT INTO t_user
SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`
FROM T;
Additional Features that the Flink connector for AWS Redshift can provide on top of using JDBC:
1. Integration with AWS Redshift Workload Management (WLM): AWS Redshift allows you to configure WLM to manage query prioritization and resource allocation. The Flink connector for Redshift will be agnostic to the configured WLM and utilise it for scaling in and out for the source/sink. This means that the connector can leverage the WLM capabilities of Redshift to optimize the execution of queries and allocate resources efficiently based on your defined workload priorities.
2. Abstraction of AWS Redshift Quotas and Limits: AWS Redshift imposes certain quotas and limits on various aspects such as the number of clusters, concurrent connections, queries per second, etc. The Flink connector for Redshift will provide an abstraction layer for users, allowing them to work with Redshift without having to worry about these specific limits. The connector will handle the management of connections and queries within the defined quotas and limits, abstracting away the complexity and ensuring compliance with Redshift's restrictions.
VII) Authentication to S3 and Redshift
The use of this library involves several connections which must be authenticated/secured, all of which are illustrated in the following diagram:
Reference from the above diagram
→ Redshift Configured Connections:- Configuring a connection for JDBC driver version 2.1 for Amazon Redshi...
→ SSL enable in Redshift:- Configuring authentication and SSL - Amazon Redshift
This library reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires AWS credentials with read and write access to an S3 bucket (specified using the temp.s3.path configuration parameter).
The following describes how each connection can be authenticated:
- Flink to Redshift: Flink connects to Redshift via JDBC using a username and password.
Redshift does not support the use of IAM roles to authenticate this connection.
This connection can be secured using SSL; for more details, see the Encryption section below.
- Flink to S3: S3 acts as a middleman to store bulk data when reading from or writing to Redshift.
Flink connects to S3 using both the Hadoop FileSystem interfaces and directly using the Amazon Java SDK's S3 client.
This connection can be authenticated using either AWS keys or IAM roles.
- Default Authentication: flink-connector-aws provides several authentication modes; the Redshift connector will use its CredentialProvider.
AWS credentials will automatically be retrieved through the DefaultAWSCredentialsProviderChain.
Users can use IAM instance roles to authenticate to S3 (e.g. on EMR, or EC2).
- Parallelism in flink-connector-redshift will be limited by the connection quotas configured in Redshift (see Quotas and limits in Amazon Redshift).
- Speed and latency in the source and sink will be bounded by the latency and data transfer rate of Redshift's UNLOAD and COPY features.
- The flink-connector-redshift sink will only support append-only tables (no changelog mode support).
Compatibility, Deprecation, and Migration Plan
The flink-connector-redshift will be compatible with respect to the Flink source and sink interface.
This is a new connector (feature), so no compatibility, deprecation, or migration plan is expected.
- Unit tests will cover the methods in the Redshift connector.
- An E2E test suite will be added in flink-connector-aws-e2e-tests.
- Initially, the idea was to use flink-connector-jdbc to support JDBC mode in the Redshift connector. This was rejected because it would make flink-connector-aws depend on flink-connector-jdbc, coupling the release cycles of two separately externalised connectors.