Skip to end of metadata
Go to start of metadata

Introduction

 

Memory management in Flink serves the purpose to control how much memory certain runtime operations use.
The memory management is used for all operations that accumulate a (potentially large) number or records.

Typical examples of such operations are

  • Sorting - Sorting is used to order records for grouping, joining, or to produce sorted results.
  • Hash Tables - Hash tables are used in Joins and for the Solution set in iterations (pending work to use them for grouping/aggregations)
  • Caching - Caching data is important for iterative algorithms, and for checkpoints during job recovery.
  • (Block-)Nested-Loop-Join - This algorithm is used for Cartesian products between data sets.

Without a means to manage/control the memory, these operations would fail when the data to be sorted (or hashed) was larger than the
memory that the JVM could spare (usually with an OutOfMemoryException). The memory management is a way to control very
precisely how much memory each operator uses, and to let them de-stage efficiently to out-of-core operation, by moving some of the
data do disk. How exactly that happens is dependent on the specific operation/algorithm (see below).

The memory management allows also to divide memory between the different memory-consuming operators in the same JVM.
That way, Flink can make sure that different operators run next to each other in the same JVM, and do not interfere with each other,
but stay within their memory budget. 

 

Note: As of this point, the memory management is used only in the batch operators. The streaming operators follow a different concept.

Flink's Managed Memory

Conceptually, Flink splits the heap into three regions:

  • Network buffers: A number of 32 KiByte buffers used by the network stack to buffer records for network transfer. Allocated on TaskManager startup. By default 2048 buffers are used, but can be adjusted via "taskmanager.network.numberOfBuffers".

  • Memory Manager pool: A large collection of buffers (32 KiBytes) that are used by all runtime algorithms whenever they need to buffer records. Records are stored in serialized form in those blocks.
    The memory manager allocates these buffers at startup.

  • Remaining (Free) Heap: This part of the heap is left to the user code and the TaskManager's data structures. Since those data structures are rather small, that memory is mostly available to the user code.

              

 

While allocating the network and MemoryManager buffers, the JVM usually performs one or more full garbage collections.
This adds some time to the TaskManager's startup, but saves in garbage collection time later, when tasks are executed.

Both the network buffers and the Memory Manager buffers live throughout the entire life of a TaskManager.
They move to the tenured generation of the JVM's internal memory regions and become long live, non-collected objects.

Notes:

  • The size of the buffers can be adjusted via "taskmanager.network.bufferSizeInBytes", but 32K seems to be a good size for most setups.
  • There are ideas about how to unify the NetworkBuffer Pool and the Memory Manager region.
  • There are ideas to add a mode that makes the allocation of the memory buffers by the MemoryManager lazy (allocated when needed). This decreases the startup time of the TaskManager, but will cause more garbage collections later when the buffers are actually allocated.

Memory Segments

Flink represents all its memory as a collection of Memory Segments. The segment represents a region of memory (by default 32 KiBytes) and
provides methods to access the data at offsets (get and put longs, int, bytes, copy between segments and arrays, ...).
You can think of it as a version of the java.nio.ByteBuffer that is specialized for Flink (see below why we are not using the java.nio.ByteBuffer). 

Whenever Flink stores a record somewhere, it actually serializes it into one or more memory segments. The system may store a "pointer" to the record
in another data structure (frequently also build as a collection of memory segments).
This means that Flink relies on efficient serialization that is aware of pages and breaking records across pages.
Flink brings its own type information system and serialization stack for exactly that purpose. 

 

A set of records serialized in Memory Segments.

 

The serialization format is defined by Flink's serializers and is aware of the individual fields of the records. Even though this feature is not yet extensively used
currently, it allows to partially deserialize records during processing, to increase performance.

To further improve the performance, the algorithms that use the Memory Segments try to work on the serialized data to a large extend. This is
made possible by the extended type utility classes TypeSerializer and TypeComparator. For example, most of the comparisons in the
sorter boil down to comparing the bytes in some pages (like a memcmp). That way, working with serialization and serialized data has competitive performance, while being able to control the amount of allocated memory.


The MemoryManager pools the Memory Segments and gives them out to algorithms requesting them and gathers them back after the algorithms are
done. The algorithms hence explicitly request memory (think of malloc for 32k) and release it.
Algorithms have a strict budget of how much memory they may use (how many pages). When this memory is used up, they have to fall back to
an out-of-core variant. This is a highly robust mechanism that does not suffer from heap fragmentation and estimates about data sizes.

Since the data is already in serialized pages, it is very easy to move the data between memory and disk.

Memory Segments versus ByteBuffers

