Authors: Aihua Li, George Chen, Yu Li

Discussion thread
Vote thread
JIRA

FLINK-14917

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Since there is no widely accepted performance testing method in the stream-computing field at the moment, we have built an end-to-end performance-testing framework for Flink that collects the latency and throughput of test jobs. These metrics directly indicate the engine's performance and can be used to find performance regressions by comparing data across engine versions.

Goal

We propose to include at least 3 categories of end-to-end performance test suites:

  • Test suite for basic operations
  • Test suite for state backend
  • Test suite for shuffle service

We need to monitor the results in two main aspects:

  • Job performance, mainly throughput and latency
  • Hardware consumption, mainly CPU/memory/network/disk consumption

Roadmap

We plan to split the implementation into 4 phases:

  1. Add a test suite for basic operations, and a WebUI to visualize the throughput data, much like our existing flink speed center.
  2. Add more software and hardware metrics for the benchmark.
  3. Add test suite for state backend.
  4. Add test suite for shuffle service. 

Design

The detailed design of each test suite will be illustrated in this section.

Test Suite for Basic Operations

In this test suite we will use the default state backend (heap) and the default shuffle service, to make sure there is no regression in the basic end-to-end performance of Flink jobs.

Job Topology

Instead of simulating every user scenario, we choose the most basic topologies for the performance test, i.e. SingleInputOperator and TwoInputOperator. Any complicated topology can be formed from these two basic topologies through combination and deformation. Figure 1 and Figure 2 show the two basic topologies:

 

Figure 1. One Input Topology

Figure 2. Two Input Topology

Test Scenarios

The following dimensions of a Flink job are taken into account when setting up the test scenarios:

| Topology | Logical Attributes of Edges | Schedule Mode    | Checkpoint Mode |
|----------|-----------------------------|------------------|-----------------|
| OneInput | Broadcast                   | Lazy from Source | ExactlyOnce     |
| TwoInput | Rescale                     | Eager            | AtLeastOnce     |
|          | Rebalance                   |                  |                 |
|          | KeyBy                       |                  |                 |

There are also dimensions other than Flink characteristics, including:

  • Record size: to check both the processing throughput (records/s) and the data throughput (bytes/s), we will test 10B, 100B and 1KB record sizes for each test job.
  • Resource for each task: we will use the Flink default settings to cover the most common cases.
  • Job parallelism: we will increase the parallelism to saturate the system until back-pressure occurs.
  • Source and sink: to focus on Flink's own performance, we generate the source data randomly and use a blackhole consumer as the sink.
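As an illustration of the last two points, here is a minimal sketch of a random-payload source and a counting blackhole sink. The class and method names are hypothetical, not the framework's actual code or the Flink API; the point is that the sink discards data but keeps counters from which both records/s and bytes/s can be derived.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of a random source and a counting blackhole sink (hypothetical names). */
public class SourceSinkSketch {

    /** Generates a random payload of the configured size (10B, 100B or 1KB in the tests). */
    static byte[] nextRecord(int recordSize) {
        byte[] payload = new byte[recordSize];
        ThreadLocalRandom.current().nextBytes(payload);
        return payload;
    }

    /** Blackhole sink: discards records but counts them for throughput accounting. */
    static final class BlackholeSink {
        long records;
        long bytes;

        void invoke(byte[] record) {
            records++;
            bytes += record.length;
        }
    }

    public static void main(String[] args) {
        BlackholeSink sink = new BlackholeSink();
        for (int i = 0; i < 1000; i++) {
            sink.invoke(nextRecord(100)); // 100B record size
        }
        System.out.println(sink.records + " records, " + sink.bytes + " bytes");
        // prints: 1000 records, 100000 bytes
    }
}
```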

Test Job List

The above test scenarios combine into 32 test jobs (2 topologies × 4 edge types × 2 schedule modes × 2 checkpoint modes), as listed below:

  •  OneInput + Broadcast + LazyFromSource + ExactlyOnce
  •  OneInput + Rescale + LazyFromSource + ExactlyOnce
  •  OneInput + Rebalance + LazyFromSource + ExactlyOnce
  •  OneInput + KeyBy + LazyFromSource + ExactlyOnce
  •  OneInput + Broadcast + Eager + ExactlyOnce
  •  OneInput + Rescale + Eager + ExactlyOnce
  •  OneInput + Rebalance + Eager + ExactlyOnce
  •  OneInput + KeyBy + Eager + ExactlyOnce
  •  OneInput + Broadcast + LazyFromSource + AtLeastOnce
  •  OneInput + Rescale + LazyFromSource + AtLeastOnce
  •  OneInput + Rebalance + LazyFromSource + AtLeastOnce
  •  OneInput + KeyBy + LazyFromSource + AtLeastOnce
  •  OneInput + Broadcast + Eager + AtLeastOnce
  •  OneInput + Rescale + Eager + AtLeastOnce
  •  OneInput + Rebalance + Eager + AtLeastOnce
  •  OneInput + KeyBy + Eager + AtLeastOnce
  •  TwoInput + Broadcast + LazyFromSource + ExactlyOnce
  •  TwoInput + Rescale + LazyFromSource + ExactlyOnce
  •  TwoInput + Rebalance + LazyFromSource + ExactlyOnce
  •  TwoInput + KeyBy + LazyFromSource + ExactlyOnce
  •  TwoInput + Broadcast + Eager + ExactlyOnce
  •  TwoInput + Rescale + Eager + ExactlyOnce
  •  TwoInput + Rebalance + Eager + ExactlyOnce
  •  TwoInput + KeyBy + Eager + ExactlyOnce
  •  TwoInput + Broadcast + LazyFromSource + AtLeastOnce
  •  TwoInput + Rescale + LazyFromSource + AtLeastOnce
  •  TwoInput + Rebalance + LazyFromSource + AtLeastOnce
  •  TwoInput + KeyBy + LazyFromSource + AtLeastOnce
  •  TwoInput + Broadcast + Eager + AtLeastOnce
  •  TwoInput + Rescale + Eager + AtLeastOnce
  •  TwoInput + Rebalance + Eager + AtLeastOnce
  •  TwoInput + KeyBy + Eager + AtLeastOnce
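The list above is simply the Cartesian product of the four dimensions, so the job names could be generated programmatically rather than maintained by hand. A sketch (illustrative only, not part of the proposed framework):

```java
import java.util.ArrayList;
import java.util.List;

/** Generates the 32 test job names as the Cartesian product of the four test dimensions. */
public class TestJobMatrix {

    static List<String> jobs() {
        String[] topologies = {"OneInput", "TwoInput"};
        String[] checkpointModes = {"ExactlyOnce", "AtLeastOnce"};
        String[] scheduleModes = {"LazyFromSource", "Eager"};
        String[] edgeTypes = {"Broadcast", "Rescale", "Rebalance", "KeyBy"};

        List<String> jobs = new ArrayList<>();
        // Nesting order matches the listing above: topology, checkpoint mode, schedule mode, edge type.
        for (String topology : topologies)
            for (String checkpoint : checkpointModes)
                for (String schedule : scheduleModes)
                    for (String edge : edgeTypes)
                        jobs.add(topology + " + " + edge + " + " + schedule + " + " + checkpoint);
        return jobs;
    }

    public static void main(String[] args) {
        System.out.println(jobs().size()); // prints 32
    }
}
```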

Result Check

In this initial stage we will saturate the system until back-pressure occurs, so we mainly monitor and display the throughput of the jobs.
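Throughput here means the sustained record rate measured while the job is back-pressured. A minimal sketch of how it could be derived from periodic record-count samples (hypothetical helper, not the framework's actual metric code):

```java
/** Computes records/second between consecutive record-count samples. */
public class ThroughputMeter {
    private long lastCount;
    private long lastTimeNanos;

    public ThroughputMeter(long startTimeNanos) {
        this.lastTimeNanos = startTimeNanos;
    }

    /** Returns the throughput (records/second) observed since the previous sample. */
    public double sample(long currentCount, long nowNanos) {
        double elapsedSeconds = (nowNanos - lastTimeNanos) / 1e9;
        double rate = (currentCount - lastCount) / elapsedSeconds;
        lastCount = currentCount;
        lastTimeNanos = nowNanos;
        return rate;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter(0L);
        // 5,000,000 records observed over the first second
        System.out.println(meter.sample(5_000_000L, 1_000_000_000L)); // prints 5000000.0
    }
}
```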

Add More Metrics for the Benchmark

These include software metrics such as job-scheduling and task-launching time, and hardware metrics such as CPU usage and network/disk I/O consumption. We plan to implement this in stage 2, and will write down the detailed design in a separate child FLIP of this one.

Test Suite for State Backend

This test suite mainly guards the performance of I/O-intensive applications. We plan to implement it in stage 3, and will write down the detailed design in a separate child FLIP of this one.

Test Suite for Shuffle Service

This test suite mainly guards the performance of batch applications. We plan to implement it in stage 4, and will write down the detailed design in a separate child FLIP of this one.

Implementation

The test cases are written in Java and the driver scripts in Python. We propose a separate directory/module in parallel with flink-end-to-end-tests, with the name flink-end-to-end-perf-tests.

The test cases will be executed on a small cluster to better reflect the network cost, triggered through the Jenkins service as is done for the micro benchmarks, with the results shown on the code-speed center.