Status

Discussion thread
Vote thread
JIRA

FLINK-15776

Release
Reason: Obsoleted by FLINK-143

Motivation

While implementing the JDBC exactly-once sink, I found that the current abstraction (TwoPhaseCommitSinkFunction) doesn’t suit this use case. To avoid code duplication, I propose a new abstraction with the following goals in mind:

  1. accommodate the needs of the existing Kafka sinks
  2. accommodate the needs of the new JDBC sink:
    1. commits are retried in case of transient failures instead of failing the job
    2. rollbacks are retried
    3. need to distinguish between transactions started during this run and those restored from state; commit failures with reason “unknown” are ignored for the latter; this is a consequence of the lack of timeouts
    4. when committing a group of transactions: an option to stop committing as soon as one commit fails; otherwise consistency can be violated (if the failure was transient, the failed commit and all subsequent commits will be retried later)
    5. transaction timeouts aren’t used to ignore commit failures, as most DBs don’t support them
    6. state will probably need to include all to-commit transactions (as union list)
    7. minor API changes required
  3. accommodate the needs of other 2PC sinks in the future; these could be the existing file sink and WAL, or potential DynamoDB and Pulsar sinks
  4. and non-sinks (see this question)
  5. support batch jobs, in which sinks may no longer be running when the job finishes and pre-committed checkpoints still need to be committed
  6. improve testability; currently, TwoPhaseCommitSinkFunction requires a lot of mocking

Public Interfaces

All new abstractions (see below) should eventually become public (@Public or @PublicEvolving) to allow extension for different use cases.

Proposed Changes

Introduce a new set of abstractions to interact with two-phase commit systems. The main idea is a more fine-grained separation of concerns:

  • 2PC Resource: facade to an external 2PC-compatible system
  • 2PC GroupOperations: perform multiple commits/rollbacks applying error handling logic and collecting failures for retry
  • 2PC Function State: represent function state (in memory)
  • 2PC Function StateHandler: load and store State
  • Abstract 2PC function: wires all of this together (including types) and connects it with the rest of the system


This enables customization of various aspects independently and finer-grained testing.
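
To make the separation between the 2PC Resource and the GroupOperations more concrete, here is a minimal sketch of how a group commit could stop at the first failure and collect the remaining transactions for retry. All names (`TwoPhaseCommitSketch`, `Resource`, `GroupResult`, `commitAll`, `stopOnFirstFailure`) are hypothetical and are not part of any existing Flink API; the sketch only illustrates the idea, not the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; every name here is an assumption, not a Flink API. */
final class TwoPhaseCommitSketch {

    /** Hypothetical facade to an external 2PC-compatible system. */
    interface Resource<TX> {
        void commit(TX tx) throws Exception;

        void rollback(TX tx) throws Exception;
    }

    /** Outcome of a group commit: what was committed and what must be retried later. */
    static final class GroupResult<TX> {
        final List<TX> succeeded = new ArrayList<>();
        final List<TX> toRetry = new ArrayList<>();
    }

    /**
     * Commits the transactions in order. A failed commit is collected for retry
     * instead of failing the caller. If stopOnFirstFailure is set, all later
     * transactions are also deferred without being attempted, so that the
     * commit order is preserved when the group is retried.
     */
    static <TX> GroupResult<TX> commitAll(
            Resource<TX> resource, List<TX> transactions, boolean stopOnFirstFailure) {
        GroupResult<TX> result = new GroupResult<>();
        boolean stopped = false;
        for (TX tx : transactions) {
            if (stopped) {
                result.toRetry.add(tx); // not attempted; retried later in order
                continue;
            }
            try {
                resource.commit(tx);
                result.succeeded.add(tx);
            } catch (Exception e) {
                result.toRetry.add(tx);
                stopped = stopOnFirstFailure;
            }
        }
        return result;
    }

    private TwoPhaseCommitSketch() {}
}
```

Because the error-handling policy lives in `commitAll` rather than in the sink function, it can be tested with a trivial in-memory `Resource` and without mocking the surrounding runtime.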

Extracting the 2PC Resource also allows it to run outside of a Sink (this might be needed for batch jobs to commit the final pre-committed transactions when Tasks are no longer running).


Serialization can be viewed as an implementation detail of the StateHandler, though an API to build it, or some default implementation, should be provided.
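
As a rough illustration of keeping serialization behind the StateHandler, the sketch below stores the in-memory State as bytes and restores it, without the calling function ever seeing the encoding. The names (`StateHandlerSketch`, `State`, `StateHandler`, `InMemoryStringStateHandler`) are hypothetical, the byte array merely stands in for Flink's operator state, and the newline-delimited encoding assumes transaction ids are non-empty strings without line breaks.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; every name here is an assumption, not a Flink API. */
final class StateHandlerSketch {

    /** In-memory function state: transactions that are pre-committed and await commit. */
    static final class State<TX> {
        final List<TX> pendingCommit = new ArrayList<>();
    }

    /** Loads and stores State; the encoding is hidden from the 2PC function. */
    interface StateHandler<TX> {
        State<TX> load() throws Exception;

        void store(State<TX> state) throws Exception;
    }

    /**
     * Example handler for string transaction ids. The byte array is a stand-in
     * for a real state backend; ids are assumed non-empty and newline-free.
     */
    static final class InMemoryStringStateHandler implements StateHandler<String> {
        private byte[] stored = new byte[0];

        @Override
        public State<String> load() {
            State<String> state = new State<>();
            if (stored.length > 0) {
                String decoded = new String(stored, StandardCharsets.UTF_8);
                state.pendingCommit.addAll(Arrays.asList(decoded.split("\n")));
            }
            return state;
        }

        @Override
        public void store(State<String> state) {
            stored = String.join("\n", state.pendingCommit).getBytes(StandardCharsets.UTF_8);
        }
    }

    private StateHandlerSketch() {}
}
```

Swapping the encoding (for example, to stay compatible with an existing serialized state) would then only require a different `StateHandler` implementation.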

FlinkKafkaProducer state

Currently, transactions are effectively grouped by the subtask that produced them.

This can NOT be simply flattened: on scale-up, transactions could end up on different subtasks, so the commit/rollback order would be violated and the relation between a transaction and its id pool would be lost.

In general, changes to existing state should be minimized.

Write-Ahead-Log sink over 2PC

The current WAL sink implementation has some similarities with 2PC and can therefore reuse parts of it. However, it is much simpler in the sense that it has nothing to roll back: there are no external transactions, and the current state handle is discarded automatically if not closed properly.

If we decide to implement it on top of 2PC, the following should be kept in mind:

- the function needs access to the 2PC Resource

- snapshotState uses RuntimeContext

- the transaction object can only be "finalized" at the pre-commit phase (not on start), so it can't be immutable

- if a checkpoint was already taken, then NEW data is discarded

Naming

  • Epoch vs TwoPhaseCommit vs Transaction
    • Transaction is not necessarily serial; and for files there is no transaction
    • TwoPhaseCommit is long and ambiguous (internally 2PC is used for checkpoints)
    • Epoch is too general

Diagram

Compatibility, Deprecation, and Migration Plan

  • TwoPhaseCommitSinkFunction can be either
    • ported to a new abstraction
    • left intact, having a new abstraction in a “stealth mode”, for some “incubation period”
    • deprecated and eventually removed
  • depending on TwoPhaseCommitSinkFunction, KafkaProducerSinks can be either left intact or rebased to a new function
    • state compatibility needs to be provided for KafkaTransactionState (depending on de/serialization)

Test Plan

Existing Kafka tests and new JDBC tests shouldn’t be affected and should cover the changes.

Rejected Alternatives

  1. Use TwoPhaseCommitSinkFunction with minimal modifications and lose some functionality of the JDBC sink:
    1. adds technical debt by having more code based on the wrong abstraction
    2. cuts the JDBC functionality
    3. still requires (slightly) changing the existing abstraction
  2. Use two different implementations for now; gather some feedback; probably, design other 2pc sinks to understand their needs; provide the abstraction based on this.
    1. duplication of logic in the intermediate state
    2. lack of confidence that it won’t become an eventual state?