
Developing with the Python SDK

Gradle can build and test the Python SDK. The Jenkins jobs use it, so the Gradle build needs to be maintained.

You can also use the Python toolchain directly instead of having Gradle orchestrate it. This may be faster, and the choice is yours. If you do use the Python tools directly, we recommend setting up a virtual environment before testing your code.

If you update any of the cythonized files in the Python SDK, you must install the cython package before running the following commands to properly test your code.

The commands below assume that you're in the sdks/python directory.

Virtual Environment Setup

Setting up a virtualenv is required for running tests directly, such as via pytest or an IDE like PyCharm.

The steps below install the Python SDK from source, including the test and gcp dependencies.

On macOS/Linux:

# Initialize virtual environment called "env" in local directory.
$ virtualenv env

# Activate virtual environment.
$ . ./env/bin/activate

# Install requirements.
(env) $ pip install -r build-requirements.txt

# Install packages.
(env) $ pip install -e .[gcp,test]

On Windows:

> c:\Python27\python.exe -m virtualenv env
> env\Scripts\activate
(env) > pip install -e .[gcp,test]

You can deactivate the virtualenv when done.

(env) $ deactivate
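
Before running tests directly, it can help to confirm that the interpreter you are using really belongs to the activated virtualenv. The following is a minimal sketch, not part of the Beam tooling; the function name in_virtualenv is hypothetical.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the running interpreter belongs to a virtual environment."""
    # Older virtualenv releases set sys.real_prefix on the interpreter.
    if hasattr(sys, "real_prefix"):
        return True
    # venv and newer virtualenv releases make sys.prefix differ from sys.base_prefix.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)


if __name__ == "__main__":
    print("virtualenv active:", in_virtualenv())
```

Run it with the same python you plan to use for pytest; after deactivate it should report that no virtualenv is active.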

Virtual Environments with pyenv

A more advanced option, pyenv allows you to download, build, and install locally any version of Python, regardless of which versions your distribution supports.

pyenv also has a virtualenv plugin, which manages the creation and activation of virtualenvs.

The caveat is that you'll have to take care of any build dependencies, and those are probably still constrained by your distribution.

These instructions were made on a Linux/Debian system.

Setup pyenv (with virtualenv plugin)

  • Install prerequisites for your distribution.
  • curl | bash
  • Add the required lines to ~/.bashrc (as returned by the script) and open a new shell.

Example: Running unit tests with PyCharm using Python 3.5.2 in a virtualenv

Install Python 3.5.2 and create a virtualenv

  • Optional (install older library development files required by 3.5.2):
        apt install libssl1.0-dev
  • pyenv install 3.5.2
  • Optional (restore newest library development files):
        apt install libssl-dev
  • pyenv virtualenv 3.5.2 ENV_NAME

Upgrade packages (optional)

  • pyenv activate ENV_NAME
  • pip install --upgrade pip
  • pip install --upgrade setuptools [+ any other packages in "pip list"...]
  • pyenv deactivate

Set up PyCharm

  • Start by adding a new project interpreter (from the bottom right or in Settings).
  • Select "existing environment" and select the interpreter, which should be under ~/.pyenv/versions/3.5.2/envs/ENV_NAME/bin/python or ~/.pyenv/versions/ENV_NAME/bin/python.
  • Switch interpreters at the bottom right.
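
The two interpreter locations mentioned above can be checked programmatically before pointing PyCharm at one of them. This is a small sketch under the assumption that pyenv lives in ~/.pyenv; the helper name find_pyenv_interpreter is hypothetical.

```python
from pathlib import Path
from typing import Optional


def find_pyenv_interpreter(env_name: str, version: str, home: Path) -> Optional[Path]:
    """Return the first existing interpreter path for a pyenv virtualenv, or None."""
    candidates = [
        # Location nested under the Python version the env was created from.
        home / ".pyenv" / "versions" / version / "envs" / env_name / "bin" / "python",
        # Location directly under versions/, named after the env.
        home / ".pyenv" / "versions" / env_name / "bin" / "python",
    ]
    for candidate in candidates:
        if candidate.exists():
            return candidate
    return None


if __name__ == "__main__":
    print(find_pyenv_interpreter("ENV_NAME", "3.5.2", Path.home()))
```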


Running Tests using pytest

If you've set up a virtualenv above, you can now run tests directly using pytest.

(env) $ pytest  # all tests

# You can also select specific tests. Try out these examples.
(env) $ pytest -v -k test_progress
(env) $ pytest -v -k TextSourceTest
(env) $ pytest -v apache_beam/io/
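
The -k option selects tests by matching against their names. The sketch below imitates only the simplest case (a plain substring match on method names) using the standard unittest module. The TextSourceTest class here is a toy stand-in, not the real Beam test, and run_selected is a hypothetical helper.

```python
import unittest


class TextSourceTest(unittest.TestCase):
    """Toy stand-in for a test class; not the real Beam test."""

    def test_progress(self):
        self.assertEqual(sum([1, 2, 3]), 6)

    def test_read_empty(self):
        self.assertEqual(list(iter([])), [])


def run_selected(keyword: str) -> unittest.TestResult:
    """Run only the toy tests whose method name contains `keyword`."""
    loader = unittest.TestLoader()
    names = [n for n in loader.getTestCaseNames(TextSourceTest) if keyword in n]
    suite = unittest.TestSuite(TextSourceTest(n) for n in names)
    result = unittest.TestResult()
    suite.run(result)
    return result


if __name__ == "__main__":
    result = run_selected("test_progress")
    print("tests run:", result.testsRun, "ok:", result.wasSuccessful())
```

pytest itself also matches class and module names and accepts boolean expressions, so treat this as an illustration of the idea rather than a description of pytest's matching rules.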

Running Tests using tox

Tox does not require a virtualenv with Beam + dependencies installed. It creates its own.

It also runs tests faster, utilizing multiple processes (via pytest-xdist).

For a list of environments, run tox -l.

tox also supports passing arguments after double dashes to pytest.

(env) $ tox -e py37-gcp  # all tests
(env) $ tox -e py27 -- -k test_progress
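
The double-dash convention can be pictured as splitting the argument list in two: everything before the first -- is for tox, everything after it is forwarded to pytest. The following is a simplified model of that convention, not tox's actual implementation; split_posargs is a hypothetical name.

```python
from typing import List, Tuple


def split_posargs(argv: List[str]) -> Tuple[List[str], List[str]]:
    """Split arguments at the first "--" into (tool args, forwarded args)."""
    if "--" in argv:
        index = argv.index("--")
        return argv[:index], argv[index + 1:]
    # No separator: nothing is forwarded.
    return list(argv), []


if __name__ == "__main__":
    tox_args, pytest_args = split_posargs(["-e", "py27", "--", "-k", "test_progress"])
    print("tox:", tox_args)
    print("pytest:", pytest_args)
```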

Lint Checking

To check just for Python lint errors, run the following command.

$ ../../gradlew lint

Or use tox commands to run the lint tasks:

$ tox -e py27-lint # For python 2.7
$ tox -e py3-lint # For python 3
$ tox -e py27-lint3 # For python 2-3 compatibility


Apache Beam uses the yapf formatter to ensure that all code conforms to the same style.

Use the following tox command to format every python file under sdks/python/apache_beam:

$ tox -e py3-yapf

It may be faster to format just a single directory or subset of files. This can be done with:

$ pip install yapf==0.29.0    # For consistency, use the yapf version that the project currently configures.
$ yapf --in-place --parallel --recursive apache_beam/path/to/files

To format files with uncommitted changes, run: 

git diff --name-only --relative | xargs yapf --in-place

To format files that were changed in your branch, run:

git diff --name-only --relative your_branch master | xargs yapf --in-place
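
The git diff pipelines above pass every changed file name to yapf, including non-Python files and files that were deleted. If that causes trouble, a small wrapper can filter the list first. This is a sketch only; python_files and format_uncommitted are hypothetical helpers, and it assumes git and yapf are on your PATH.

```python
import os
import subprocess
from typing import List


def python_files(diff_output: str) -> List[str]:
    """Keep only the .py paths from `git diff --name-only` output."""
    names = (line.strip() for line in diff_output.splitlines())
    return [name for name in names if name.endswith(".py")]


def format_uncommitted() -> None:
    """Run yapf in place on Python files with uncommitted changes."""
    diff = subprocess.run(
        ["git", "diff", "--name-only", "--relative"],
        check=True, capture_output=True, text=True,
    ).stdout
    # Skip files that the change deleted; yapf cannot open them.
    files = [name for name in python_files(diff) if os.path.exists(name)]
    if files:
        subprocess.run(["yapf", "--in-place", *files], check=True)


if __name__ == "__main__":
    format_uncommitted()
```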

You can check if your code has been YAPF-formatted by using the following command:

$ tox -e py3-yapf-check

If you need to exclude one particular file or pattern from formatting, just add it to the .yapfignore file (sdks/python/.yapfignore).

Remote testing

This step is only required for testing SDK code changes remotely (that is, not using the DirectRunner). To do this, you must build the Beam tarball. From the root of the git repository, run:

$ cd sdks/python/
$ python setup.py sdist

Pass the --sdk_location flag to use the newly built version.

For example:

$ python setup.py sdist > /dev/null && \
    python -m apache_beam.examples.wordcount ... \
    --sdk_location dist/apache-beam-2.5.0.dev0.tar.gz
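
The tarball name changes with the SDK version, so it can be convenient to look up the most recently built one instead of typing it. The following is a minimal sketch, assuming the tarball lands in a dist directory and follows the apache-beam-*.tar.gz naming shown above; latest_sdist is a hypothetical helper.

```python
import glob
import os
from typing import Optional


def latest_sdist(dist_dir: str = "dist") -> Optional[str]:
    """Return the most recently modified Beam source tarball in dist_dir, or None."""
    tarballs = glob.glob(os.path.join(dist_dir, "apache-beam-*.tar.gz"))
    if not tarballs:
        return None
    # Pick the newest file by modification time.
    return max(tarballs, key=os.path.getmtime)


if __name__ == "__main__":
    print("--sdk_location", latest_sdist())
```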

Run hello world against modified SDK Harness

# Build the Flink job server (default job server for PortableRunner) that stores the container locally.
./gradlew :runners:flink:1.7:job-server:container:docker

# Build portable SDK Harness which builds and stores the container locally.
# Build for all python versions
./gradlew :sdks:python:container:buildAll
# Or build for a specific python version, such as py35
./gradlew :sdks:python:container:py35:docker

# Run the pipeline.
python -m apache_beam.examples.wordcount --runner PortableRunner --input <local input file> --output <local output file>

Run hello world against modified Dataflow Fn API Runner Harness and SDK Harness

# Build portable worker
./gradlew :runners:google-cloud-dataflow-java:worker:build -x spotlessJava -x rat -x test
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

# Build portable Python SDK harness and publish it to GCP
./gradlew$USER/beam -p sdks/python/container docker
gcloud docker -- push$USER/beam/python:latest

# Initialize python
cd sdks/python
virtualenv env
. ./env/bin/activate

# run pipeline
python -m apache_beam.examples.wordcount \
    --runner DataflowRunner \
    --num_workers 1 \
    --project <gcp_project_name> \
    --output <gs://path> \
    --temp_location <gs://path> \
    --worker_harness_container_image$USER/beam/python:latest \
    --experiment beam_fn_api \
    --sdk_location build/apache-beam-2.12.0.dev0.tar.gz \
    --dataflow_worker_jar './runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-fn-api-worker-2.12.0-SNAPSHOT.jar' \
    --debug

Run integration test

Running integration test
clear && clear && ./scripts/ --test_opts "--tests=apache_beam.runners.dataflow.dataflow_exercise_streaming_metrics_pipeline_test --nocapture" --project <gcs_project_name> --gcs_location gs://<bucket_path> --kms_key_name "" --streaming true --worker_jar <Path_To_Jar_Binary>

Pass the --sdk_location flag if you need a tarball built with python setup.py sdist. Otherwise, the tarball under the default location (the target directory of the Gradle build) is used.

Run a ValidatesRunner test

Running a ValidatesRunner test
./scripts/ --test_opts "--attr=ValidatesRunner --tests=apache_beam.transforms.util_test --nocapture" --project <gcs_project_name> --gcs_location gs://<bucket_path> --kms_key_name "" --worker_jar <Path_To_Jar_Binary> --sdk_location ./dist/apache-beam-<version>.dev0.tar.gz --streaming true

This runs all tests with the ValidatesRunner attribute from apache_beam/transforms/ in streaming mode. To limit which tests run, you can manually edit the attributes in the test file.
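
Attribute-based selection works by tagging test functions and then filtering on the tag. The sketch below shows the idea with the standard library only. The attr decorator here is a stand-in written for this example (in real test files the decorator comes from the test framework), and UtilTest and tagged_tests are hypothetical names.

```python
import unittest


def attr(*names):
    """Stand-in attribute decorator: mark a test function with each given name."""
    def decorate(func):
        for name in names:
            setattr(func, name, True)
        return func
    return decorate


class UtilTest(unittest.TestCase):
    """Toy test class; not the real Beam test module."""

    @attr("ValidatesRunner")
    def test_tagged(self):
        self.assertTrue(True)

    def test_untagged(self):
        self.assertTrue(True)


def tagged_tests(case, name):
    """Return the test method names on `case` that carry the attribute `name`."""
    loader = unittest.TestLoader()
    return [m for m in loader.getTestCaseNames(case)
            if getattr(getattr(case, m), name, False)]


if __name__ == "__main__":
    print(tagged_tests(UtilTest, "ValidatesRunner"))
```

Removing the tag from a test, or adding it to another, changes which tests an attribute filter picks up.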

Run integration test from IDE

To run an integration test from an IDE in debug mode, you can create a Nosetests configuration. For example, to run a ValidatesRunner (VR) test on the Dataflow runner from IntelliJ/PyCharm, adjust the configuration as follows:

  • Set Target to Module and point it to the test file.
  • Set Additional arguments (sample, adjust as needed):
Running a ValidatesRunner test
--test-pipeline-options="--runner=TestDataflowRunner --project=<YOUR PROJECT> --region=us-central1 --temp_location=gs://your_bucket/tmp --sdk_location=./dist/apache-beam-<version>.dev0.tar.gz  --requirements_file=./postcommit_requirements.txt --num_workers=1 --sleep_secs=20" --attr=ValidatesRunner1 --nocapture
  • set Working directory to /path/to/beam/sdks/python

Run a screen diff integration test for Interactive Beam

For Interactive Beam/Notebooks, we need to verify that the visual presentation of an executed notebook is stable.

A screen diff integration test does this by executing a test notebook and comparing the result with a golden screenshot.

Some preparation work:

Test dependencies
# Install additional Python dependencies if absent, under beam/sdks/python, run:
pip install -e .[interactive,interactive_test,test]

# The tests use headless chrome to render visual elements, make sure the machine has chrome executable installed.
# If you're reading this document in a chrome browser, you're good to go for this step.
# Otherwise, e.g., on a Linux machine, you might want to do:
wget --quiet && \
    apt install -y ./google-chrome-stable_current_amd64.deb

# As chrome version advances/differs, the chromedriver-binary needs to stay in sync with chrome.
# Below is a bash example to dynamically install the correct chromedriver-binary.
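google_chrome_version=$(google-chrome --product-version)
# Keep only the major version (e.g. 83 from 83.0.4103.61) and pin chromedriver-binary to it.
chromedriver_lower_version=${google_chrome_version%%.*}
chromedriver_upper_version=$((chromedriver_lower_version + 1))
pip install "chromedriver-binary>=${chromedriver_lower_version},<${chromedriver_upper_version}"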

# For consistency of screenshots, roboto-mono font should have been installed.
# You can download the font from
# Otherwise, you can install through CLI, e.g., on Linux:
wget --content-disposition -P /usr/share/fonts/truetype/robotomono \{Bold,BoldItalic,Italic,Light,LightItalic,Medium,MediumItalic,Regular,Thin,ThinItalic}.ttf?raw=true
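
The chromedriver pinning above boils down to taking the major version of Chrome and allowing any chromedriver-binary release within that major version. A Python sketch of the same arithmetic follows; chromedriver_requirement is a hypothetical helper and the version string in the example is only illustrative.

```python
def chromedriver_requirement(chrome_version: str) -> str:
    """Build a pip requirement that keeps chromedriver-binary on Chrome's major version."""
    # "83.0.4103.61" -> 83
    major = int(chrome_version.strip().split(".")[0])
    return f"chromedriver-binary>={major},<{major + 1}"


if __name__ == "__main__":
    print(chromedriver_requirement("83.0.4103.61"))
```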

To run the tests:

Running screen diff integration test
# Under beam/sdks/python, run:
pytest -v apache_beam/runners/interactive/testing/integration/tests

# Alternatively, you can use other test runners, such as nosetests and unittest:
nosetests apache_beam/runners/interactive/testing/integration/tests
python -m unittest apache_beam/runners/interactive/testing/integration/tests/
# To generate new golden screenshots for screen diff comparison:
nosetests apache_beam/runners/interactive/testing/integration/tests --with-save-baseline

For now, golden screenshots are taken and stored per system platform. Currently supported platforms are Darwin (macOS) and Linux.

Each test will generate a stable unique hexadecimal id. The golden screenshots are named after that id.
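
One common way to get an id that is stable across runs is to hash something that identifies the test. The sketch below illustrates that general idea only. It is not the scheme the screen diff framework actually uses, and stable_hex_id is a hypothetical helper.

```python
import hashlib


def stable_hex_id(test_name: str, length: int = 16) -> str:
    """Derive a deterministic hexadecimal id from a test name (illustrative only)."""
    digest = hashlib.sha256(test_name.encode("utf-8")).hexdigest()
    return digest[:length]


if __name__ == "__main__":
    print(stable_hex_id("SimpleTest.test_simple_notebook"))
```

Because the id depends only on the input string, the same test maps to the same screenshot name on every run.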

To add new tests, the simplest way is to put a new test notebook file (.ipynb) under the apache_beam/runners/interactive/testing/integration/test_notebooks directory. Then add a single test under apache_beam/runners/interactive/testing/integration/tests directory.

A test is as simple as:

from apache_beam.runners.interactive.testing.integration.screen_diff import BaseTestCase

class SimpleTest(BaseTestCase):
  def test_simple_notebook(self):
    self.assert_notebook('simple_notebook')  # Just put the notebook file name here, e.g., 'simple_notebook'.

How to install an unreleased Python SDK without building it.  

SDK source zip archive and wheels are built continuously after commits are merged to

  1. Click on a recent `Build python source distribution and wheels job` that ran successfully on the master branch from this list
  2. Click on “List files on Google Cloud Storage Bucket” on the right side panel.
  3. Expand “List file on Google Cloud Storage Bucket” in the main panel.
  4. Locate and download the .zip file from GCS.
  5. Install the downloaded zip file. e.g.

    pip install
    # Or, if you need extra dependencies:
    pip install[aws,gcp]
  6. When you run your Beam pipeline, pass in the --sdk_location flag, pointed at the same zip file. e.g.
