...

Page properties


Discussion thread: https://lists.apache.org/thread/wsz4jgdpnlyw1x781f9qpk7y416b45dj
Vote thread: https://lists.apache.org/thread/m2cqp4mc1rt4hz0sxj2n50lxjs7m9wnl
JIRA: FLINK-31854

Release


...

Code Block
import java.util.Properties

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Connection and sink settings handed to the Redshift descriptor.
val props = new Properties()
props.setProperty("url", "jdbc:redshift://redshifthost:5439/database?user=<username>&password=<password>")
props.setProperty("database", "test_db")
props.setProperty("querytable-name", "test_sink_table")
props.setProperty("sink.write.mode", "jdbc")

// The table is the target of an INSERT, so it is registered as a sink.
tEnv
  .connect(new Redshift().properties(props))
  .inAppendMode()
  .registerTableSink("sink-table")

// Backticks are needed because the table name contains a hyphen.
val sql = "INSERT INTO `sink-table` ....."
tEnv.sqlUpdate(sql)
env.execute()

Create and sink a table in COPY mode


-- Register the Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database.name' = 'tutorial',
    'table.name' = 'users',
    'sink.batch.size' = '500',
    'sink.flush.interval' = '1000',
    'sink.max.retries' = '3',
    'write.mode' = 'copy',
    'copy.temp.s3.uri' = 's3://bucket-name/key/temp',
    'iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
);
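
To make the COPY-mode options concrete, here is a minimal sketch of the statement a sink could issue once a batch has been staged under `copy.temp.s3.uri`. The class `RedshiftCopyStatement`, its methods, and the choice of CSV as the staging format are assumptions for illustration, not part of the proposed connector API. Only the `COPY ... FROM ... IAM_ROLE ...` form comes from Redshift itself.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not part of the proposal): builds the Redshift COPY
 * statement that loads files staged in S3 into the target table.
 */
final class RedshiftCopyStatement {

    private RedshiftCopyStatement() {}

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    static String quoteLiteral(String value) {
        return "'" + Objects.requireNonNull(value).replace("'", "''") + "'";
    }

    /**
     * @param table      target table name; assumed to come from trusted configuration
     * @param s3Uri      staging location, e.g. the value of 'copy.temp.s3.uri'
     * @param iamRoleArn role Redshift assumes to read S3, e.g. the value of 'iam-role-arn'
     */
    static String build(String table, String s3Uri, String iamRoleArn) {
        if (!s3Uri.startsWith("s3://")) {
            throw new IllegalArgumentException("Staging URI must start with s3://: " + s3Uri);
        }
        // CSV is an assumed staging format; the proposal does not fix one here.
        return "COPY " + table
                + " FROM " + quoteLiteral(s3Uri)
                + " IAM_ROLE " + quoteLiteral(iamRoleArn)
                + " FORMAT AS CSV";
    }
}
```

Calling `RedshiftCopyStatement.build("users", "s3://bucket-name/key/temp", roleArn)` yields a single statement that could then be run over the same JDBC connection used in pure JDBC mode.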

Create and sink a table in pure JDBC mode

-- Register the Redshift table `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database.name' = 'tutorial',
    'table.name' = 'users',
    'sink.batch.size' = '500',
    'sink.flush.interval' = '1000',
    'sink.max.retries' = '3'
);

-- Write data into the Redshift table from the table `T`.
INSERT INTO t_user
SELECT CAST(`user_id` AS BIGINT), `user_type`, `lang`, `country`, `gender`, `score` FROM T;
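
The `sink.batch.size` and `sink.flush.interval` options above suggest that rows are buffered and written in groups. The sketch below shows one way such buffering could behave, assuming the batch size is a row count and the interval is in milliseconds. Neither unit is stated in this section, and `BatchingBuffer` is a hypothetical name rather than a class from the connector. The clock is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical buffer: flushes when either the row count reaches batchSize
 * or at least flushIntervalMillis have passed since the previous flush.
 */
final class BatchingBuffer<T> {
    private final int batchSize;
    private final long flushIntervalMillis;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();
    private long lastFlushMillis;

    BatchingBuffer(int batchSize, long flushIntervalMillis, long nowMillis,
                   Consumer<List<T>> flusher) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
        this.flusher = flusher;
    }

    /** Buffers one row and flushes if either threshold has been reached. */
    void add(T row, long nowMillis) {
        pending.add(row);
        boolean full = pending.size() >= batchSize;
        boolean due = nowMillis - lastFlushMillis >= flushIntervalMillis;
        if (full || due) {
            flush(nowMillis);
        }
    }

    /** Hands a copy of the buffered rows to the flusher and resets the timer. */
    void flush(long nowMillis) {
        if (!pending.isEmpty()) {
            flusher.accept(new ArrayList<>(pending));
            pending.clear();
        }
        lastFlushMillis = nowMillis;
    }
}
```

This sketch only checks the interval when a row arrives. A real sink would more likely use a timer so that a quiet stream still flushes, and would apply `sink.max.retries` around the write itself.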


NOTE:

Additional features that the Flink connector for AWS Redshift can provide beyond plain JDBC:

1. Integration with AWS Redshift Workload Management (WLM): Redshift lets you configure WLM to manage query prioritization and resource allocation. The connector will be agnostic to the specific WLM configuration and will use it when scaling the source/sink in and out. Queries issued by the connector are therefore prioritized and allocated resources according to the workload priorities you have defined.

2. Abstraction of AWS Redshift quotas and limits: Redshift imposes quotas and limits on aspects such as the number of clusters, concurrent connections, and queries per second. The connector will provide an abstraction layer that manages connections and queries so that they stay within these quotas and limits, so users do not have to handle Redshift's restrictions themselves.
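
As a rough illustration of item 2, the sketch below caps the number of concurrent calls and retries failed ones a fixed number of times. It is only one possible shape for such an abstraction layer. The class `BoundedRetryingExecutor`, its parameters, and the retry-on-any-runtime-exception policy are assumptions, and the proposal does not specify how the connector will enforce quotas.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Hypothetical guard: at most maxConcurrent actions run at once, and each
 * action is attempted up to maxRetries additional times after a failure.
 */
final class BoundedRetryingExecutor {
    private final Semaphore permits;
    private final int maxRetries;

    BoundedRetryingExecutor(int maxConcurrent, int maxRetries) {
        if (maxConcurrent < 1 || maxRetries < 0) {
            throw new IllegalArgumentException("maxConcurrent >= 1 and maxRetries >= 0 required");
        }
        this.permits = new Semaphore(maxConcurrent);
        this.maxRetries = maxRetries;
    }

    /** Runs the action while holding a permit, retrying on runtime exceptions. */
    <R> R call(Supplier<R> action) {
        permits.acquireUninterruptibly();
        try {
            RuntimeException last = null;
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    return action.get();
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // every attempt failed; surface the final error
        } finally {
            permits.release();
        }
    }

    /** Number of permits not currently held by a running action. */
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

A production version would add backoff between attempts and retry only errors known to be transient. The sketch omits both to stay short.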

...