1 Motivation

The dynamic table is Flink SQL's fundamental abstraction: a table backed by a continuously evolving changelog stream (a stream of INSERT, UPDATE, and DELETE operations). The DataStream API already exposes this stream/table duality through flexible methods such as toChangelogStream() and fromChangelogStream().

This FLIP proposes to bring the functionality and flexibility of these methods to Flink SQL through two built-in Process Table Functions (PTFs): TO_CHANGELOG and FROM_CHANGELOG. They give SQL users explicit, fine-grained control over the interpretation and generation of changelog streams, supporting advanced use cases such as:

  1. FROM_CHANGELOG: enables building custom SQL connectors that handle arbitrary Change Data Capture (CDC) formats.

  2. TO_CHANGELOG: enables creating a CDC append stream from a Flink table. This is the first operator that makes it possible to turn a retract/upsert stream back into an append stream.

With the introduction of PTFs, we can now define built-in PTFs as common utilities that complement our SQL offering; this FLIP takes advantage of that capability.

2 Background

This section contains background knowledge used throughout the FLIP.

2.1 Change Data Capture Basics

This section is partially copied from the PTF documentation.

Under the hood, tables in Flink's SQL engine are backed by changelogs. These changelogs encode CDC (Change Data Capture) information containing INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), or DELETE (-D) messages.

The existence of these flags in the changelog constitutes the Changelog Mode of a consumer or producer:

2.1.1 Append Mode {+I}

  • All messages are insert-only.
  • Every insertion message is an immutable fact.
  • Messages can be distributed in an arbitrary fashion across partitions and processors because they are unrelated.

2.1.2 Upsert Mode {+I, +U, -D}

  • Messages can contain updates leading to an updating table.
  • Updates are related using a key (i.e. the upsert key).
  • Every message is either an upsert or delete message for a result under the upsert key.
  • Messages for the same upsert key should land at the same partition and processor.
  • Deletions can contain only values for upsert key columns (i.e. partial deletes) or values for all columns (i.e. full deletes).
  • The mode is also known as partial image in the literature because -U messages are missing.

2.1.3 Retract Mode {+I, -U, +U, -D}

  • Messages can contain updates leading to an updating table.
  • Every insertion or update event is a fact that can be "undone" (i.e. retracted).
  • Updates are related by all columns. In simplified words: The entire row is kind of the key but duplicates are supported. For example: +I['Bob', 42] is related to -D['Bob', 42] and +U['Alice', 13] is related to -U['Alice', 13].
  • Thus, every message is either an insertion (+) or its retraction (-).
  • The mode is known as full image in the literature.
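
To make the modes concrete, here is a sketch of the same logical change (Alice's value changing from 13 to 14) encoded in upsert and retract mode; the records are illustrative:

-- Upsert mode (upsert key: name):
-- +U['Alice', 14]

-- Retract mode (full image):
-- -U['Alice', 13]
-- +U['Alice', 14]

-- Append mode cannot express the change as an update; it only permits new
-- immutable facts such as +I['Alice', 14].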

2.1.4 Partial vs Full Deletes

Deletions in changelog streams can be either:

  • Partial deletes: Contain only the key columns (upsert key). The downstream system must have stored the full row to process the deletion.
  • Full deletes: Contain all columns of the row being deleted. The downstream system can process the deletion without prior state.

Partial deletes are common in upsert mode where only the key is needed to identify the row to delete. Full deletes are typical in retract mode where the complete row image is retracted.
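
For example, deleting the row [id: 5, name: 'Alice', age: 30] with upsert key id could be encoded either way (illustrative records):

-- Partial delete (upsert key only):
-- -D[id: 5]

-- Full delete (entire row image):
-- -D[id: 5, name: 'Alice', age: 30]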

3 Public Interfaces

We will introduce two built-in PTFs: FROM_CHANGELOG and TO_CHANGELOG.

FROM_CHANGELOG:
SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)

Complete signature:
SELECT * FROM FROM_CHANGELOG(
        input => TABLE cdc_stream PARTITION BY partition_key ORDER BY event_time,
        uid => 'my-cdc-ptf',
        before => DESCRIPTOR(before),
        after => DESCRIPTOR(after),
        op => DESCRIPTOR(op),
        state_ttl => INTERVAL '10' MINUTE,
        consume_full_deletes => false,
        op_mapping => MAP[
            'c', 'INSERT',
            'd', 'DELETE',
            'u', 'UPDATE_BEFORE, UPDATE_AFTER'
        ],
        invalid_op_handling => 'FAIL'
    );

TO_CHANGELOG:
SELECT * FROM TO_CHANGELOG(input => TABLE t)

Complete signature:
SELECT * FROM TO_CHANGELOG(
  input => TABLE t PARTITION BY k,
  uid => 'my-cdc-ptf',
  before => DESCRIPTOR(before),
  after => DESCRIPTOR(after),
  op => DESCRIPTOR(op),
  state_ttl => INTERVAL '10' MINUTE,
  produces_full_deletes => false,
  op_mapping => MAP[
            'INSERT', 'c',
            'DELETE', 'd',
            'UPDATE_BEFORE, UPDATE_AFTER', 'u'
            ]
  )

  -- Alternative for op_mapping param
  op_mapping => MAP[
            ARRAY['INSERT'], ARRAY['c'],
            ARRAY['DELETE'], ARRAY['d'],
            ARRAY['UPDATE_BEFORE', 'UPDATE_AFTER'], ARRAY['u']
        ]

4 Proposed Changes

We will introduce TO_CHANGELOG and FROM_CHANGELOG as PTFs, supporting the following signatures and semantics.


4.1 FROM_CHANGELOG PTF

This PTF converts an append-only stream of changelog records (the input table) into a dynamic table. With it you can, for example, build a custom CDC connector.

4.1.1 Default signature

SELECT * FROM FROM_CHANGELOG(input => TABLE flat_stream)

Input
-- +I[op: 'INSERT', id: 5, name: 'name']
-- +I[op: 'DELETE', id: 5, name: 'name']

Flink Changelog
-- +I[ id: 5, name: 'name' ]
-- -D[ id: 5, name: 'name' ]

Table
-- Empty table

4.1.2 Parameters

4.1.2.1 Operation mapping

  • op: (Optional) Defines the field to look for the operation code. Default is 'op'.

  • before: (Optional) Defines the field to look for the "before" image. Default: no before field, assumes flattened records. If neither before nor after is provided, the PTF assumes the entire row (except the op field) is the payload.

  • after: (Optional) Defines the field to look for the "after" image. Default: no after field, assumes flattened records. Tip: If you have a single field for both images (e.g., payload), you can configure both as before => DESCRIPTOR(payload), after => DESCRIPTOR(payload).

  • op_mapping: (Optional) Defines the mapping from input operation codes to Flink RowKind.

    • Default: INSERT → 'INSERT', DELETE → 'DELETE', UPDATE_AFTER → 'UPDATE_AFTER', UPDATE_BEFORE → 'UPDATE_BEFORE'.
    • Supports mapping multiple input codes to the same RowKind, like it's done for Debezium (e.g., 'CREATE, READ' -> 'INSERT').
    • Supports mapping one input code to multiple RowKinds (e.g., 'UPDATE' -> 'UPDATE_BEFORE, UPDATE_AFTER').
  • invalid_op_handling: (Optional) Defines the behavior when encountering operation codes not defined in op_mapping. Options: 'FAIL', 'LOG', 'SKIP'.

    • Default: 'FAIL' - Throws an exception and fails the job.
    • 'LOG' - Logs a warning and skips the record.
    • 'SKIP' - Silently skips the record without logging.
4.1.2.1.1 Example: Flat records with custom header
SELECT * FROM FROM_CHANGELOG(
    input => TABLE flat_stream,
    op => DESCRIPTOR(type)
)

Input
-- { type: 'INSERT', id: 5, name: 'name' }
-- { type: 'DELETE', id: 5, name: 'name' }

Flink Changelog
-- +I[ id: 5, name: 'name' ]
-- -D[ id: 5, name: 'name' ]

Table
-- Empty table
4.1.2.1.2 Example: Mapping 'u' to Retract Stream (UPDATE_BEFORE + UPDATE_AFTER nesting)
-- Useful when the input 'u' contains both before and after images and you want
-- to produce a full retract stream (RowKind -U followed by +U).
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream,
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    op_mapping => MAP['u', 'UPDATE_BEFORE, UPDATE_AFTER']
)

Input
-- +I[op: 'u', before: {id: 5, name: 'old'}, after: {id: 5, name: 'new'}]

Flink Changelog
-- -U[ id: 5, name: 'old' ]
-- +U[ id: 5, name: 'new' ]

Table
-- [ id: 5, name: 'new' ]

4.1.2.2 Partitioning, out of orderness, state

  • uid: (Optional) Unique identifier for the PTF instance. Recommended for query evolution - allows Flink to restore state correctly when queries are modified. See Flink Docs on Query Evolution.

  • PARTITION BY: (Optional) Distributes processing. If not defined, parallelism is forced to 1 for CDC streams that contain updates. When specified, the partition key:

    • Routes related events for a key to the same instance (e.g., when emitting UPDATE_BEFORE from UPDATE_AFTER only, or when handling partial deletes).
    • Guarantees that ordered changelog events for a partition key remain ordered when parallelism > 1.
    • Should optimally match the distribution key of the input table (e.g., the Kafka partition key).
  • ORDER BY: (Optional) If provided, a sorting operator will be added to the pipeline which will reorder events between two watermarks according to the provided column.

    • If your changelog stream is not ordered in the source, you need to use this functionality or else results might be incorrect.
    • Records arriving with timestamps lower than the current watermark (late records) will be dropped by the sorting operator.
    • Note: ORDER BY and state_ttl serve different purposes and can be used together. ORDER BY fixes event ordering; state_ttl manages key state for generating changelog operations.
  • state_ttl: (Optional) Defines retention time for state used to store previous values for keys. This state is used to:

    • Generate UPDATE_BEFORE (-U) records when only UPDATE_AFTER or INSERT is available in the input.
    • Decide between emitting INSERT (+I) or UPDATE (+U) based on whether the key has been seen before.
    • Use state_ttl => INTERVAL '0' MINUTE to store state forever.
4.1.2.2.1 Example: Ordering with Watermarks and Late Records
-- Input stream is partitioned by 'id'.
-- We wait for 'event_time' watermark to progress before emitting.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id ORDER BY event_time
)

-- Watermark strategy: INTERVAL '5' MINUTE
-- Current Watermark: 10:00

-- Record A
-- +I[op: 'INSERT', id: 6, name: 'A', event_time: '10:05']
-- Current Watermark: 10:00
-- Result: Buffered. Emitted only when watermark passes 10:05.

-- Record B (Late):
-- +I[op: 'INSERT', id: 5, name: 'B', event_time: '09:55']
-- Current Watermark: 10:00
-- Result: Dropped because 09:55 < 10:00.

-- Record C:
-- +I[op: 'INSERT', id: 7, name: 'C', event_time: '10:11']
-- Current Watermark: 10:06
-- Result: Record A emitted (watermark passed 10:05); Record C buffered.
4.1.2.2.2 Example: Single upsert/delete event into INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE
  • We use state here because we need to store the previous events for a key so that we know:

    • If we should emit INSERT or UPDATE_BEFORE/UPDATE_AFTER

    • How to populate UPDATE_BEFORE since we only receive the new version

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id,
    uid => 'upsert-unzip-example',
    state_ttl => INTERVAL '5' MINUTE,
    op_mapping => MAP[
        'upsert', 'INSERT, UPDATE_BEFORE, UPDATE_AFTER',
        'delete', 'DELETE'
    ]
);

-- STATE TTL 5 MIN
-- Wall Clock: 14:00
-- Create Record for id=99
-- +I[op: 'upsert', id: 99, name: 'Orphan']
-- State for id=99:
    {id: 99, name: 'Orphan', expires: '14:05'}
-- Emits
    +I[id: 99, name: 'Orphan']

-- Wall Clock: 14:03
-- Update Record for id=99
-- +I[op: 'upsert', id: 99, name: 'Orphan Updated']
-- State for id=99:
    {id: 99, name: 'Orphan Updated', expires: '14:08'}
-- Emits
    -U[id: 99, name: 'Orphan']
    +U[id: 99, name: 'Orphan Updated']

-- Wall Clock: 14:05
-- Delete Record for id=99
-- +I[op: 'delete', id: 99, name: 'Orphan Updated']
-- State for id=99:
    {} -- CLEARED
-- Emits
    -D[id: 99, name: 'Orphan Updated']

-- Wall Clock: 14:10
-- Re-create Record for id=99 (after deletion)
-- +I[op: 'upsert', id: 99, name: 'Resurrected']
-- State for id=99:
    {id: 99, name: 'Resurrected', expires: '14:15'}
-- Emits
    +I[id: 99, name: 'Resurrected']

-- Wall Clock: 15:00 (state expired)
-- State TTL kicks in (processing time)
-- State for id=99:
    {} -- CLEARED by TTL
4.1.2.2.3 Example: watermarks with multiple out-of-order updates
  • Events are ordered before processing when we use ORDER BY event_time
  • Events for the same key are compacted in the PTF if possible
  • The example below shows the state of the sort operator before records enter the PTF
-- Input stream is partitioned by 'id'.
-- We wait for 'event_time' watermark to progress before emitting.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id ORDER BY event_time
)

-- Watermark strategy: INTERVAL '5' MINUTE

-- Event 1: Create Record A
-- +I[op: 'INSERT', id: 5, name: 'A', event_time: '9:55']
-- Current Watermark: 9:50
-- Sort Operator State:
    +I[op: 'INSERT', id: 5, name: 'A', event_time: '9:55']

-- Event 3: Update Record A (arrives before Event 2):
-- +I[op: 'UPDATE_AFTER', id: 5, name: 'Updated A', event_time: '9:57']
-- Current Watermark: 9:52
-- Sort Operator State:
    +I[op: 'INSERT', id: 5, name: 'A', event_time: '9:55']
    +I[op: 'UPDATE_AFTER', id: 5, name: 'Updated A', event_time: '9:57']

-- Event 2: Update Record A (arrives out of order):
-- +I[op: 'UPDATE_AFTER', id: 5, name: 'Between updates A', event_time: '9:56']
-- Current Watermark: 9:52
-- Sort Operator State:
    +I[op: 'INSERT', id: 5, name: 'A', event_time: '9:55']
    +I[op: 'UPDATE_AFTER', id: 5, name: 'Between updates A', event_time: '9:56']
    +I[op: 'UPDATE_AFTER', id: 5, name: 'Updated A', event_time: '9:57']

-- Create Record B:
-- +I[op: 'INSERT', id: 6, name: 'B', event_time: '10:20']
-- Current Watermark: 10:15
-- Sort Operator State:
    +I[op: 'INSERT', id: 6, name: 'B', event_time: '10:20']
-- Watermark triggered emission
-- Sort Operator Emits
    +I[op: 'INSERT', id: 5, name: 'A', event_time: '9:55']
    +I[op: 'UPDATE_AFTER', id: 5, name: 'Between updates A', event_time: '9:56']
    +I[op: 'UPDATE_AFTER', id: 5, name: 'Updated A', event_time: '9:57']
-- FROM_CHANGELOG Emits (compacted - only final state for key within watermark window)
    +I[ id: 5, name: 'Updated A' ]

-- Note: When ORDER BY is used with watermarks, the PTF can compact multiple changes
-- to the same key within a watermark window, emitting only the final state.
-- See section 4.3.2 for details on watermark-based compaction.

-- If we didn't have ORDER BY here, events would be processed in arrival order
-- and the table would end up in the wrong final state:
+I[ id: 5, name: 'A' ]
-U[ id: 5, name: 'A' ]
+U[ id: 5, name: 'Updated A' ]
-U[ id: 5, name: 'Updated A' ]
+U[ id: 5, name: 'Between updates A' ]
+I[ id: 6, name: 'B' ]

4.1.2.3 Delete handling

  • consume_full_deletes: (Optional, boolean, default: false) Specifies whether the input stream contains full deletes (all columns) or partial deletes (key columns only). This parameter controls how the PTF interprets incoming delete records.

    • When false: Input contains partial deletes. For parallelism > 1, PARTITION BY is required so the PTF can route deletions correctly.
    • When true: Input contains full deletes with all column values.
    • Pass-through behavior: The PTF does not convert between partial and full deletes. The output delete format matches the input format.
    • Limitation: If the downstream planner requires full deletes but consume_full_deletes => false, the query fails with an error. Converting partial deletes to full deletes is not supported in FROM_CHANGELOG.
4.1.2.3.1 Example: Consuming partial deletes
-- Input stream has partial deletes (only key columns)
-- PARTITION BY is required for parallelism > 1
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id,
    uid => 'partial-delete-example',
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input: +I[op: 'd', id: 5]  -- partial delete, only key
-- Output: -D[id: 5]
4.1.2.3.2 Example: Consuming full deletes
-- Input stream has full deletes (all columns)
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id,
    uid => 'full-delete-example',
    consume_full_deletes => true,
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input: +I[op: 'd', id: 5, name: 'Alice', age: 30]  -- full delete
-- Output: -D[id: 5, name: 'Alice', age: 30]

4.1.2.4 Passthrough fields

Note: This parameter is not part of the first implementation. It is documented here because it addresses a commonly needed pattern when working with CDC envelope formats.

  • passthrough: (Optional) Specifies top-level input fields to forward to the output alongside the extracted before/after payload. Only relevant when before and/or after are used; in flat mode (no before/after), all fields except op are already part of the output, making this a no-op.

    • Output schema becomes: [...fields from before/after..., ...passthrough fields...]
    • Passthrough fields are not interpreted for changelog logic — they are forwarded as-is from each input record.
4.1.2.4.1 Example: Preserving Debezium envelope metadata
SELECT * FROM FROM_CHANGELOG(
    input => TABLE debezium_source PARTITION BY id,
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    passthrough => DESCRIPTOR(ts_ms),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input:  +I[op: 'c', ts_ms: 1234, before: null, after: {id: 1, name: 'Alice'}]
-- Output: +I[id: 1, name: 'Alice', ts_ms: 1234]

4.1.2.5 Complete signature

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key ORDER BY event_time,
    uid => 'my-cdc-ptf',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    op => DESCRIPTOR(op),
    state_ttl => INTERVAL '10' MINUTE,
    consume_full_deletes => false,
    passthrough => DESCRIPTOR(field1, field2),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_BEFORE, UPDATE_AFTER'
    ],
    invalid_op_handling => 'FAIL'
)

4.1.3 Use cases

The following examples show how different use cases for CDC streams can be expressed using the supported signature.

4.1.3.1 Upsert stream to upsert stream

Converting flat append records with explicit operation codes to a Flink upsert stream.

  • Input: Flat append records with an operation field (e.g., __op with values 'c', 'u', 'd').
  • Output: Table backed by a Flink upsert stream {+I, +U, -D}.
  • State: Not required. Updates are directly emitted as UPDATE_AFTER.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    op => DESCRIPTOR(__op),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

4.1.3.2 Retract stream to retract stream (with Before/After Image)

Converting an append stream containing CDC events with before/after images to a Flink retract stream.

  • Input: Append stream with nested before and after fields containing full row images.
  • Output: Table backed by a Flink retract stream {+I, -U, +U, -D}.
  • State: Not required. The input already contains both before and after images; the PTF assumes this because no state_ttl is specified.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_BEFORE, UPDATE_AFTER'
    ]
)

4.1.3.3 Retract stream to upsert stream (with Before/After Image)

Converting an append stream containing CDC events with before/after images to a Flink upsert stream (discarding before images).

  • Input: Append stream with nested before and after fields.
  • Output: Table backed by a Flink upsert stream {+I, +U, -D}.
  • State: Not required. The before image is simply ignored.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
  • Partition: For parallelism > 1, the primary key should contain the partition_key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

4.1.3.4 Upsert stream to upsert stream (with Before/After Image)

Converting an append stream containing CDC events with only after images to a Flink upsert stream.

  • Input: Append stream with nested before and after fields, but only after is populated for updates.
  • Output: Table backed by a Flink upsert stream {+I, +U, -D}.
  • State: Not required. Input format matches output requirements directly.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

4.1.3.5 Consuming partial deletes from upsert source

Converting an upsert stream where delete events contain only key columns (common in Kafka compacted topics or upsert sources).

  • Input: Upsert stream where deletes contain only the key columns.
  • Output: Table backed by a Flink upsert stream {+I, +U, -D} with partial deletes.
  • State: Not required. Partial deletes are forwarded as-is.
  • Partition: Required for parallelism > 1 so the PTF can route deletions to the correct processor.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE upsert_source PARTITION BY id,
    uid => 'partial-delete-consumer',
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input: +I[op: 'd', id: 5]
-- Output: -D[id: 5]

4.1.3.6 Consuming full deletes from CDC source

Converting a CDC stream where delete events contain all columns (common in Debezium sources with full row images).

  • Input: CDC stream where deletes contain all column values.
  • Output: Table backed by a Flink retract or upsert stream with full deletes.
  • State: Not required. Full deletes are forwarded with all column values.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE debezium_source PARTITION BY id,
    uid => 'full-delete-consumer',
    consume_full_deletes => true,
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input: +I[op: 'd', id: 5, name: 'Alice', age: 30]
-- Output: -D[id: 5, name: 'Alice', age: 30]

4.1.3.7 Upsert stream to retract stream (with deletion flag)

Converting flat append records (no before/after nesting) with a deletion flag to a Flink retract stream.

  • Input: Flat append records where deleted='false' indicates upsert and deleted='true' indicates delete. The deleted field is a boolean or string column that marks whether the record represents a deletion. Common in Kafka Connect formats.
  • Output: Table backed by a Flink retract stream {+I, -U, +U, -D}.
  • State: Required. Must store previous row images to emit UPDATE_BEFORE (-U) since input contains no before image.
  • Key: A primary key is required on the source table so that we can produce updates for this key.

See also: Single upsert/delete event example for a detailed walkthrough with data.

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    op => DESCRIPTOR(deleted),
    state_ttl => INTERVAL '10' MINUTE,
    op_mapping => MAP[
        'false', 'INSERT, UPDATE_BEFORE, UPDATE_AFTER',
        'true', 'DELETE'
    ]
)

4.1.3.8 Upsert stream to upsert stream (with deletion flag)

Converting flat append records with a deletion flag to a Flink upsert stream.

  • Input: Flat append records where deleted='false' indicates upsert and deleted='true' indicates delete. The deleted field is a boolean or string column that marks whether the record represents a deletion.
  • Output: Table backed by a Flink upsert stream {+I, +U, -D}.
  • State: Required. Must track whether a key has been seen to distinguish INSERT from UPDATE_AFTER.
  • Key: A primary key is required on the source table so that we can produce updates for this key.

See also: Single upsert/delete event example for a detailed walkthrough with data.

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    op => DESCRIPTOR(deleted),
    state_ttl => INTERVAL '10' MINUTE,
    op_mapping => MAP[
        'false', 'INSERT, UPDATE_AFTER',
        'true', 'DELETE'
    ]
)

4.1.3.9 Upsert stream to retract stream

Converting flat append records with explicit operation codes to a Flink retract stream.

  • Input: Flat append records with an operation field (e.g., __op with values 'c', 'u', 'd').
  • Output: Table backed by a Flink retract stream {+I, -U, +U, -D}.
  • State: Required. Must store previous row images to emit UPDATE_BEFORE (-U) for update operations.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    op => DESCRIPTOR(__op),
    state_ttl => INTERVAL '10' MINUTE,
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_BEFORE, UPDATE_AFTER'
    ]
)

4.1.3.10 Upsert stream to retract stream (with Before/After Image)

Converting an append stream containing CDC events with only after images to a Flink retract stream.

  • Input: Append stream with nested before and after fields, but only after is populated for updates.
  • Output: Table backed by a Flink retract stream {+I, -U, +U, -D}.
  • State: Required. Must store previous row images to emit UPDATE_BEFORE (-U) when an update arrives.
  • Key: A primary key is required on the source table so that we can produce updates for this key.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY partition_key,
    uid => 'my-cdc-ptf',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    state_ttl => INTERVAL '10' MINUTE,
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_BEFORE, UPDATE_AFTER'
    ]
)

4.1.3.11 CDC stream with envelope metadata (passthrough)

Preserving top-level CDC envelope fields alongside the extracted row data using passthrough.

  • Input: Append stream with Debezium envelope containing before/after images plus top-level metadata fields (e.g., ts_ms).
  • Output: Table backed by a Flink upsert stream {+I, +U, -D} with ts_ms carried through as an additional column.
  • State: Not required. The before image is simply ignored and passthrough fields are forwarded as-is.
  • Key: A primary key is required on the source table so that we can produce updates for this key.

Note: This use case requires the passthrough parameter, which is not part of the first implementation.

SELECT * FROM FROM_CHANGELOG(
    input => TABLE debezium_source PARTITION BY (user_id, tenant_id),
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    passthrough => DESCRIPTOR(ts_ms),
    op_mapping => MAP[
        'c, r', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ]
)

-- Input:  +I[op: 'c', ts_ms: 1234, before: null, after: {user_id: 1, name: 'Alice'}]
-- Output: +I[user_id: 1, name: 'Alice', ts_ms: 1234]

4.2 TO_CHANGELOG PTF

This PTF converts a dynamic table (input table) into an append-only stream of changelog records. It is useful for exporting Flink dynamic tables to external systems that expect CDC formats. It's the first functionality in Flink SQL that allows an upsert stream to be converted back to an append stream.

4.2.1 Parameters

4.2.1.1 Default signature

  • input: The dynamic table to convert.
SELECT * FROM TO_CHANGELOG(input => TABLE t)

-- +I[id: 5, name: 'name']
-- Output: +I[op: 'INSERT', id: 5, name: 'name']

-- +U[id: 5, name: 'updated_name']
-- Output: +I[op: 'UPDATE_AFTER', id: 5, name: 'updated_name']

-- -D[id: 5, name: 'updated_name']
-- Output: +I[op: 'DELETE', id: 5, name: 'updated_name']

4.2.1.2 Operation mapping

  • op: (Optional) Defines the output field name for the operation code. Default is 'op'.

  • before: (Optional) Defines the output field for the "before" image. Default: produces flattened records. If neither before nor after is provided, the PTF outputs a flat row with the op field appended, and UPDATE_BEFORE records are ignored.

  • after: (Optional) Defines the output field for the "after" image. Default: produces flattened records. Tip: If you want a single field for both images (e.g., payload), you can configure both as before => DESCRIPTOR(payload), after => DESCRIPTOR(payload).

  • op_mapping: (Optional) Defines mapping from Flink RowKind to output operation codes.

    • Default: INSERT → 'INSERT', DELETE → 'DELETE', UPDATE_AFTER → 'UPDATE_AFTER', UPDATE_BEFORE → 'UPDATE_BEFORE'.
    • Supports combining UPDATE_BEFORE and UPDATE_AFTER into a single output record (e.g., 'UPDATE_BEFORE, UPDATE_AFTER', 'u').
    • Supports mapping multiple RowKinds to the same output code (e.g., 'INSERT, UPDATE_AFTER', 'upsert').
4.2.1.2.1 Example: Flat records output (no before/after nesting)
-- If neither before nor after is provided, output is a flat row with 'op' appended
SELECT * FROM TO_CHANGELOG(
    input => TABLE t PARTITION BY key
)

-- +I[id: 5, name: 'name']
-- Output: +I[op: 'INSERT', id: 5, name: 'name']

-- -U[id: 5, name: 'name']
-- (Ignored by default)

-- +U[id: 5, name: 'updated_name']
-- Output: +I[op: 'UPDATE_AFTER', id: 5, name: 'updated_name']

-- -D[id: 5, name: 'updated_name']
-- Output: +I[op: 'DELETE', id: 5, name: 'updated_name']
4.2.1.2.2 Example: Explicit Update Mapping with Before/After Fields
-- Explicitly mapping updates to 'u' (Debezium style) and defining output structure.
-- This requires the PTF to buffer the -U record to combine it with the +U record.
SELECT * FROM TO_CHANGELOG(
    input => TABLE t,
    before => DESCRIPTOR(before_state),
    after => DESCRIPTOR(after_state),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_BEFORE, UPDATE_AFTER', 'u'
    ]
)

-- -U[id: 5, name: 'old'], +U[id: 5, name: 'new']
-- Output: +I[op: 'u', before_state: {id: 5, name: 'old'}, after_state: {id: 5, name: 'new'}]

4.2.1.3 Partitioning, out of orderness, state

  • uid: (Optional) Unique identifier for the PTF instance. Recommended for query evolution - allows Flink to restore state correctly when queries are modified. See Flink Docs on Query Evolution.

  • PARTITION BY: (Optional) Distributes processing. The behavior depends on the input changelog mode:

    • For upsert streams: If not specified, the PTF can infer the partition key from the table's upsert key (see the sketch after this list).
    • For retract streams: Partitioning is not strictly required since related -U and +U events arrive in sequence.
    • For append streams: Partitioning has no semantic impact but can improve parallelism.
    • If not defined and no key can be inferred, parallelism defaults to 1.
  • state_ttl: (Optional) Retention time for state used to store previous values for keys. This state is used to:

    • Construct before/after images when combining UPDATE_BEFORE and UPDATE_AFTER into a single output record.
    • Store the previous row image to populate the before field when only UPDATE_AFTER is available.
    • Use state_ttl => INTERVAL '0' MINUTE to store state forever.
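
As a sketch of the upsert-key inference mentioned above, assuming a hypothetical table my_upsert_table declared with PRIMARY KEY (id), the following two calls behave identically:

-- Explicit partitioning:
SELECT * FROM TO_CHANGELOG(input => TABLE my_upsert_table PARTITION BY id)

-- Implicit: the partition key is inferred from the table's upsert key.
SELECT * FROM TO_CHANGELOG(input => TABLE my_upsert_table)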
4.2.1.3.1 Example: State TTL with Unified Updates
-- Scenario: Flink emits updates as two records: -U (UPDATE_BEFORE) and +U (UPDATE_AFTER).
-- TO_CHANGELOG must cache the -U record to combine it with +U into a single 'u' output.
SELECT * FROM TO_CHANGELOG(
    input => TABLE t,
    state_ttl => INTERVAL '5' MINUTE,
    op_mapping => MAP[
        'INSERT', 'INSERT',
        'DELETE', 'DELETE',
        'UPDATE_BEFORE, UPDATE_AFTER', 'UPDATE'
    ]
)

-- 1. Receive UPDATE_BEFORE
-- -U[ id: 1, val: 10 ]
-- State: Stored -U payload for key 1. (Expires in 5 mins)

-- 2. Receive UPDATE_AFTER 2 minutes later
-- +U[ id: 1, val: 20 ]
-- Result: Matched with stored -U. Emits +I[op: 'UPDATE', before: {id:1, val:10}, after: {id:1, val:20}].

-- Note: If +U arrives > 5 minutes later, the -U state would have expired,
-- potentially resulting in an incomplete record or implicit insert depending on configuration.
4.2.1.3.2 Example: Watermark-based Compaction and Flushing
-- TO_CHANGELOG uses watermarks from the input table to group and flush results.
-- This can compact multiple changes to the same key within a watermark window.
-- Note: The input table must already contain an ordered changelog.

SELECT * FROM TO_CHANGELOG(
    input => TABLE t PARTITION BY id,
    state_ttl => INTERVAL '7' DAY,
    before => DESCRIPTOR(before_state),
    after => DESCRIPTOR(after_state),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_BEFORE, UPDATE_AFTER', 'u'
    ]
)

-- Watermark strategy: INTERVAL '5' MINUTE
-- Current Watermark: 10:00

-- Record A: Insert for id=5
-- +I[ id: 5, name: 'Alice', event_time: '10:05' ]
-- Current Watermark: 10:00
-- Result: Buffered. Waiting for watermark to advance.

-- Record B: Update for id=5 (before watermark advances)
-- -U[ id: 5, name: 'Alice', event_time: '10:06' ]
-- +U[ id: 5, name: 'Alice Updated', event_time: '10:06' ]
-- Current Watermark: 10:00
-- Result: Buffered. PTF sees INSERT then UPDATE for same key in same window.

-- Record C: Insert for id=6
-- +I[ id: 6, name: 'Bob', event_time: '10:30' ]
-- Current Watermark: 10:25
-- Watermark advanced! Flush buffered records for id=5.

-- PTF Compaction: Instead of emitting:
--   +I[op: 'c', before_state: null, after_state: {id: 5, name: 'Alice'}]
--   +I[op: 'u', before_state: {id: 5, name: 'Alice'}, after_state: {id: 5, name: 'Alice Updated'}]
-- PTF emits only the final state:
--   +I[op: 'c', before_state: null, after_state: {id: 5, name: 'Alice Updated'}]

-- State: Stored payload for key 5: {id: 5, name: 'Alice Updated'} (Expires in 7 days)

-- Record D: Another update for id=5 (after flush)
-- -U[ id: 5, name: 'Alice Updated', event_time: '10:50' ]
-- +U[ id: 5, name: 'Alice Final', event_time: '10:50' ]
-- Current Watermark: 10:45
-- Result: Buffered.

-- Record E: Watermark advances again
-- +I[ id: 7, name: 'Carol', event_time: '11:10' ]
-- Current Watermark: 11:05
-- PTF emits update for id=5 using stored state:
--   +I[op: 'u', before_state: {id: 5, name: 'Alice Updated'}, after_state: {id: 5, name: 'Alice Final'}]
-- State: Updated payload for key 5: {id: 5, name: 'Alice Final'} (Expires in 7 days)

4.2.1.4 Delete handling

  • produces_full_deletes: (Optional, boolean, default: false) Specifies whether the output stream should contain full deletes (all columns) or partial deletes (key columns only).

    • When false: Output contains partial deletes with only key column values. This is the default.
    • When true: Output contains full deletes with all column values. If the input is an upsert stream (partial deletes), a ChangelogNormalize operator is added to the pipeline to produce full deletes before records enter the PTF.
4.2.1.4.1 Example: Producing partial deletes
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    -- produces_full_deletes => false,  -- default
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- -D[id: 5, name: 'Alice', age: 30]
-- Output: +I[op: 'd', id: 5]  -- partial delete, only key columns
4.2.1.4.2 Example: Producing full deletes
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    produces_full_deletes => true,
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- If input is upsert stream, ChangelogNormalize is added upstream
-- -D[id: 5, name: 'Alice', age: 30]
-- Output: +I[op: 'd', id: 5, name: 'Alice', age: 30]  -- full delete

4.2.1.5 Passthrough fields

Note: This parameter is not part of the first implementation. It is documented here because it addresses a commonly needed pattern when producing CDC-compatible output.

  • passthrough: (Optional) Specifies input table columns to exclude from before/after nesting, keeping them as top-level fields in the output envelope. Only relevant when before and/or after are used; in flat mode (no before/after), all columns are already at the top level, making this a no-op.

    • Output schema becomes: [op, ...passthrough fields..., before: {...remaining columns...}, after: {...remaining columns...}]
    • Passthrough fields are not included in the before/after images.
4.2.1.5.1 Example: Producing CDC envelope with metadata
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    passthrough => DESCRIPTOR(ts_ms),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_BEFORE, UPDATE_AFTER', 'u'
    ]
)

-- Input:  +I[id: 1, name: 'Alice', ts_ms: 1234]
-- Output: +I[op: 'c', ts_ms: 1234, before: null, after: {id: 1, name: 'Alice'}]

4.2.1.6 Complete signature

SELECT * FROM TO_CHANGELOG(
  input => TABLE t PARTITION BY k,
  uid => 'my-cdc-ptf',
  before => DESCRIPTOR(before),
  after => DESCRIPTOR(after),
  op => DESCRIPTOR(op),
  state_ttl => INTERVAL '10' MINUTE,
  produces_full_deletes => false,
  passthrough => DESCRIPTOR(field1, field2),
  op_mapping => MAP[
            'INSERT', 'c',
            'DELETE', 'd',
            'UPDATE_BEFORE, UPDATE_AFTER', 'u'
            ]
)

-- +I[id: 5, name: 'name']
-- Output: +I[op: 'c', before: null, after: {id: 5, name: 'name'}]

-- -U[id: 5, name: 'name'], +U[id: 5, name: 'updated_name']
-- Output: +I[op: 'u', before: {id: 5, name: 'name'}, after: {id: 5, name: 'updated_name'}]

-- -D[id: 5, name: 'updated_name']
-- Output: +I[op: 'd', before: {id: 5, name: 'updated_name'}, after: null]

4.2.2 Use cases

4.2.2.1 Retract/upsert stream to upsert stream (with deletion flag)

Converting a Flink stream to flat append records with a deletion flag (Kafka Connect style).

  • Input: Table backed by a Flink retract or upsert stream.
  • Output: Flat append records where deleted='false' for inserts/updates and deleted='true' for deletes. The deleted field is a boolean or string column added to the output that marks whether the record represents a deletion.
  • State: Not required. UPDATE_BEFORE (-U) events are dropped if present.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    op => DESCRIPTOR(deleted),
    op_mapping => MAP[
        'INSERT, UPDATE_AFTER', 'false', -- UPDATE_BEFORE will be dropped if present
        'DELETE', 'true'
    ]
)

-- -D[id: 5, name: 'A']
-- Output: +I[id: 5, name: 'A', deleted: 'true']

4.2.2.2 Retract/upsert stream to upsert stream

Converting a Flink stream to flat append records with explicit operation codes.

  • Input: Table backed by a Flink retract or upsert stream.
  • Output: Flat append records with operation code field (e.g., 'I', 'U', 'D').
  • State: Not required. Each Flink event maps directly to an output record.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    op => DESCRIPTOR(op_code),
    op_mapping => MAP[
        'INSERT', 'I',
        'DELETE', 'D',
        'UPDATE_AFTER', 'U'
    ]
)

-- +U[id: 7, val: 50]
-- Output: +I[op_code: 'U', id: 7, val: 50]

4.2.2.3 Retract stream to upsert stream

Converting a Flink retract stream to an append stream with only after images.

  • Input: Table backed by a Flink retract stream {+I, -U, +U, -D}.
  • Output: Append stream with flat records or nested payload (no before image).
  • State: Not required. UPDATE_BEFORE (-U) events are simply dropped.
  • Note: Users can use SQL to assign data to a before/after field depending on the operation if necessary.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    uid => 'nested-cdc-out',
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- +U[id: 1, val: 20]
-- Output: +I[op: 'u', id: 1, val: 20]

or

SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    uid => 'nested-cdc-out',
    before => DESCRIPTOR(payload),
    after => DESCRIPTOR(payload),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- +U[id: 1, val: 20]
-- Output: +I[op: 'u', payload: {id: 1, val: 20}]

4.2.2.4 Upsert stream to upsert stream

Converting a Flink upsert stream to an append stream with only after images (reformatted).

  • Input: Table backed by a Flink upsert stream {+I, +U, -D}.
  • Output: Append stream with flat records or nested payload.
  • State: Not required. Direct passthrough with format transformation only.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    uid => 'nested-cdc-out',
    before => DESCRIPTOR(payload),
    after => DESCRIPTOR(payload),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- +U[id: 1, val: 20]
-- Output: +I[op: 'u', payload: {id: 1, val: 20}]

4.2.2.5 Producing partial deletes for upsert sink

Exporting to an upsert sink (e.g., Kafka compacted topic) that only needs key columns for deletions.

  • Input: Table backed by a Flink upsert or retract stream.
  • Output: Append stream with partial deletes (only key columns in delete events).
  • State: Not required. Delete events are emitted with key columns only.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    -- produces_full_deletes => false,  -- default
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- -D[id: 5, name: 'Alice', age: 30]
-- Output: +I[op: 'd', id: 5]

4.2.2.6 Producing full deletes for CDC sink

Exporting to a CDC-compatible sink that requires all columns in delete events (e.g., for downstream systems that need the full row image for auditing or replication).

  • Input: Table backed by a Flink upsert or retract stream.
  • Output: Append stream with full deletes (all columns in delete events).
  • State: If input is an upsert stream, ChangelogNormalize is added upstream to materialize full row images for deletes.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    produces_full_deletes => true,
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- -D[id: 5, name: 'Alice', age: 30]
-- Output: +I[op: 'd', id: 5, name: 'Alice', age: 30]

4.2.2.7 Retract stream to retract stream (with Before/After Image)

Converting a Flink retract stream to an append stream with nested before/after images (e.g., Debezium format).

  • Input: Table backed by a Flink retract stream {+I, -U, +U, -D} where updates arrive as two separate events.
  • Output: Append stream with before and after fields combined in a single record per update.
  • State: Required. Must buffer UPDATE_BEFORE (-U) until UPDATE_AFTER (+U) arrives to combine them into a single output record.
  • Key: If parallelism > 1, the upsert key from the input table should contain the partition key.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    uid => 'nested-cdc-out',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    state_ttl => INTERVAL '1' HOUR,
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_BEFORE, UPDATE_AFTER', 'u'
    ]
)

-- -U[id: 1, val: 10] then +U[id: 1, val: 20]
-- Output: +I[op: 'u', before: {id: 1, val: 10}, after: {id: 1, val: 20}]

4.2.2.8 Upsert stream to retract stream (with Before/After Image)

Converting a Flink upsert stream to an append stream with nested before/after images.

  • Input: Table backed by a Flink upsert stream {+I, +U, -D} where no UPDATE_BEFORE is available.
  • Output: Append stream with before and after fields.
  • State: Required. Must store previous row images to populate the before field when an update arrives.
  • Key: If parallelism > 1, the upsert key from the input table should contain the partition key.
SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    uid => 'nested-cdc-out',
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    state_ttl => INTERVAL '1' HOUR,
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_AFTER', 'u'
    ]
)

-- +I[id: 1, val: 10] then +U[id: 1, val: 20]
-- Output: +I[op: 'u', before: {id: 1, val: 10}, after: {id: 1, val: 20}]

4.2.2.9 Producing CDC envelope with metadata (passthrough)

Keeping specified input columns at the top level of the output envelope instead of nesting them inside before/after.

  • Input: Table backed by a Flink upsert or retract stream with a ts_ms column that should remain at the envelope level.
  • Output: Append stream with ts_ms at the top level and remaining columns nested in before/after.
  • State: Not required for upsert-to-upsert. Required if combining UPDATE_BEFORE and UPDATE_AFTER into a single output record.

Note: This use case requires the passthrough parameter, which is not part of the first implementation.

SELECT * FROM TO_CHANGELOG(
    input => TABLE dynamic_table PARTITION BY id,
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    passthrough => DESCRIPTOR(ts_ms),
    op_mapping => MAP[
        'INSERT', 'c',
        'DELETE', 'd',
        'UPDATE_BEFORE, UPDATE_AFTER', 'u'
    ]
)

-- Input:  +I[id: 1, name: 'Alice', ts_ms: 1234]
-- Output: +I[op: 'c', ts_ms: 1234, before: null, after: {id: 1, name: 'Alice'}]

4.3 Error Handling

This section documents the behavior of FROM_CHANGELOG and TO_CHANGELOG when encountering edge cases and error conditions.

4.3.1 Invalid Operation Codes

When FROM_CHANGELOG encounters an operation code not defined in op_mapping, behavior is controlled by the invalid_op_handling parameter:

  • 'FAIL' (default): Throws a ValidationException and fails the job. Ensures strict validation.
  • 'LOG': The record is skipped and a warning is logged. Provides visibility without disrupting production.
  • 'SKIP': Silently skips the record without logging. Use when invalid ops are expected and can be ignored.
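
For example, a stream that occasionally carries unknown operation codes could be consumed leniently; this is a sketch with an illustrative table and codes:

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id,
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_AFTER'
    ],
    invalid_op_handling => 'LOG'
)

-- Input: +I[op: 'x', id: 5, name: 'name']  -- unknown op code
-- Result: record skipped, warning logged, job continues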

4.3.2 Understanding ORDER BY, Watermarks, and state_ttl

These concepts serve different purposes and can be used together when needed:

ORDER BY (Sorting Operator) - FROM_CHANGELOG only

  • Availability: Only available for FROM_CHANGELOG (not TO_CHANGELOG)
  • Purpose: Reorders out-of-order events using watermarks
  • Where it operates: A separate sorting operator added before the PTF
  • Time semantics: Event time (uses watermarks from input stream)
  • Late data handling: Records arriving after the watermark are dropped by the sorting operator
  • Note: TO_CHANGELOG does not support ORDER BY because we assume tables in Flink already contain an ordered changelog

Watermarks inside the PTF

  • Purpose: Group and flush results, potentially compacting the changelog to discard unnecessary intermediate changes
  • Where it operates: Inside the PTF itself
  • Benefit: Can reduce output volume by combining multiple changes to the same key within a watermark window
  • Used by: Both FROM_CHANGELOG and TO_CHANGELOG

state_ttl (Key State Management)

  • Purpose: Store previous values for keys to:
    • Generate UPDATE_BEFORE (-U) when only UPDATE_AFTER or previous INSERT is available
    • Decide between emitting INSERT (+I) or UPDATE (+U) based on whether the key exists in state
  • Time semantics: Processing time (wall-clock based expiration)
  • State expiration: Based on wall-clock inactivity since last access
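
As a sketch of how these mechanisms combine in FROM_CHANGELOG (table and columns are illustrative), ORDER BY repairs event ordering while state_ttl bounds the key state used to derive UPDATE_BEFORE records:

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream PARTITION BY id ORDER BY event_time,
    state_ttl => INTERVAL '1' HOUR,
    op => DESCRIPTOR(__op),
    op_mapping => MAP[
        'c', 'INSERT',
        'd', 'DELETE',
        'u', 'UPDATE_BEFORE, UPDATE_AFTER'
    ]
)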

Comparison Table

| Aspect         | ORDER BY (Sorting)  | Watermarks (PTF)    | state_ttl                          |
|----------------|---------------------|---------------------|------------------------------------|
| Purpose        | Reorder events      | Group/flush results | Store key state for -U / +I vs +U  |
| Location       | Separate operator   | Inside PTF          | Inside PTF                         |
| Availability   | FROM_CHANGELOG only | Both PTFs           | Both PTFs                          |
| Time semantics | Event time          | Event time          | Processing time                    |
| Late data      | Dropped             | N/A                 | Processed normally                 |

4.3.3 State TTL Expiration

When state_ttl is configured and state expires before a related event arrives:

  • UPDATE_BEFORE missing: If state expires before an update arrives, the PTF emits an INSERT instead of UPDATE_BEFORE + UPDATE_AFTER.
  • Pending -U expires (TO_CHANGELOG): If an UPDATE_BEFORE (-U) is buffered waiting for UPDATE_AFTER (+U) and state expires, the -U is dropped. When +U finally arrives, it's emitted as an INSERT.
  • Recommendation: Set state_ttl conservatively based on expected event timing. Use INTERVAL '0' MINUTE to disable expiration if data integrity is critical.
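
A sketch of the first case in FROM_CHANGELOG, assuming 'u' is mapped to 'INSERT, UPDATE_BEFORE, UPDATE_AFTER' and state_ttl => INTERVAL '5' MINUTE:

-- 14:00  +I[op: 'u', id: 1, val: 10]
-- Emits: +I[id: 1, val: 10]           -- state stored for id=1, expires 14:05

-- 14:20  +I[op: 'u', id: 1, val: 20]  -- state for id=1 already expired
-- Emits: +I[id: 1, val: 20]           -- INSERT instead of a -U/+U pair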

4.3.4 Mismatched -U/+U Pairs

When combining UPDATE_BEFORE and UPDATE_AFTER into a single output record:

  • With watermarks: The PTF waits for watermarks to progress before emitting, allowing time to group -U and +U records for the same key.
  • With state_ttl: The PTF stores the -U record in state and waits for the corresponding +U record to arrive.
  • With both: Watermarks control when to flush, and state_ttl manages how long to retain pending -U records.
  • With neither: Records are emitted as they arrive (may result in separate -U and +U output records instead of a combined update).
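
As an illustration of the last case (a sketch; the exact envelope depends on the configured before/after fields), mapping 'UPDATE_BEFORE, UPDATE_AFTER' to 'u' in TO_CHANGELOG without watermarks or state could surface one update as two incomplete records:

-- -U[id: 1, val: 10]  →  +I[op: 'u', before: {id: 1, val: 10}, after: null]
-- +U[id: 1, val: 20]  →  +I[op: 'u', before: null, after: {id: 1, val: 20}]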

4.3.5 NULL Values in Key/Operation Columns

  • NULL in operation column: The pipeline will fail with an error. A NULL operation code indicates corrupt or malformed CDC data that cannot be safely processed.
  • NULL in upsert key: The pipeline will fail with an error. Updates and deletes cannot be processed without a valid key to identify the target row.
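
For example (illustrative records):

-- +I[op: NULL, id: 5, name: 'name']    -- fails: NULL operation code
-- +I[op: 'u', id: NULL, name: 'name']  -- fails: NULL upsert key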

4.3.6 Invalid op_mapping Combinations

The op_mapping is validated at planning time. Invalid mappings fail the query before the job starts.

Rules:

  • All RowKind names must be valid: INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER.
  • Each RowKind can appear at most once across all mapping entries.
  • Grouped RowKinds must be semantically compatible. Valid groupings: a single RowKind, 'UPDATE_BEFORE, UPDATE_AFTER', 'INSERT, UPDATE_AFTER', or 'INSERT, UPDATE_BEFORE, UPDATE_AFTER'. Groupings like 'INSERT, DELETE' are rejected.

Example — invalid mapping:

-- Fails at planning: for each function, one side of the mapping must consist of valid Flink RowKind names.
op_mapping => MAP['MY_INSERT', 'NEW_INSERT']
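
Further illustrative rejections derived from the rules above:

-- Fails at planning: INSERT and DELETE cannot be grouped.
op_mapping => MAP['INSERT, DELETE', 'x']

-- Fails at planning: UPDATE_AFTER appears more than once.
op_mapping => MAP['UPDATE_AFTER', 'u', 'INSERT, UPDATE_AFTER', 'c']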

4.3.7 Passthrough Field Validation

Note: The passthrough parameter is not part of the first implementation. These validation rules are documented for completeness.

  • Name conflicts with before/after fields: If a passthrough field has the same name as a field inside before/after, the pipeline will fail with a validation error at planning time. Passthrough fields and payload fields must be disjoint.
  • passthrough without before/after: This is a no-op. In flat mode (no before/after specified), all fields except op are already part of the output. The PTF will not raise an error but the parameter has no effect.
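
A hypothetical example of the name-conflict rule, assuming the after image itself contains a ts_ms field:

-- Fails at planning: 'ts_ms' is both a passthrough field and a field inside 'after'.
SELECT * FROM FROM_CHANGELOG(
    input => TABLE debezium_source,
    before => DESCRIPTOR(before),
    after => DESCRIPTOR(after),
    passthrough => DESCRIPTOR(ts_ms)
)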

4.4 State Requirements Summary

The following table summarizes when state is required for each conversion type:

| Conversion            | FROM_CHANGELOG                      | TO_CHANGELOG                             |
|-----------------------|-------------------------------------|------------------------------------------|
| Retract → Retract     | No state (before/after provided)    | State required (buffer -U for +U)        |
| Retract → Upsert      | No state (ignore before image)      | No state (drop -U events)                |
| Upsert → Retract      | State required (store for -U)       | State required (store for before image)  |
| Upsert → Upsert       | No state (direct passthrough)       | No state (direct passthrough)            |
| Flat record → Retract | State required (store for -U)       | N/A                                      |
| Flat record → Upsert  | State required (distinguish I vs U) | N/A                                      |

Key insights:

  • State is required when the output needs information not present in the input (e.g., generating UPDATE_BEFORE from upsert input).
  • State is required for TO_CHANGELOG when combining separate -U and +U events into a single output record.
  • When state is required, state_ttl should be configured to control state retention and prevent unbounded state growth.
  • ORDER BY (with watermarks) can be used in FROM_CHANGELOG to fix event ordering issues in the input stream.
  • Watermarks inside the PTF can help compact the changelog by grouping changes within a watermark window.

4.5 Symmetrical Usage

The PTFs are designed to work symmetrically. The default operation mappings are compatible in both directions.

4.5.1 FROM_CHANGELOG(TO_CHANGELOG(dynamic_table))

Round-trip: dynamic table → changelog append stream → dynamic table

SELECT * FROM FROM_CHANGELOG(
    input => TABLE TO_CHANGELOG(input => TABLE my_table)
)

Input (dynamic table)
-- +I[id: 1, name: 'Alice']
-- +U[id: 1, name: 'Alice Updated']
-- -D[id: 1, name: 'Alice Updated']

Append Stream (after TO_CHANGELOG)
-- +I[op: 'INSERT', id: 1, name: 'Alice']
-- +I[op: 'UPDATE_AFTER', id: 1, name: 'Alice Updated']
-- +I[op: 'DELETE', id: 1, name: 'Alice Updated']

Flink Changelog (after FROM_CHANGELOG)
-- +I[id: 1, name: 'Alice']
-- +U[id: 1, name: 'Alice Updated']
-- -D[id: 1, name: 'Alice Updated']

Table
-- Empty table (after all operations)

4.5.2 TO_CHANGELOG(FROM_CHANGELOG(append_stream))

Round-trip: changelog append stream → dynamic table → changelog append stream

SELECT * FROM TO_CHANGELOG(
    input => TABLE FROM_CHANGELOG(input => TABLE cdc_stream)
)

Input (append stream)
-- +I[op: 'INSERT', id: 1, name: 'Alice']
-- +I[op: 'UPDATE_AFTER', id: 1, name: 'Alice Updated']
-- +I[op: 'DELETE', id: 1, name: 'Alice Updated']

Flink Changelog (after FROM_CHANGELOG)
-- +I[id: 1, name: 'Alice']
-- +U[id: 1, name: 'Alice Updated']
-- -D[id: 1, name: 'Alice Updated']

Append Stream (after TO_CHANGELOG)
-- +I[op: 'INSERT', id: 1, name: 'Alice']
-- +I[op: 'UPDATE_AFTER', id: 1, name: 'Alice Updated']
-- +I[op: 'DELETE', id: 1, name: 'Alice Updated']

Table
-- Empty table (after all operations)

5 Compatibility, Deprecation, and Migration Plan

This FLIP introduces new functionality and does not affect existing SQL behavior. Old syntax is unaffected.

6 Test Plan

Testing will ensure full coverage of the new signatures and all use cases described in the FLIP. This includes but is not limited to:

  1. Changelog Mode Enforcement: Test FROM_CHANGELOG and TO_CHANGELOG with all supported CHANGELOG_MODE settings ('ALL', 'INSERT_ONLY', 'UPSERT') and assert the resulting/required change types.

  2. Custom CDC Mapping Test: Use FROM_CHANGELOG with the op_mapping map argument to process a simulated custom CDC stream and confirm the resulting dynamic table state is correct.

7 Rejected Alternatives

7.1 Naming: TO_CHANGELOG / FROM_CHANGELOG

The names TO_CHANGELOG and FROM_CHANGELOG were chosen to clearly convey the direction of conversion:

  • TO_CHANGELOG means "serialize TO an explicit changelog representation" — converting a dynamic table into an append-only stream with explicit operation codes.
  • FROM_CHANGELOG means "deserialize FROM an explicit changelog representation" — converting an append-only stream with operation codes back into a dynamic table.

The TO/FROM pattern was preferred because it:

  1. Aligns with the existing Flink DataStream API (toChangelogStream() / fromChangelogStream())
  2. Clearly indicates the direction of conversion
  3. Uses "changelog" which is the established Flink terminology for the underlying stream semantics

7.1.1 Alternatives Considered

The following alternative naming schemes were considered:

| Alternative | Rationale for Rejection |
|-------------|-------------------------|
| TO_CDC_STREAM / FROM_CDC_STREAM | "CDC" is more specific to Change Data Capture systems, while "changelog" is the broader Flink concept. The PTFs work with any changelog representation, not just CDC formats. |
| ENCODE_CHANGELOG / DECODE_CHANGELOG | While technically accurate (encoding/decoding the changelog representation), the TO/FROM naming is more intuitive for SQL users and aligns better with the existing Flink DataStream API naming (toChangelogStream() / fromChangelogStream()). |
| FLATTEN_TO_CDC / UNFLATTEN_FROM_CDC | Implies the primary operation is flattening/unflattening nested structures, but the PTFs do more than that: they handle operation code mapping, state management, and changelog mode conversion. |
| DEMATERIALIZE / MATERIALIZE | These terms have specific meanings in database theory (materialized views) that don't precisely match the PTF semantics. Additionally, MATERIALIZE could be confused with operators that persist data, while these PTFs are purely streaming transformations. |

If you think any of these alternatives are better, or you have other ideas to suggest, feel free to raise them on the dev mailing list.
