Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-53 introduced a fraction based approach for sharing managed memory within a slot. This approach needs to be extended as Python operators, which also use managed memory, are introduced. This FLIP proposes a design for extending intra-slot managed memory sharing to Python operators and other potential future managed memory use cases.

Background

FLIP-53: Fine Grained Operator Resource Management

In FLIP-53, we introduced the fraction based approach for sharing managed memory across operators in a slot, scaling the memory consumption with respect to however much managed memory is available in the slot.

FLIP-53 assumes that:

  • For streaming jobs, RocksDB state backend is the only managed memory use case.
  • For batch jobs, batch operators are the only managed memory use case.

Based on this assumption:

  • The RocksDB state backend will always try to take all of the managed memory in the slot.
  • When generating the job graph, a weighted fraction will be calculated for each batch operator, indicating what proportion it should take from the slot’s total managed memory.

This assumption no longer holds, as Python operators are introduced for both streaming and batch jobs.

Python Operators

Python operators are operators implemented and executed in the Python language. To execute Python operators, separate Python VM processes are needed in addition to the JVM processes.

According to the discussions in FLINK-18738, Python operators in the same slot should share the same Python process. Memory used by the Python processes should also be accounted for as managed memory.

Managed Memory Use Cases

Note: Currently, batch operators and the RocksDB state backend never co-exist in a slot, since they are dedicated to batch and streaming jobs respectively. However, the memory management mechanism should not be aware of the differences between streaming and batch.


Currently, there are three managed memory use cases: batch operators, the RocksDB state backend, and Python operators. More use cases might be added in the future.

For clarity, we categorize these use cases into the following two kinds.

  • Per-Op: Each operator has its own memory budget. Memory is allocated/reserved independently by each operator, and is not shared between different operators.
    • Batch operators
  • Per-Slot: For each memory use case in the slot, only one piece of memory will be allocated/reserved, which is shared within the slot.
    • RocksDB state backend, Python process

Combinations of managed memory use cases can be summarized as follows (scenarios involving Python operators are new).

  • Streaming
    • None
    • RocksDB state backend
    • Python operators
    • RocksDB state backend + Python operators
  • Batch
    • None
    • Batch operators
    • Python operators
    • Batch operators + Python operators

Goals

The proposed design should allow sharing managed memory for all the use cases, while achieving the following goals.

  • Never reserve unused managed memory.
    • Unless the job does not have any managed memory use cases, in which case users should configure managed memory to 0 for the Flink cluster.
  • In existing scenarios (without python operators), the design should not require any configuration changes from the users.
  • The design should be extensible to potential new managed memory use cases in future.

Public Interfaces

This FLIP introduces a new public configuration option `taskmanager.memory.managed.consumer-weights`. See Configure Weights for Use Cases for details.

In addition, this FLIP contains changes to existing public configuration options for the Python use cases. See Compatibility, Deprecation, and Migration Plan for details.

Proposed Changes

This proposal extends the fraction based managed memory sharing to work with various use cases at the same time. Managed memory is first distributed across all use cases. Then, for per-op use cases, the memory distributed to the use case is further distributed across the operators. The resulting memory distribution plan is converted into fractions of the slot's total managed memory, which can be accessed at execution time for allocating/reserving memory accordingly.
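For example (with made-up numbers): consider a slot whose operators declare both the batch operator and the Python use case, with configured weights of 70 for the former and 30 for the latter. The batch operators collectively receive the fraction 0.7 of the slot's managed memory and the Python process receives 0.3. If the slot contains two batch operators with operator weights 1 and 3, the batch share is further split into 0.7 × 1/4 = 0.175 and 0.7 × 3/4 = 0.525 (assuming the per-op share is split proportionally to the declared operator weights, as described under Declare Use Cases).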

Calculate Fractions for Use Cases

During the compiling stage, StreamJobGraphGenerator should first calculate a fraction for each managed memory use case, indicating what proportion of the slot’s managed memory each use case should take.

To calculate the fractions, each use case should have a managed memory weight configured (by user or planner), and operators should declare what use case they have.

Then the fractions will be calculated as follows:

f_u = w_u / Σ_{v ∈ D} w_v,   if u ∈ D
f_u = 0,                     otherwise

Where,

  • f_u is the fraction of use case u,
  • w_u is the weight of use case u, and
  • D is the set of all use cases that are declared by operators in the slot.
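
For example, with a weight of 70 for the RocksDB state backend and 30 for Python operators, a slot whose operators declare both use cases gets the fractions 70 / (70 + 30) = 0.7 and 30 / (70 + 30) = 0.3 respectively. If only the RocksDB state backend is declared, then D contains only that use case and its fraction is 1.0, while the Python use case gets 0 regardless of its configured weight.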

Configure Weights for Use Cases

We propose to configure weights of managed memory use cases via configuration options. 

  • `taskmanager.memory.managed.consumer-weights`: A map of managed memory use cases and their integer weights. At the moment, the valid use cases are:
    • DATAPROC: A combined weight for the RocksDB state backend and batch operators, which are guaranteed not to be mixed in the same slot.
    • PYTHON

Exposing the weights through configuration options will allow users to tune the weights easily without touching the code. The limitation is that users cannot specify different weights for jobs running in the same Flink cluster, which should be acceptable as a first step. If we later see that specifying different weights per job is needed, we can expose the weights via ExecutionConfig, so that users can override the weights for a specific job.

To be specific, the proposed interface is the configuration option described above; a sketch of its definition is shown below.
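
A minimal sketch of what the option definition might look like, assuming Flink's ConfigOptions builder (the containing class, field name, and description text are illustrative, not part of the proposal):

import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TaskManagerOptionsSketch {
    public static final ConfigOption<Map<String, String>> MANAGED_MEMORY_CONSUMER_WEIGHTS =
            ConfigOptions.key("taskmanager.memory.managed.consumer-weights")
                    .mapType()
                    .noDefaultValue() // the proposed defaults are listed below
                    .withDescription(
                            "Managed memory weights for different kinds of consumers (e.g. DATAPROC, PYTHON).");
}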

If not explicitly specified, the following weights will be used by default.

{
    DATAPROC : 70,
    PYTHON : 30
}

The default weights should result in the same behavior as before in existing scenarios (without Python operators): all managed memory goes to the RocksDB state backend in streaming jobs, since it is the only occurring use case with a non-zero weight, and likewise to batch operators in batch jobs.
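
For illustration, a user who wants to give Python processes a larger share might set the weights as follows (a sketch; the map-style value is assumed to follow Flink's usual key:value,key:value syntax for map-type options, and the same value could equally be set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;

public class ConsumerWeightsExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Hypothetical weights: 60 for DATAPROC (RocksDB / batch operators), 40 for PYTHON.
        config.setString("taskmanager.memory.managed.consumer-weights", "DATAPROC:60,PYTHON:40");
    }
}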

Declare Use Cases

To prevent reserving managed memory for use cases which do not occur, StreamJobGraphGenerator should only take into consideration the use cases that are actually used by the operators in the slot. Therefore, operators need to declare their managed memory use cases.

A use case that is not declared by any operator in the slot will not get any managed memory (fraction = 0), even if its weight is configured to a non-zero value. On the other hand, if a use case declared by operators is not contained in the configured weights, Flink should fail with a descriptive error message.

To be specific, we propose to replace Transformation.setManagedMemoryWeight with the following interface.

@Internal
public class Transformation {
    // ...
    // replacing: public void setManagedMemoryWeight(int managedMemoryWeight);
    public void declareManagedMemoryUseCase(MemoryUseCase memoryUseCase, int operatorWeight);
}

public enum MemoryUseCase {
    // Per-op: each batch operator gets its own dedicated memory budget.
    BATCH_OP(Scope.OP),
    // Per-slot: a single budget shared by all operators in the slot.
    ROCKSDB(Scope.SLOT),
    PYTHON(Scope.SLOT);

    public final Scope scope;

    MemoryUseCase(Scope scope) {
        this.scope = scope;
    }

    public enum Scope {
        SLOT,
        OP
    }
}

operatorWeight will be used to further compute operator fractions for per-op use cases, and is ignored for per-slot use cases.
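
For illustration, declaring use cases on a transformation might look like the following (a hypothetical snippet; the transformation variable and the weight values are made up):

// Hypothetical: an operator that uses both batch operator memory and a Python process.
// The weight (30) only matters for the per-op BATCH_OP use case;
// for the per-slot PYTHON use case the weight argument is ignored.
transformation.declareManagedMemoryUseCase(MemoryUseCase.BATCH_OP, 30);
transformation.declareManagedMemoryUseCase(MemoryUseCase.PYTHON, 1);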

Retrieve Fractions on Execution

The calculated fractions are set on the StreamConfigs as a map whose keys are the use cases. For a per-slot use case, the value is the fraction for that use case, which is shared by all operators in the slot. For a per-op use case, the value is the operator's own fraction for that use case, dedicated to that operator.

@Internal
public class StreamConfig {
    // ...
    // replacing: public void setManagedMemoryFraction(double managedMemFraction);
    public void setManagedMemoryFractions(Map<MemoryUseCase, Double> managedMemoryFractions);

    // replacing: public double getManagedMemoryFraction();
    public Map<MemoryUseCase, Double> getManagedMemoryFractions();
}
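
For illustration, an operator of a per-slot use case could look up its fraction at execution time roughly as follows (a sketch; streamConfig and memoryManager refer to the operator's StreamConfig and the task's MemoryManager, and deriving a byte budget from the fraction is assumed to go through a helper such as MemoryManager#computeMemorySize):

// Fraction calculated for this use case at compile time; 0 if the use case was not declared.
double fraction = streamConfig.getManagedMemoryFractions()
        .getOrDefault(MemoryUseCase.ROCKSDB, 0.0);

// Convert the fraction into a concrete share of the slot's managed memory.
long memorySizeInBytes = memoryManager.computeMemorySize(fraction);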

Extensibility

The proposed design is extensible to future managed memory use cases. With the introduction of the enum MemoryUseCase and the use of maps for passing weights and fractions, adding a new use case should be as simple as adding a new value to the enum.
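
For instance, a hypothetical future per-slot use case could be supported roughly like this (the name is made up purely for illustration; it would additionally need a weight entry in `taskmanager.memory.managed.consumer-weights`):

public enum MemoryUseCase {
    BATCH_OP(Scope.OP),
    ROCKSDB(Scope.SLOT),
    PYTHON(Scope.SLOT),
    // Hypothetical new use case; the existing weight/fraction maps pick it up automatically.
    SOME_FUTURE_CACHE(Scope.SLOT);
    // constructor and Scope as above
}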

Implementation Steps

  1. Introduce the memory weights configuration option.
  2. Implement the new fraction calculation logic. This also means migrating the batch operator use cases.
  3. Make RocksDB respect the calculated fraction.
  4. Make Python processes use the memory manager's shared resources, respecting the calculated fraction.

Compatibility, Deprecation, and Migration Plan

This FLIP introduces incompatible changes for the Python UDF use cases.

  • `python.fn-execution.memory.managed` should be removed, since Python processes should always use managed memory.
  • Semantics of `python.fn-execution.framework.memory.size` and `python.fn-execution.buffer.memory.size` should be changed. These options no longer decide the memory used for Python processes. They can still be used for sanity checks to make sure Python processes get the minimum required memory.

This FLIP should be compatible with previous versions in other use cases without Python UDFs.

  • Changes to public APIs are all additions. No modifications or removals.
  • The changes should not require any configuration changes for use cases without Python UDFs.
  • The changes do not affect DataSet use cases, where fractions are configured, calculated and passed to task execution in a completely different code path.

Test Plan

The changes introduced by this FLIP should be verified with unit tests.

Rejected Alternatives

This FLIP proposes to first distribute managed memory across use cases, according to their configured weights and whether they are declared by operators, and to then further distribute the memory of each per-op use case across its operators.

With the proposed approach, the operator weights for per-op use cases do not affect how memory is distributed across use cases; that distribution depends only on the configured weights. E.g., a slot that needs a lot of memory for the RocksDB state backend and only a little for Python operators, and another slot that needs little memory for RocksDB and a lot for Python operators, would (assuming they belong to the same job) have managed memory distributed among the use cases in the same way.

An alternative is to not configure weights for use cases, but to configure a weight for each combination of operator and use case. E.g., a slot that contains two batch operators (b1 with weight 30 and b2 with weight 40) and three Python operators (p1 with weight 20, p2 and p3 each with weight 5) would have 70% of its managed memory for batch operators (b1 30%, b2 40%) and 30% for Python operators (p1 20%, p2 5%, p3 5%).

This alternative approach dynamically adjusts the memory distribution among various use cases based on how operators share the slots, which brings the following benefits.

  • Users do not need to understand how operators share slots.
  • Changing of slot sharing groups will not require changing of memory weights.
  • For jobs split into multiple slot sharing groups, it allows independent memory distribution over different slots.

The limitations of this alternative are as follows.

  • It complicates memory configuration by requiring operator weights to be specified even for per-slot use cases.
    • This is not strictly necessary, given that for per-slot use cases the memory is shared by all operators in the slot, unlike per-op use cases where each operator needs to know exactly how much memory to use.
  • It requires defining weights consistently across various use cases.
    • It would be hard to decide the relative relations between different use cases, such as “if rocksdb uses 100MB, then the python process should use 50MB”.
  • Memory configuration for different use cases is closely coupled.
    • It would make memory tuning hard, since changing the weight of one operator for one use case results in memory changes for all other operators and use cases.

The benefits of the alternative approach rest heavily on the assumption that operator weights are properly configured, which is unlikely in practice. Therefore, we have chosen the proposed approach, which might be less optimal but is easier to use.