Why is Flink not simply using the java.nio.ByteBuffer? The MemorySegment has a few benefits over the ByteBuffer:

  • It uses the sun.misc.Unsafe methods on its byte arrays, thus the fetching of types like long is much cheaper (no bit shifting necessary).
  • It has only absolute get/put methods and has those for all types, which makes it thread safe. The byte buffer is missing
    the absolute get/put for byte arrays and other ByteBuffers, forcing you to resort to relative methods, which require locking or giving up thread safety.
  • Since we have only one final implementation class of the MemorySegment, the JIT can perform better work in
    de-virtualizing and inlining the methods than for the ByteBuffer, which exists in at least 5 different implementations.

Impact on Garbage Collection

This mechanism of using memory has good implications on the garbage collection behavior of Flink.

Flink does not gather any records as objects, but stores them serialized inside the long lived buffers. That means there
are effectively no long-lived records - records exist only to be passed through user functions and to be serialized into
the memory segments. The long lived objects are the memory segments themselves, which are never garbage collected.

When running Flink tasks, the JVM will perform many garbage collections on short lived objects in the "new generation".
These garbage collections are typically very cheap. Tenured generation garbage collections (long and expensive) should
occur rarely, since the tenured generation has almost only the never-collected buffers.

This works best when the size of the OldGen (tenured) heap matches the aggregate size of network buffers and
MemoryManager. This ratio is controlled by the JVM option -XX:NewRatio , which defines how many times as
large the OldGen is, compared to the NewGen.

By default, Flink aims to use settings where the OldGen to be twice as large as the NewGen (-XX:NewRatio=2, JVM default on newer GCs)
and the MemoryManager and NetworkBuffers to use 70% of the heap. This should let the memory pools overlap (roughly).

 

          

 

Configuration

Flink tries to keep the configuration of the memory manager very simple. The configuration only defines how much memory is to be
used by the memory manager (and thus available for sorting, hashing, caching, ...).
The remaining heap space will be left for the user's functions and the TaskManager data structures (which are usually rather small,
so the vast majority is available to the user functions).

The amount of Flink's managed memory can be configured in two ways:

  • Relative value (default mode): In that mode, the MemoryManager will evaluate how much heap space is left after all other TaskManager services have been started.
    It will then allocate a certain fraction of that space (by default 0.7) as managed pages. The fraction can be specified via "taskmanager.memory.fraction".

  • Absolute value: When specifying "taskmanager.memory.size" in the flink-conf.yaml, the MemoryManager will allocate that many megabytes of memory as managed pages upon startup.

Off Heap Memory

Since all access to the managed memory in Flink is abstracted behind the MemorySegment class, we can easily add a variant of the MemorySegment that
is backed not by a byte[], but by some memory outside the JVM heap.

This pull request introduces exactly that implementation (work in progress). The basic idea is very similar to the case of Java's ByteBuffer,
where various implementations exist - heap byte array backed, or direct memory backed. The argument why we do not simply use the ByteBuffer is the
same as above (see section Memory Segments).


Note: This realization of Off-Heap memory goes much further than storing the results of operators somewhere outside the JVM
(like a memory mapped file or a distributed memory file system). With that addition, Flink can actually do all its work on data (sorting, joining) outside the JVM heap,
letting sort buffers and hash tables grow to sizes that are very challenging for the garbage-collected heap.

 

Since the off-heap memory is not garbage collected, the amount of memory to garbage collect becomes much smaller. This may be a serious improvement
for JVM heap sizes in the 100s of gigabytes. Further more, the off-heap memory can be zero-copy spilled to disk/ssd and zero-copy sent through the network.

 

Considerations for using Off-Heap memory:

  • The JVM needs to be configured properly. The heap size becomes much smaller, and the maximum amount of direct memory should be increased (-XX:MaxDirectMemorySize)
  • The system has two implementation of MemorySegment, the HeapMemorySegment and the OffHeapMemorySegment. The MemorySegment class must be abstract,
    as do the methods to put/get data. The very best JIT / inlineing characteristics can be achieved when only one of the two classes is ever loaded (de-virtualization via class hierarchy analysis)

Algorithms with managed memory

The Flink approach to memory management is not simply pluggable into any Java application. It requires the algorithms to be implemented in a way that
they use the Memory Segments, rather than just using Java objects and arrays.

The default in-memory sorter with managed memory is the NormalizedKeySorter, the default hash join with managed memory is the MutableHashTable

The figure below shows the basic behavior of algorithms putting data in to memory segments, and moving the memory segments to and from disk, if the
pool of pages runs out. The algorithm implementations make sure that pages are moved to disk or SSD in batches and asynchronously, to preserve performance. 

 

  • No labels