
Developing with the Python SDK

  • Gradle can build and test the Python SDK, and the Jenkins jobs use it, so it needs to be maintained.
  • You can use the Python toolchain directly instead of having Gradle orchestrate it, which may be faster; it is a matter of preference. If you want to use Python tools directly, we recommend setting up a virtual environment before testing your code.
  • If you update any of the cythonized files in the Python SDK, install the cython package before testing your code. You can list your changed files with the following command:

    git diff origin/master
  • The commands below assume that you're in the sdks/python directory.

Virtual Environment Setup

Setting up a virtualenv is required for running tests directly, such as via pytest or an IDE like PyCharm. This installs the Python SDK from source and includes the test and GCP dependencies.

On macOS/Linux

  1. Use the following code:

    # Initialize a virtual environment called "env" in ~/.virtualenvs or any other directory. (Consider using pyenv to manage the Python version as well as the packages installed in your virtual environment.)
    $ python3 -m venv ~/.virtualenvs/env
    # Activate the virtual environment.
    $ . ~/.virtualenvs/env/bin/activate
    # Upgrade other tools. (Optional)
    (env) $ pip install --upgrade pip
    (env) $ pip install --upgrade setuptools
    # Install requirements.
    (env) $ pip install -r build-requirements.txt
    # Install the Apache Beam package in editable mode.
    (env) $ pip install -e .[gcp,test]

On Windows

  1. Use the following code:

    > c:\Python37\python.exe -m venv c:\path\to\env
    > c:\path\to\env\Scripts\activate.bat 
    # Powershell users should run instead: 
    > c:\path\to\env\Scripts\activate.ps1 
    (env) > pip install -e .[gcp,test]
  2. You can deactivate the virtualenv when done.

    (env) $ deactivate

Virtual Environments with pyenv

  • A more advanced option, pyenv allows you to download, build, and install locally any version of Python, regardless of which versions your distribution supports.
  • pyenv also has a virtualenv plugin, which manages the creation and activation of virtualenvs.
  • The caveat is that you'll have to take care of any build dependencies, and those are probably still constrained by your distribution.
  • These instructions were made on a Linux/Debian system.

Error: No module named distutils. (23/07/2021)

As of 23/07/2021, users of some versions of Debian are currently experiencing the error "ModuleNotFoundError: No module named 'distutils.util'" when using the Python Beam SDK. This can be fixed by using pyenv for your Python installation, rather than relying on the packages included with the Debian distribution.

How to setup pyenv (with pyenv-virtualenv plugin)

  1. Install prerequisites for your distribution.

  2. curl https://pyenv.run | bash
  3. Add the required lines to ~/.bashrc (as returned by the script).
  4. Open a new shell.

Example on Ubuntu:

# Install pyenv deps
sudo apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev python-openssl git

# Install pyenv and the pyenv-virtualenv plugin
curl https://pyenv.run | bash

# Run the commands printed by the installer to add pyenv to your PATH

Example: How to Run Unit Tests with PyCharm Using Python 3.7.9 in a virtualenv

  1. Install Python 3.7.9 and create a virtualenv
    • pyenv install 3.7.9
    • pyenv virtualenv 3.7.9 ENV_NAME
    • pyenv activate ENV_NAME
  2. Upgrade packages (recommended)

    pip install --upgrade pip setuptools
  3. Set up PyCharm
    1. Start by adding a new project interpreter (from the bottom right or in Settings).
    2. Select Existing environment and the interpreter, which should be under ~/.pyenv/versions/3.7.9/envs/ENV_NAME/bin/python or ~/.pyenv/versions/ENV_NAME/bin/python.
    3. Switch interpreters at the bottom right.

Cleaning up environments

To delete all environments created with pyenv, run:

pyenv virtualenvs --bare --skip-aliases | xargs -n 1 pyenv virtualenv-delete -f


If you run into issues, see the troubleshooting notes on the pyenv "Common build problems" wiki page.

Running Tests

Running Tests Using pytest

If you've set up a virtualenv above, you can now run tests directly using pytest.

(env) $ pytest  # all tests

# You can also select specific tests. Try out these examples.
(env) $ pytest -v -k test_progress
(env) $ pytest -v -k TextSourceTest
(env) $ pytest -v apache_beam/io/

Running Tests Using tox

Here are some tips for running tests using tox:

  • Tox does not require a virtualenv with Beam + dependencies installed. It creates its own.
  • It also runs tests faster, utilizing multiple processes (via pytest-xdist).
  • For a list of environments, run tox -l.
  • tox also supports passing arguments after double dashes to pytest.

Execute the following code for running tests using tox:

(env) $ pip install tox
(env) $ tox -e py38-cloud  # all tests
(env) $ tox -e py38 -- -k test_progress

Lint and Formatting Checks

The Beam codebase enforces consistent code style and formatting using linters and the autoformatting tool yapf.

  • To run all consistency checks, run the following commands:

    pip install tox
    ../../gradlew lint       # Runs several linter checks
    tox -e py3-yapf-check    # Runs code formatting checks
  • To auto-format the code in place, run:

    tox -e py3-yapf

Running lint and yapf Automatically on Each Commit with pre-commit Hooks

The pre-commit tool allows you to run pre-configured checks automatically when you commit changes with `git commit`.

  • To enable pre-commit, run: 

    pip install pre-commit
    pre-commit install

    When the yapf pre-commit hook applies formatting changes in place, the check fails with the error "files were modified by this hook", and you have to re-run `git commit`.

  • To disable the pre-commit, run: 

    pre-commit uninstall

Running yapf formatter manually

To run manually:

  1. Install yapf.  

    pip install yapf==0.29.0

    For consistency, use the yapf version currently pinned by the Beam repo.

  2. To format changed files in your branch:

    # Run from root beam repo dir
    git diff master --name-only | grep "\.py$" | xargs yapf --in-place
  3. To format just a single directory:

    yapf --in-place --parallel --recursive apache_beam/path/to/files
  4. To format files with uncommitted changes, run: 

    git diff --name-only | grep "\.py$" | xargs yapf --in-place
  5. If you need to exclude one particular file or pattern from formatting, add it to the .yapfignore file (sdks/python/.yapfignore).
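For illustration, .yapfignore uses one file-glob pattern per line; the entries below are hypothetical examples, not the file's actual contents:

```
# Hypothetical .yapfignore entries (one glob pattern per line)
*_pb2.py
apache_beam/portability/api/*.py
```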

Remote testing

This step is required only when testing SDK code changes remotely (i.e., not with the DirectRunner). To do this, you must build the Beam tarball.

  1. From the root of the git repository, run:

    cd sdks/python/
    python setup.py sdist
  2. Pass the --sdk_location flag to use the newly built version. For example:

    python setup.py sdist > /dev/null && \
      python -m apache_beam.examples.wordcount ... \
      --sdk_location dist/apache-beam-2.5.0.dev0.tar.gz

Run hello world against modified SDK Harness

To run a hello world against modified SDK Harness, execute the following code:

# Build the Flink job server (default job server for PortableRunner) that stores the container locally.
./gradlew :runners:flink:1.7:job-server:container:docker

# Build portable SDK Harness, which builds and stores the container locally.
# Build for all python versions
./gradlew :sdks:python:container:buildAll
# Or build for a specific python version, such as py35
./gradlew :sdks:python:container:py35:docker

# Run the pipeline.
python -m apache_beam.examples.wordcount --runner PortableRunner --input <local input file> --output <local output file>

Run hello world against modified Dataflow Fn API Runner Harness and SDK Harness

To run a hello world against modified Dataflow Fn API Runner Harness and SDK Harness, execute the following code:

# Build portable worker
./gradlew :runners:google-cloud-dataflow-java:worker:build -x spotlessJava -x rat -x test
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

# Build portable Python SDK harness and publish it to GCR
./gradlew -Pdocker-repository-root=gcr.io/$USER/beam -p sdks/python/container docker
gcloud docker -- push gcr.io/$USER/beam/python:latest

# Initialize python
cd sdks/python
virtualenv env
. ./env/bin/activate

# run pipeline
python -m apache_beam.examples.wordcount \
  --runner DataflowRunner \
  --num_workers 1 \
  --project <gcp_project_name> \
  --output <gs://path> \
  --temp_location <gs://path> \
  --worker_harness_container_image gcr.io/$USER/beam/python:latest \
  --experiment beam_fn_api \
  --sdk_location build/apache-beam-2.12.0.dev0.tar.gz \
  --dataflow_worker_jar './runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-fn-api-worker-2.12.0-SNAPSHOT.jar' \
  --debug

Run Integration Test

To run an integration test, execute the following code:

Running integration test
clear && clear && ./scripts/run_integration_test.sh --test_opts "--tests=apache_beam.runners.dataflow.dataflow_exercise_streaming_metrics_pipeline_test --nocapture" --project <gcs_project_name> --gcs_location gs://<bucket_path> --kms_key_name "" --streaming true --worker_jar <Path_To_Jar_Binary>

Pass the --sdk_location flag if the tarball needs to be built from python setup.py sdist; otherwise, the tarball in the default location (the target directory of the Gradle build) will be used.

Run a ValidatesRunner test

To run a ValidatesRunner test, execute the following code:

Running a ValidatesRunner test
./scripts/run_integration_test.sh --test_opts "--attr=ValidatesRunner --tests=apache_beam.transforms.util_test --nocapture" --project <gcs_project_name> --gcs_location gs://<bucket_path> --kms_key_name "" --worker_jar <Path_To_Jar_Binary> --sdk_location ./dist/apache-beam-<version>.dev0.tar.gz --streaming true

This runs, in streaming mode, all tests from apache_beam/transforms/util_test.py that carry the ValidatesRunner attribute. To limit which tests run, you can manually edit the attribute in the test file (for example, to ValidatesRunner1).
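For reference, the attribute mechanism can be sketched in plain Python. This is a minimal illustration of nose-style attribute tagging and selection, assuming a locally defined attr decorator (the real one is nose.plugins.attrib.attr); the test names are made up:

```python
# Minimal sketch of attribute-based test selection, as done by nose's
# attrib plugin (--attr=ValidatesRunner). Test names below are illustrative.
def attr(*names):
    """Tag a test function with one or more attribute names."""
    def decorate(func):
        for name in names:
            setattr(func, name, True)
        return func
    return decorate

@attr('ValidatesRunner', 'ValidatesRunner1')
def test_reshuffle():
    pass

@attr('ValidatesRunner')
def test_gbk():
    pass

def select(tests, attribute):
    # Keep only tests carrying the requested attribute.
    return [t for t in tests if getattr(t, attribute, False)]

print([t.__name__ for t in select([test_reshuffle, test_gbk], 'ValidatesRunner1')])
# prints ['test_reshuffle']
```

Renaming the attribute in the decorator (e.g., from ValidatesRunner to ValidatesRunner1) is what narrows the selection when you pass --attr=ValidatesRunner1.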

Run Integration Test from IDE

To run an integration test from an IDE in debug mode, you can create a Nosetests run configuration. For example, to run a ValidatesRunner test on the Dataflow runner from IntelliJ/PyCharm, adjust the configuration as follows:

  1. Set Target to Module and point to the test file. 
  2. Set Additional arguments (sample, adjust as needed):  

    Running a ValidatesRunner test
    --test-pipeline-options="--runner=TestDataflowRunner --project=<YOUR PROJECT> --region=us-central1 --temp_location=gs://your_bucket/tmp --sdk_location=./dist/apache-beam-<version>.dev0.tar.gz  --requirements_file=./postcommit_requirements.txt --num_workers=1 --sleep_secs=20" --attr=ValidatesRunner1 --nocapture
  3. Set Working directory to /path/to/beam/sdks/python

Run a screen diff integration test for Interactive Beam

For Interactive Beam/Notebooks, we need to verify that the visual presentation of executing a notebook is stable. A screen diff integration test that executes a test notebook and compares the results with a golden screenshot does the trick. To run a screen diff integration test for Interactive Beam:

  1. Execute the following code for preparation work:

    Test dependencies
    # Install additional Python dependencies if absent, under beam/sdks/python, run:
    pip install -e .[interactive,interactive_test,test]
    # The tests use headless chrome to render visual elements, make sure the machine has chrome executable installed.
    # If you're reading this document in a chrome browser, you're good to go for this step.
    # Otherwise, e.g., on a Linux machine, you might want to do:
    wget --quiet https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb && \
        apt install -y ./google-chrome-stable_current_amd64.deb
    # As chrome version advances/differs, the chromedriver-binary needs to stay in sync with chrome.
    # Below is a bash example to dynamically install the correct chromedriver-binary.
    google_chrome_version=$(google-chrome --product-version)
    # Derive the matching chromedriver major-version range from the chrome version.
    chromedriver_lower_version=${google_chrome_version%%.*}
    chromedriver_upper_version=$((chromedriver_lower_version + 1))
    pip install "chromedriver-binary>=${chromedriver_lower_version},<${chromedriver_upper_version}"
    # For consistency of screenshots, the Roboto Mono font should be installed.
    # You can download the font files manually, or install them through the CLI, e.g., on Linux:
    wget --content-disposition -P /usr/share/fonts/truetype/robotomono \{Bold,BoldItalic,Italic,Light,LightItalic,Medium,MediumItalic,Regular,Thin,ThinItalic}.ttf?raw=true
  2. To run the tests:

    Running screen diff integration test
    # Under beam/sdks/python, run:
    pytest -v apache_beam/runners/interactive/testing/integration/tests
    # TL;DR: you can use other test runners, such as nosetests and unittest:
    nosetests apache_beam/runners/interactive/testing/integration/tests
    python -m unittest apache_beam/runners/interactive/testing/integration/tests/
    # To generate new golden screenshots for screen diff comparison:
    nosetests apache_beam/runners/interactive/testing/integration/tests --with-save-baseline
  3. Golden screenshots are taken and stored per platform. The currently supported platforms are Darwin (macOS) and Linux.

  4. Each test will generate a stable unique hexadecimal id. The golden screenshots are named after that id.

  5. To add new tests, put a new test notebook file (.ipynb) under the apache_beam/runners/interactive/testing/integration/test_notebooks directory.

  6. Add a single test under the apache_beam/runners/interactive/testing/integration/tests directory. A test is as simple as:

    from apache_beam.runners.interactive.testing.integration.screen_diff import BaseTestCase
    class SimpleTest(BaseTestCase):
      def test_simple_notebook(self):
        self.assert_notebook('simple_notebook')  # Just put the notebook file name here, e.g., 'simple_notebook'.
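On the stable hexadecimal ids mentioned in the steps above: a content hash is one way such an id could be derived from a test's identity. This is a hypothetical sketch, not the actual naming scheme used by the screen_diff framework:

```python
# Hypothetical sketch: derive a stable hexadecimal id from a test's identity.
# The actual id scheme used by the screen_diff framework may differ.
import hashlib

def stable_id(test_identity: str) -> str:
    # The same input always yields the same 32-character hexadecimal digest.
    return hashlib.md5(test_identity.encode('utf-8')).hexdigest()

print(stable_id('SimpleTest.test_simple_notebook'))
```

The point is determinism: because the id depends only on the test's identity, re-running the same test always looks up the same golden screenshot file.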

How to Install an Unreleased Python SDK without Building It 

The SDK source zip archive and wheels are continuously built after commits are merged to the master branch.

  1. Click on a recent `Build python source distribution and wheels` job that ran successfully on the master branch from this list.
  2. Click on List files on Google Cloud Storage Bucket on the right-side panel.
  3. Expand List file on Google Cloud Storage Bucket in the main panel.
  4. Locate and download the ZIP file. For example, from GCS.
  5. Install the downloaded zip file. For example:

    pip install /path/to/downloaded-apache-beam.zip
    # Or, if you need extra dependencies:
    pip install "/path/to/downloaded-apache-beam.zip[aws,gcp]"
  6. When you run your Beam pipeline, pass in the --sdk_location flag pointed at the same ZIP file. 
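As a sketch of that final step, assuming the hypothetical path /path/to/downloaded-apache-beam.zip for the file installed above:

```
python -m apache_beam.examples.wordcount \
  --input /path/to/input.txt \
  --output /tmp/counts \
  --sdk_location /path/to/downloaded-apache-beam.zip
```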
