The following investigation avenues might be helpful when investigating a suspected memory leak.

Identify a minimal pipeline that reliably reproduces the leak 

  • Identify a pipeline that reliably reproduces the leak. Use the runner's memory monitoring capabilities to confirm the increase in memory consumption over time.
    • For example, on Linux you can use the `top` utility, which reports the memory allocated to a particular process and the available memory. Note that Linux might allocate significant amounts of RAM for disk caches, which can make it appear as if the system were low on RAM.
    • The Dataflow UI has a JOB METRICS -> Memory Utilization page that provides this information.
    • When executing on a local runner, monitor memory consumption via the tools provided by the local OS.
  • If you can run the pipeline with the `--runner=DirectRunner` option, you might be able to make the leak more pronounced by running every bundle multiple times using the `--direct_runner_bundle_repeat` pipeline option (see the sketch after this list).
  • If the leak cannot be reproduced on the Direct runner, try to reproduce it with the pipeline running on only one worker. It might also help to limit execution to a single process on the worker. DataflowRunner users can limit execution to one process on one worker by specifying `--num_workers=1 --max_num_workers=1 --experiments=no_use_multiple_sdk_containers`, and can further reduce parallelism if necessary using `--number_of_worker_harness_threads=1`.
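
A minimal sketch of amplifying a suspected leak on the Direct runner might look like the following (the transforms are placeholders; substitute the suspected leaking transform):

# Sketch: amplify a suspected leak on the Direct runner by re-running every bundle.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DirectRunner',
    # Re-execute each bundle 10 times so that a per-bundle leak grows faster.
    '--direct_runner_bundle_repeat=10',
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(range(1000))
     | beam.Map(lambda x: x))  # placeholder for the suspected leaking transform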

Determine whether the leak is on the Python heap or in native allocations

Memory usage growth can happen if a Python process accumulates references to objects on the heap that are no longer being used. Because the references are retained, these objects are not garbage-collected. In this case, the objects remain visible to the Python garbage collector.
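
For example, a DoFn that keeps an unbounded per-instance cache retains a reference to every processed element. The snippet below is a sketch of this pattern; `LeakyDoFn` is a hypothetical illustration, not a Beam API:

# Sketch of a common reference leak: an unbounded per-instance cache inside a
# DoFn keeps every processed element alive for the lifetime of the DoFn instance.
import apache_beam as beam

class LeakyDoFn(beam.DoFn):
    def __init__(self):
        self._seen = []  # grows without bound

    def process(self, element):
        self._seen.append(element)  # reference retained => never garbage-collected
        yield element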

There are a couple of ways to inspect the Python heap. For example, the built-in `gc` module can enumerate the objects tracked by the garbage collector, and tools such as `objgraph` or the standard-library `tracemalloc` module can summarize object counts and allocation traces.
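
As an illustration, a heap summary along the following lines (a sketch using only the standard library) can be logged periodically, for example from a monitoring thread or a suspected DoFn; types whose counts keep growing between calls are candidates for a reference leak:

# Sketch: log the most common live object types on the Python heap.
import gc
import logging
from collections import Counter

def log_heap_summary():
    gc.collect()  # reclaim objects that are already unreachable
    counts = Counter(type(o).__name__ for o in gc.get_objects())
    for type_name, count in counts.most_common(10):
        logging.info("heap: %s -> %d live instances", type_name, count)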

A memory leak can also happen when C/C++ memory allocations are not released. Such a leak could be caused by Python extensions used in user code, by the Beam SDK or its dependencies, or (unlikely but possible) by the Python interpreter itself. These leaks might not be visible when inspecting objects that live on the Python interpreter heap, but might be visible when inspecting the allocations performed by the Python interpreter process. Investigating them may require a memory profiler that tracks native memory allocations (see below), or substituting the default memory allocator with a custom allocator such as tcmalloc, which can help analyze a heap dump for the entire process (example: https://github.com/apache/beam/issues/28246#issuecomment-1918087115).

Identify whether you have a memory leak or high peak memory usage

See https://cloud.google.com/dataflow/docs/guides/troubleshoot-oom for a discussion of how to evaluate peak memory usage. If the pipeline suffers from memory fragmentation due to inefficient allocations, you might be able to reduce the memory footprint by switching from the default memory allocator to a different one, such as jemalloc or tcmalloc. The allocator can be substituted in a custom container.

A Dockerfile for a custom container that substitutes the memory allocator might look like the following:

FROM apache/beam_python3.10_sdk:2.53.0
RUN apt-get update && apt-get install -y libjemalloc2
# Note: this enables jemalloc globally for all applications running in the container
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2

Consider using an external profiler

Using an off-the-shelf memory profiler, such as memray (see https://pypi.org/project/memray/), can be effective for identifying leaks and memory usage patterns, but it requires additional instrumentation.

Profiling a Beam Python pipeline is most effective when the Python program that leaks memory (such as the Python SDK harness) is launched by the profiler, as opposed to attaching the profiler at runtime after the process has already started. If the leak can be reproduced on the Python Direct runner, this is straightforward (example: https://github.com/apache/beam/issues/28246#issuecomment-1918120673); a sketch of this approach is shown below. If, however, the leak only reproduces in a running pipeline, starting the SDK harness from the profiler requires changing the code that launches the SDK harness, such as the Beam SDK harness container entrypoint (example: https://github.com/apache/beam/pull/30151/files).

Such a modification requires providing a custom container (example: https://github.com/apache/beam/issues/28246#issuecomment-1918101657).
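
When the leak reproduces on the Direct runner, one way to launch the pipeline under the profiler is memray's Python tracker API. The snippet below is a sketch, not the exact setup from the linked issues; it assumes memray is installed in the environment:

# Sketch: run a leaking pipeline on the Direct runner under memray's tracker.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

import memray

# native_traces=True also records native (C/C++) allocation stacks, which helps
# when the leak is not visible on the Python heap.
with memray.Tracker("pipeline_profile.bin", native_traces=True):
    with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as p:
        (p
         | beam.Create(range(1000))
         | beam.Map(lambda x: x))  # placeholder for the suspected leaking transform

The resulting `pipeline_profile.bin` file can then be turned into a report, for example with the `memray flamegraph` reporter.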

It is best if the profiler can collect and output memory allocation statistics while the process is still running. Some tools only output the collected data after the process under investigation terminates, in which case collecting and accessing profiles from workers can be more problematic. If the profiler stores the collected output on the container instance, retrieving profiles may require connecting to the worker and fetching them from the running SDK container (example: https://github.com/apache/beam/issues/28246#issuecomment-1918114643).

Analyzing the collected profile and creating reports from it needs to happen in the environment where the profiled binary runs, since the profiler might need to access symbols from shared libraries used by the profiled binary. Therefore, generate reports in the same running container, or in a container created from the same image. Once reports have been generated, they can be extracted and inspected further on a standalone machine.

Memray has many different reporters that can analyze the profiling data. It may be worth trying several reporters, since depending on the source of the leak, some might work better than others. Memray also supports tracking allocations made by native code, and profiles can be collected and analyzed while the process under investigation is still running.

The issue https://github.com/apache/beam/issues/20298 tracks future improvements to make memory profiling easier.
