...
- Identify a pipeline that reliably reproduces the leak. Use the runner's memory monitoring capabilities to confirm that memory consumption increases over time.
- The Dataflow UI provides this information on the JOB METRICS -> Memory Utilization page.
- When executing on the local runner, monitor memory consumption with the tools provided by the local OS.
- For example, on Linux you can use the `top` utility, which reports the memory allocated to a particular process as well as available memory. Note that Linux might allocate significant amounts of RAM for disk caches, which can make it appear as if the system is low on RAM.
- If you can run the pipeline with the `--runner=DirectRunner` option, you might be able to make the leak more pronounced by running every bundle multiple times with the `--direct_runner_bundle_repeat` pipeline option.
- If the leak cannot be reproduced with the Direct runner, try to reproduce it with the pipeline running on a single worker. It might also help to limit execution to a single process on that worker. DataflowRunner users can limit execution to 1 process on 1 worker by specifying `--num_workers=1 --max_num_workers=1 --experiments=no_use_multiple_sdk_containers`, and can further reduce parallelism if necessary with `--number_of_worker_harness_threads=1` (see the example after this list).
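The sketch below shows one way to set these options from Python. The placeholder pipeline and the bundle-repeat count of 10 are illustrative assumptions; substitute the transforms you suspect of leaking.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Local repro: replay every bundle several extra times to amplify per-bundle leaks.
local_options = PipelineOptions([
    "--runner=DirectRunner",
    "--direct_runner_bundle_repeat=10",
])

# Dataflow repro: a single worker running a single SDK process, single-threaded.
dataflow_options = PipelineOptions([
    "--runner=DataflowRunner",
    "--num_workers=1",
    "--max_num_workers=1",
    "--experiments=no_use_multiple_sdk_containers",
    "--number_of_worker_harness_threads=1",
    # Plus the usual --project, --region, --temp_location, and so on.
])

with beam.Pipeline(options=local_options) as p:
    # Placeholder pipeline; replace with the transforms suspected of leaking.
    _ = p | beam.Create(range(1000)) | beam.Map(lambda x: x)
```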
Determine whether the leak is on the Python heap or in native allocations
...
A memory leak can happen when C/C++ memory allocations are not released. Such a leak could be caused by Python extensions used in user code, by the Beam SDK or its dependencies, or (unlikely but possible) by the Python interpreter itself. These leaks might not be visible when inspecting objects that live on the Python interpreter heap, but might be visible when inspecting allocations performed by the Python interpreter process. Diagnosing them may require a memory profiler that tracks native memory allocations (see below), or substituting the default memory allocator with a custom allocator such as tcmalloc, which can help analyze a heap dump for the entire process (example: https://github.com/apache/beam/issues/28246#issuecomment-1918087115).
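One way to tell the two cases apart is to compare growth of the Python heap with growth of the process's resident memory: if the process keeps growing while Python-level allocations stay flat, the leak is likely in native allocations. Below is a minimal sketch using only the standard library; the `MemoryReportFn` DoFn, the element interval, and the use of `/proc/self/status` (Linux-specific) are illustrative choices, not part of the Beam SDK.

```python
import logging
import tracemalloc

import apache_beam as beam


class MemoryReportFn(beam.DoFn):
    """Illustrative DoFn that periodically logs Python-heap vs. process memory."""

    def setup(self):
        if not tracemalloc.is_tracing():
            tracemalloc.start()  # Note: tracing adds some CPU and memory overhead.
        self._counter = 0

    def process(self, element):
        self._counter += 1
        if self._counter % 10000 == 0:
            python_heap_bytes, _ = tracemalloc.get_traced_memory()
            rss_kb = None
            # /proc/self/status is Linux-specific; adjust for other platforms.
            with open("/proc/self/status") as f:
                for line in f:
                    if line.startswith("VmRSS:"):
                        rss_kb = int(line.split()[1])
            logging.info(
                "Python heap: %d bytes, process RSS: %s kB",
                python_heap_bytes, rss_kb)
        yield element
```

If the RSS figure keeps growing while the tracemalloc figure stays roughly flat, the additional memory is being allocated outside the Python heap.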
Identify whether you have a memory leak or high peak memory usage
See https://cloud.google.com/dataflow/docs/guides/troubleshoot-oom for a discussion of how to evaluate peak memory usage. If the pipeline suffers from memory fragmentation due to inefficient allocations, you might be able to reduce its memory footprint by switching from the default memory allocator to a different one, such as jemalloc or tcmalloc. The allocator can be substituted in a custom container.
A Dockerfile for a custom container that substitutes the memory allocator might look like the following:
FROM apache/beam_python3.10_sdk:2.53.0
RUN apt update ; apt install -y libjemalloc2
# Note: this enables jemalloc globally for all applications running in the container
ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libjemalloc.so.2
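After building and pushing the image to a registry that the workers can access, point the pipeline at it via the `--sdk_container_image` pipeline option. A minimal sketch; the image path below is a placeholder:

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",
    # Placeholder image path; use wherever you pushed the image built from
    # the Dockerfile above.
    "--sdk_container_image=us-docker.pkg.dev/my-project/my-repo/beam-jemalloc:2.53.0",
    # Plus the usual --project, --region, --temp_location, and so on.
])
```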
Consider using an external profiler
...