Status

Discussion thread: https://lists.apache.org/thread/hltxy61jnqsllq8vcj1zpwft7vvzf2kx
Vote thread: https://lists.apache.org/thread/rwctfqfkcf3mxqjbwml2xqrck86pocww
JIRA: FLINK-34554

Released: 

Current state

Accepted

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In exactly-once mode, the Kafka sink uses Kafka transactions to write data atomically and aligned with checkpoints, similar to how all other exactly-once sinks work.

The writer optimistically cleans up open transactions during shutdown when possible. However, Flink guarantees a clean shutdown only with stop-with-savepoint. In any other scenario, including failures, shutdown may be unclean and leave lingering transactions behind.

Target systems usually clean up such lingering transactions eventually, but on Kafka they have dire consequences: transactions are serialized in Kafka and block the progress of the LSO (last stable offset) in the respective topics. Hence, READ_COMMITTED consumers will not see new data, data cannot be offloaded, and topics cannot be compacted.
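To illustrate why one lingering transaction stalls READ_COMMITTED consumers, here is a simplified model of how a partition's LSO is derived: it is the first offset of the earliest still-open transaction, or the high watermark if no transaction is open. This is a conceptual sketch only; the class and method names are hypothetical and the real broker logic is more involved.

```java
import java.util.List;

/** Hypothetical, simplified model of a partition's last stable offset (LSO). */
final class LsoModel {
    private LsoModel() {}

    /**
     * READ_COMMITTED consumers can only read up to the LSO, and the LSO cannot advance
     * past the first offset of any transaction that is still open.
     *
     * @param highWatermark offset up to which data is replicated
     * @param openTxnFirstOffsets first offset written by each still-open transaction
     */
    static long lastStableOffset(long highWatermark, List<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long firstOffset : openTxnFirstOffsets) {
            lso = Math.min(lso, firstOffset);
        }
        return lso;
    }

    public static void main(String[] args) {
        // No open transactions: consumers can read up to the high watermark (1000).
        System.out.println(lastStableOffset(1_000, List.of()));
        // A lingering transaction that started at offset 42 pins the LSO at 42,
        // no matter how much committed data other producers append afterwards.
        System.out.println(lastStableOffset(1_000, List.of(42L, 500L)));
    }
}
```

Until the lingering transaction is aborted, everything after offset 42 stays invisible to READ_COMMITTED consumers.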

To address these cases, Flink sinks try to find and abort lingering transactions on startup (recovery being the common case). However, Flink has no inherent way to track opened transactions:

  • State is persisted only during successful checkpoints, while transactions need to be opened at the beginning of each new checkpoint phase. A transaction opened after the last successful checkpoint is therefore not recorded in any state.
  • In batch mode, no state storage is required at all, but we still need atomic writes.

So far, exactly-once sinks have relied on one of these assumptions:

  • The target system allows introspecting open transactions (e.g., file systems support directory listing, and JDBC XA returns all open transactions).
  • Otherwise, the target system itself can store the metadata, which minimizes system dependencies.

So far, the Kafka sink has used a third approach, which turned out to cause the severe issues described below. Note that any approach must be precise in the case of Kafka: aborting too many transactions results in data loss, while aborting too few stalls progress from the consumers' point of view.

Probing transactions

The Kafka producer has no API to check the state of a transaction. We built a probing scheme for finding lingering transactions on the following observation: to check the state of transaction T, the Kafka sink initializes T and checks the producer epoch. For epoch=0, T is assumed to be new; for epoch>0, T is assumed to have existed before, which implies a lingering transaction.
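The observation can be modeled with a small in-memory stand-in for the broker's transaction coordinator. This is a hypothetical sketch: the public KafkaProducer API does not expose the epoch, so `FakeCoordinator` and `initAndGetEpoch` are illustrative names that only mimic the behavior the probing scheme relies on (a new id starts at epoch 0, re-initializing a known id bumps its epoch).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the broker's transaction coordinator. */
final class FakeCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    /** Models initializing a transactional id: new ids get epoch 0, known ids epoch + 1. */
    int initAndGetEpoch(String transactionalId) {
        return epochs.merge(transactionalId, 0, (oldEpoch, ignored) -> oldEpoch + 1);
    }
}

/** Hypothetical probe built on the epoch observation. */
final class Probe {
    private Probe() {}

    /** Epoch > 0 means the id was initialized before, i.e. a transaction may be lingering. */
    static boolean existedBefore(FakeCoordinator coordinator, String transactionalId) {
        return coordinator.initAndGetEpoch(transactionalId) > 0;
    }
}
```

Note that probing is not read-only: the first probe of a fresh id registers it, so a second probe of the same id reports that it existed before.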

Building on top of the observation, we devised the following naming scheme in Flink to reliably abort transactions: transactionalIdPrefix-subtaskId-checkpointId.

Upon restart from checkpoint C on subtask S, Flink re-commits the transactions transactionalIdPrefix-S-C recorded in the checkpoint. Committing an already committed transaction is side-effect free, so for safety Flink commits them all (ideally, it would commit only those that are still uncommitted). Flink then considers all potentially failed checkpoints C' > C that may have followed the successful checkpoint C and re-initializes transactionalIdPrefix-S-C', which effectively aborts any lingering transaction under that id. If checkpoint C' has at least one lingering transaction (epoch>0), C'+1 is probed, and so on. Flink repeats this until the probed transactions have epoch=0 and then assumes that it has aborted all lingering transactions for that subtask.
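The probing along the checkpoint dimension can be sketched as follows for a single subtask. This is a simplified illustration under stated assumptions: the re-commit of checkpoint C is omitted, and `initAndGetEpoch` is a hypothetical callback standing in for initializing a transactional id and reading its epoch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

/** Sketch of probing along the checkpoint dimension for one subtask (names hypothetical). */
final class CheckpointProbing {
    private CheckpointProbing() {}

    /** Old naming scheme: transactionalIdPrefix-subtaskId-checkpointId. */
    static String transactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /**
     * Re-initializes prefix-S-C' for C' = C+1, C+2, ...; each re-initialization aborts a
     * lingering transaction under that id. Stops at the first id whose epoch is 0.
     *
     * @param initAndGetEpoch initializes the id and returns the resulting epoch
     * @return the ids found lingering (epoch > 0), which are now aborted
     */
    static List<String> abortLingering(
            String prefix, int subtaskId, long restoredCheckpointId,
            ToIntFunction<String> initAndGetEpoch) {
        List<String> aborted = new ArrayList<>();
        for (long c = restoredCheckpointId + 1; ; c++) {
            String id = transactionalId(prefix, subtaskId, c);
            if (initAndGetEpoch.applyAsInt(id) == 0) {
                // Fresh id: assume no later checkpoint opened a transaction. Note that
                // this final probe has just registered the id on the broker.
                return aborted;
            }
            aborted.add(id);
        }
    }
}
```

The side effect in the last probe (a fresh id gets registered) is what makes repeated restarts from the same checkpoint expensive.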

However, Flink may have been scaled up or down around any of those failed checkpoints, so some subtasks would not be checked. Therefore, we also need to probe subtasks S' beyond the current parallelism, for both C and C'. Again, once we see epoch=0 for S', we can stop looking in this direction. In general, any restart of Flink initializes at least 2 additional transactions (one along the checkpoint dimension and one along the subtask dimension), so in the best case we effectively initialize 3*parallelism transactions, which is barely noticeable in Flink.

In the worst case, however, Flink never manages to complete another successful checkpoint and is stuck in a restart loop. In that case, each restart probes the same previously uninitialized transactions, which is not side-effect free: the probe initializes a new transaction, so on the next restart Flink assumes a lingering transaction and expands the search radius.

Each restart from the same checkpoint thus triples the number of transactions to be checked on the next restart. The explosion stops only once a checkpoint succeeds, after which any restart loop starts at 3*parallelism again.
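Under this simplified model (3*parallelism initializations on the first restart, tripling on every further restart from the same checkpoint), the number of transactions initialized on the k-th consecutive restart can be written as follows, with p denoting the parallelism. This is an idealized estimate derived from the description above, not a measured result.

```latex
N_k = 3^{k} \cdot p, \qquad k \ge 1
% Example: p = 10 after k = 5 consecutive restarts from the same checkpoint
N_5 = 3^{5} \cdot 10 = 243 \cdot 10 = 2430
```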

The good, the bad, and the ugly

The described approach has some advantages: It reliably aborts all opened transactions and will never abort transactions that need to be committed.

The obvious downside is that recovery times are unpredictable and may be substantial in case of restart loops. Since restart loops usually indicate another problem, this amplifies an already bad state, but it can be avoided by configuring the job so that restarts are sparse.

However, the biggest issue with this approach is not apparent at first: it wastes a lot of memory on the broker (FLINK-34554). What we didn't know when we devised the scheme is that Kafka retains metadata about each transactional id in memory for 7 days. Since unique ids per checkpoint mean that a transactional id is usually used for only about 1 minute, a typical application needs a disproportionate amount of broker memory: with a 1-minute checkpointing interval, that is 7 days × 24 hours × 60 minutes × parallelism ≈ 10,000 × parallelism transactional ids per sink. Kafka transactional ids were designed to be reused: in Kafka Streams and Kafka Connect, each task has exactly one transactional id and reuses it. Since those systems don't use 2PC, the performance cost of waiting for the commit to finish is minimal.
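The estimate above is simple arithmetic; the following illustrative helper (hypothetical names) reproduces it. The 7-day window corresponds to the broker's default `transactional.id.expiration.ms` of 604800000 ms; the assumption is one new transactional id per subtask per checkpoint, as in the old naming scheme.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate of transactional ids retained by the broker (illustrative). */
final class RetainedIdEstimate {
    private RetainedIdEstimate() {}

    /**
     * With one unique id per checkpoint and subtask, the broker keeps every id that was
     * used within the retention window.
     */
    static long retainedIds(Duration retention, Duration checkpointInterval, int parallelism) {
        long idsPerSubtask = retention.toMillis() / checkpointInterval.toMillis();
        return idsPerSubtask * parallelism;
    }

    public static void main(String[] args) {
        // 7 days retention, 1-minute checkpoints, parallelism 1: 7 * 24 * 60 = 10080 ids.
        System.out.println(retainedIds(Duration.ofDays(7), Duration.ofMinutes(1), 1));
    }
}
```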

So while probing can become an issue, the deeper issue is the transaction naming scheme itself.

The solution

Since Kafka 3.0, the ListTransactions API lets clients introspect the open transactions on the brokers. With it, we no longer need to probe transactional ids to reliably abort open transactions, and consequently we no longer need unique transactional ids. Instead, we use a dynamic pool of transactional ids.

Normal execution

  • The writer maintains a growing list of open transactional ids. They have almost the same format as now: <prefix>-<subtask id>-<counter>, where the counter is incremented whenever an unused transactional id is needed.
  • On start, the writer creates a transaction and subsequently writes into it.
  • On checkpoint, the transaction is finalized and passed to the committer. A new transaction is opened with an unused id. The writer also stores all open transactions in its state.
  • When the checkpoint completes, the committer commits the transaction and notifies the writer that the id is available again.
  • On next checkpoint, the writer consolidates its state. It reuses transactional ids if possible.
  • On shutdown, the writer aborts the current transaction.
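The lifecycle above can be illustrated with a minimal, hypothetical id pool for one subtask. It is a sketch under simplifying assumptions: ids are released immediately rather than during the writer's state consolidation on the next checkpoint, and the class and method names are illustrative, not the connector's actual code.

```java
import java.util.TreeSet;

/** Hypothetical per-subtask pool of ids in the format <prefix>-<subtask id>-<counter>. */
final class TransactionalIdPool {
    private final String prefix;
    private final int subtaskId;
    // Counters whose transactions were committed and can be handed out again.
    private final TreeSet<Integer> free = new TreeSet<>();
    // Next never-used counter; equals the number of distinct ids created so far.
    private int nextCounter = 0;

    TransactionalIdPool(String prefix, int subtaskId) {
        this.prefix = prefix;
        this.subtaskId = subtaskId;
    }

    /** Returns a free id, reusing the lowest released counter before creating a new one. */
    String acquire() {
        Integer counter = free.pollFirst();
        if (counter == null) {
            counter = nextCounter++;
        }
        return prefix + "-" + subtaskId + "-" + counter;
    }

    /** Called once the committer reports that the transaction with this id is committed. */
    void release(String transactionalId) {
        int counter = Integer.parseInt(
                transactionalId.substring(transactionalId.lastIndexOf('-') + 1));
        free.add(counter);
    }

    int createdIds() {
        return nextCounter;
    }

    public static void main(String[] args) {
        TransactionalIdPool pool = new TransactionalIdPool("my-sink", 0);
        String first = pool.acquire();  // writer starts writing into my-sink-0-0
        String second = pool.acquire(); // checkpoint: first goes to the committer
        pool.release(first);            // checkpoint completed: first is committed
        String third = pool.acquire();  // next checkpoint reuses my-sink-0-0
        System.out.println(first + " " + second + " " + third + " " + pool.createdIds());
    }
}
```

Reusing the lowest free counter keeps the set of ids the broker has to remember small and stable.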

Recovery

  • The writer uses the ListTransactions API to retrieve all open transactions. From the recovered checkpoint state, it also knows which transactions still need to be re-committed. It aborts all other open transactions.
  • The committer receives the recovered Flink state containing the transactional ids to be committed. It re-commits them and notifies the writer about finished transactions.
  • There is no special handling of transactions that have already been committed and are retried. They silently succeed.
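The abort decision during recovery boils down to a set difference. In Kafka's Java Admin client, the listing is exposed as `Admin#listTransactions()` (added in Kafka 3.0 via KIP-664), returning `TransactionListing` objects with `transactionalId()` and `state()`. To keep this sketch self-contained, the listing is modeled as a plain collection of ids; restricting the abort to ids with this sink's prefix is an assumption, and `RecoveryPlan` is a hypothetical name.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of deciding which open transactions to abort on recovery. */
final class RecoveryPlan {
    private RecoveryPlan() {}

    /**
     * @param openTransactionalIds ids of open transactions, e.g. taken from a listing call
     * @param toRecommit ids recorded in the recovered checkpoint state (must not be aborted)
     * @param prefix this sink's transactional id prefix (filtering on it is an assumption)
     * @return ids that are open, belong to this sink, and are not pending re-commit
     */
    static Set<String> idsToAbort(
            Collection<String> openTransactionalIds, Set<String> toRecommit, String prefix) {
        Set<String> toAbort = new TreeSet<>();
        for (String id : openTransactionalIds) {
            if (id.startsWith(prefix + "-") && !toRecommit.contains(id)) {
                toAbort.add(id);
            }
        }
        return toAbort;
    }
}
```

Because the listing reflects the broker's actual state, nothing has to be probed, and no new ids are registered during recovery.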

Expected number of transactions

  • There is one open transaction in the writer at all times.
  • There is at least one finalized transaction waiting to be committed. The exact number depends on the checkpointing times and some temporary factors like retries. For 1 minute checkpointing time, the expectation is 1 additional transaction.
  • During recovery, the ids that need to be re-committed are blocked from usage. The number corresponds to the number of finalized transactions of the previous run.
  • So for most Flink applications, we expect 3 transactional ids to be in use per subtask. Abnormal situations may temporarily increase the number.
  • To limit strain on the broker, safeguards should be implemented for abnormal situations. If there's a temporary spike to 100 transactional IDs, for example, it will take 7 days before the corresponding broker memory can be reclaimed. But that's 2 orders of magnitude fewer IDs than we have now in the common case.
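One possible shape for such a safeguard is a simple threshold check on the number of ids a subtask currently holds. This is purely a hypothetical sketch: the proposal does not specify thresholds or reactions, so `PoolGuard`, `warnAt`, and `failAt` are illustrative names and values.

```java
/** Hypothetical safeguard for the size of one subtask's transactional id pool. */
final class PoolGuard {
    enum Verdict { OK, WARN, FAIL }

    private PoolGuard() {}

    /**
     * @param idsInUse open + pending-commit + blocked-for-recommit ids of one subtask
     * @param warnAt hypothetical threshold for logging a warning (well above the expected 3)
     * @param failAt hypothetical hard limit after which the writer would fail fast
     */
    static Verdict check(int idsInUse, int warnAt, int failAt) {
        if (idsInUse >= failAt) {
            return Verdict.FAIL;
        }
        return idsInUse >= warnAt ? Verdict.WARN : Verdict.OK;
    }
}
```

Whether a spike should only be logged or should fail the job is a design choice left open by the proposal.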

Public Interfaces

We extend KafkaSinkBuilder and the table options with a corresponding feature flag.

public class KafkaSinkBuilder<IN> {
    ...

    /** Sets how transactional ids are named and how lingering transactions are aborted. */
    public KafkaSinkBuilder<IN> setTransactionNamingStrategy(
            TransactionNamingStrategy transactionNamingStrategy);
}


TransactionNamingStrategy is the same enum that can also be used as an option in the Table API/SQL.

/** (Table) options for {@link KafkaSink}. */
@PublicEvolving
public class KafkaConnectorOptions {
    ...

    public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =
            ConfigOptions.key("sink.transaction-naming-strategy")
                    .enumType(TransactionNamingStrategy.class)
                    .defaultValue(TransactionNamingStrategy.DEFAULT);
}

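@PublicEvolving
public enum TransactionNamingStrategy {
    // Old behavior: a new id per checkpoint, lingering transactions found by probing.
    INCREMENTING(
            TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING),
    // New behavior: pooled ids, lingering transactions found by listing.
    POOLING(TransactionNamingStrategyImpl.POOLING, TransactionAbortStrategyImpl.LISTING);

    public static final TransactionNamingStrategy DEFAULT = INCREMENTING;

    // Internal strategies selected by this option (field names are illustrative).
    private final TransactionNamingStrategyImpl namingImpl;
    private final TransactionAbortStrategyImpl abortImpl;

    TransactionNamingStrategy(
            TransactionNamingStrategyImpl namingImpl, TransactionAbortStrategyImpl abortImpl) {
        this.namingImpl = namingImpl;
        this.abortImpl = abortImpl;
    }
}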

The new option is an expert option for opting into the new behavior. At some point (see the migration plan), we will set DEFAULT = POOLING, after which its main purpose becomes opting out of the new behavior. Using an enum leaves room to explore other strategies in the future.

Proposed Changes

  • Extend writer state to capture ongoing transaction ids.
  • Introduce internal TransactionAbortStrategyImpl and TransactionNamingStrategyImpl enums.
  • Extract the existing code into TransactionAbortStrategyImpl.PROBING and TransactionNamingStrategyImpl.INCREMENTING.
  • Add the new strategies.
  • Add respective configuration (see public interfaces).

Compatibility, Deprecation, and Migration Plan

  • By default, the existing behavior is chosen. This ensures two things: the behavior stays as expected, and the writer state is prepared for the new strategies.
  • In the release version, users need to opt in to the new behavior. Users who don't experience memory issues on the broker can try LISTING+INCREMENTING first for predictable recovery times.
  • We then gather experience with the new behavior on production systems. If everything works as expected, we change the DEFAULT to the new strategies.
  • After a few versions, we can optionally deprecate the old strategies. If no other new strategies come up, we may even remove the configuration options.

Test Plan

  • The changes will be tested with regular ITCases and E2ECases.
  • After passing these basic tests, we need to test the new strategies on dev clusters. Since the feature is opt-in, it should be safe to deploy to the whole dev environment and then gradually turn it on.
  • After passing dev tests, we need to test on production systems at a larger scale. Again, because the feature is behind a feature toggle, we can enable it gradually and revert when we discover issues.

Rejected Alternatives

We briefly discussed and rejected other alternatives.

Use Flink HA to store open transactions

The Flink HA store is not meant for larger amounts of data. Currently, the JM writes one entry per successful checkpoint; naively, we would need to write one entry per TM per started checkpoint.

Use Kafka topic to store metadata

To avoid depending on Kafka 3.0 brokers, we could store the metadata in Kafka topics ourselves. However, that would make the connector much more complicated, with additional synchronization points, and would replicate data that is already available. Since Kafka 3.0 has been available for about 4 years, we simply assume that the brokers support the ListTransactions API.

Use static pool of IDs

The old FlinkKafkaProducer used a static pool of IDs. However, a static pool can be depleted by concurrent checkpoints, lost notifyCheckpointCompleted calls, and aborted checkpoints in general (long recovery times). Static pools also don't handle downscaling sufficiently well.
