This document proposes a comprehensive Pipeline Testing Framework for Apache Flink, enabling developers to test complete data processing pipelines locally with controlled event injection and result verification. The framework provides manual event control, watermark management, and multi-source/sink coordination for end-to-end pipeline testing.
Inspiration: This framework is an implementation of the testing architecture demonstrated in the talk "Testing Production Streaming Applications" by Gyula Fora and Matyas Orhidi, adapted to the new Source/Sink V2 API and extended with additional APIs.
Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
What is a Pipeline Flow Test?
A Pipeline Flow test focuses on the interaction and data flow between various components of the application.
It involves replacing real input sources with sources that can produce test or mock data, which flows through the business logic and produces a result output stream that can be validated against the expected output before deploying the job.
It also allows for manual input of data and control messages to test specific parts of the data processing pipeline.
Flink applications often involve intricate logic that responds to event timing and order. Pipeline flow testing allows developers to introduce data and events in a controlled manner, verifying that the system behaves as expected under specific conditions. Today, Flink offers test harnesses for individual stateless and stateful operators, but these are insufficient for testing full pipelines. Real-world streaming applications demand:
- Full pipeline validation: From source to sink
- Precise control: Over events, timestamps, and watermarks
- Ease of use: Without manually managing embedded clusters or writing extensive mocks
This framework addresses those needs and makes testing first-class in the Flink developer workflow.
Scope
- The framework is designed to facilitate testing solely within a local development environment, allowing developers to perform comprehensive pre-production validations of their business logic.
- Users are required to create their test cases using the framework, specifying both the inputs and the expected outputs for each test.
- The current framework does not support direct reading from production data sources. All testing must be conducted using simulated data sources provided within the framework.
- The framework supports both DataStream and Table/SQL pipelines.
Public Interfaces
Core Testing API
PipelineTestRunner:
Builder for a test run. It registers named sources and sinks, binds a per-run contextId, configures parallelism, timeout, and other settings, and exposes getSource(name)/getSink(name) helpers for use in the user's pipeline definition.
PipelineTestExecution:
Orchestrates a single run: it starts the job with executeAsync, waits until the job is RUNNING, lets the test logic inject records (send(...), sendWatermark(...), finishSource(...)), provides assertSink(name) (which returns a SinkAssertions), and handles cleanup.
@PublicEvolving
public class PipelineTestRunner {
    public static Builder builder();

    public PipelineTestExecution execute(DataStreamDefinition pipelineDefinition);
    public PipelineTestExecution executeSql(SqlDefinition pipelineDefinition);

    // Helpers for wiring the registered test sources/sinks into the pipeline definition
    public <T> Source<T, ?, ?> getSource(String name);
    public <T> Sink<T> getSink(String name);
    // Per-run identifier bound by the builder
    public String getContextId();

    public interface Builder {
        <T> Builder withSource(String name, PipelineTestSource<T> source);
        <T> Builder withSink(String name, PipelineTestSink<T> sink);
        Builder withParallelism(int parallelism);
        Builder withTimeout(Duration timeout);
        Builder enableSqlTestConnectors();
        PipelineTestRunner build();
    }

    @FunctionalInterface
    public interface DataStreamDefinition {
        void define(StreamExecutionEnvironment env) throws Exception;
    }

    @FunctionalInterface
    public interface SqlDefinition {
        void define(StreamTableEnvironment env) throws Exception;
    }
}
@PublicEvolving
public class PipelineTestExecution {
    // Event injection
    public <T> void send(String sourceName, T record);
    public <T> void send(String sourceName, T record, long timestamp);
    public void sendWatermark(String sourceName, long timestamp);
    public void finishSource(String sourceName);

    // Assertions
    public <T> SinkAssertions<T> assertSink(String sinkName);
    public <T> SinkAssertions<T> assertSideOutput(org.apache.flink.util.OutputTag<T> tag);

    // Job control
    public void waitFor(Duration duration);
    public void waitForCompletion();
    public void waitForCompletion(Duration timeout);
    public void cancel();
}
Source Interfaces
@PublicEvolving
public interface PipelineTestSource<T> {
    void send(T record);
    void send(T record, long timestamp);
    void sendWatermark(long timestamp);
    void finish();
    Source<T, ?, ?> asFlinkSource();
}

@PublicEvolving
public class ManualTestSource<T> implements PipelineTestSource<T>, Source<T, NoOpSplit, NoOpEnumState> {
    public static <T> ManualTestSource<T> of(Class<T> typeClass);
    public static <T> ManualTestSource<T> named(String name, Class<T> typeClass);
}

@PublicEvolving
public class FileTestSource<T> implements PipelineTestSource<T> {
    public static <T> FileTestSource<T> fromJsonFile(String filePath, Class<T> typeClass);
    public static <T> FileTestSource<T> fromCsvFile(String filePath, Class<T> typeClass);
}
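A manual source has to hand records and watermarks to its reader in exactly the order the test issued them; otherwise a watermark could overtake a record and turn it into late data. The following is a minimal, Flink-free sketch of that idea, assuming one FIFO queue per source. The names EventKind, SourceEvent, and ManualEventChannel are hypothetical and are not the proposed ManualSourceReader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event kinds for this sketch; these are not Flink classes.
enum EventKind { RECORD, WATERMARK, END_OF_INPUT }

// One control message produced by send(...), sendWatermark(...) or finish().
final class SourceEvent<T> {
    final EventKind kind;
    final T record;       // null unless kind == RECORD
    final Long timestamp; // event time for RECORD (may be null), watermark value for WATERMARK

    private SourceEvent(EventKind kind, T record, Long timestamp) {
        this.kind = kind;
        this.record = record;
        this.timestamp = timestamp;
    }

    static <T> SourceEvent<T> record(T record, Long timestamp) {
        return new SourceEvent<>(EventKind.RECORD, record, timestamp);
    }

    static <T> SourceEvent<T> watermark(long timestamp) {
        return new SourceEvent<>(EventKind.WATERMARK, null, timestamp);
    }

    static <T> SourceEvent<T> endOfInput() {
        return new SourceEvent<>(EventKind.END_OF_INPUT, null, null);
    }
}

// Hypothetical channel between the test thread and a manual source reader.
// Every call appends to ONE FIFO queue, so records and watermarks reach the
// reader in exactly the order the test issued them.
final class ManualEventChannel<T> {
    private final BlockingQueue<SourceEvent<T>> queue = new LinkedBlockingQueue<>();
    private boolean finished;

    synchronized void send(T record) {
        enqueue(SourceEvent.record(record, null));
    }

    synchronized void send(T record, long timestamp) {
        enqueue(SourceEvent.record(record, timestamp));
    }

    synchronized void sendWatermark(long timestamp) {
        enqueue(SourceEvent.watermark(timestamp));
    }

    synchronized void finish() {
        enqueue(SourceEvent.endOfInput());
        finished = true;
    }

    // Rejects input after finish(), mirroring a bounded source that has ended.
    private void enqueue(SourceEvent<T> event) {
        if (finished) {
            throw new IllegalStateException("source already finished");
        }
        queue.add(event);
    }

    // Reader side: drain whatever is available without blocking.
    // An empty list corresponds to "nothing available yet".
    List<SourceEvent<T>> drainAvailable() {
        List<SourceEvent<T>> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```

A real reader would presumably map RECORD to SourceOutput#collect, WATERMARK to SourceOutput#emitWatermark, and END_OF_INPUT to returning InputStatus.END_OF_INPUT from SourceReader#pollNext; that mapping is an assumption, not part of this proposal.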
Sink Interfaces
@PublicEvolving
public interface PipelineTestSink<T> {
    List<T> getRecords();
    T poll(Duration timeout);
    T poll();
    boolean isEmpty();
    int size();
    void clear();
    Sink<T> asFlinkSink();
}

@PublicEvolving
public class CollectingPipelineSink<T> implements PipelineTestSink<T> {
    public static <T> CollectingPipelineSink<T> create();
}
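The basic example later in this page polls one record from the processed sink and still expects getRecords() and size() to report all five records. One reading consistent with that is an append-only history plus a separate poll cursor. The sketch below assumes that reading; CollectedRecords is a hypothetical name, and it is plain Java rather than a Flink Sink.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical storage behind a collecting sink (not a Flink class).
// An append-only history backs getRecords()/size(); a separate read
// cursor backs poll(), so polling does not shrink the history.
final class CollectedRecords<T> {
    private final List<T> history = new ArrayList<>();
    private int cursor; // index of the next record poll() will return

    synchronized void add(T record) {
        history.add(record);
        notifyAll(); // wake up threads blocked in poll(timeout)
    }

    synchronized List<T> getRecords() {
        return new ArrayList<>(history); // defensive snapshot
    }

    // Next unread record, or null if none is available right now.
    synchronized T poll() {
        return cursor < history.size() ? history.get(cursor++) : null;
    }

    // Waits up to `timeout` for the next unread record; returns null on timeout.
    synchronized T poll(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (cursor >= history.size()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null; // timed out
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return history.get(cursor++);
    }

    synchronized boolean isEmpty() {
        return history.isEmpty();
    }

    synchronized int size() {
        return history.size();
    }

    // Resets both the history and the poll cursor, e.g. between test phases.
    synchronized void clear() {
        history.clear();
        cursor = 0;
    }
}
```

With this design, poll() is a consuming read for streaming-style checks, while getRecords() stays stable for whole-result assertions.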
Assertion API
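@PublicEvolving
public class SinkAssertions<T> {
    // Waiting assertions
    public SinkAssertions<T> eventuallyHas(int expectedCount);
    public SinkAssertions<T> eventuallyReceives(T expectedRecord);
    public SinkAssertions<T> within(Duration timeout);
    // Content assertions
    public SinkAssertions<T> received(T expectedRecord);
    public SinkAssertions<T> receivedAll(T... expectedRecords);
    public SinkAssertions<T> receivedAllInAnyOrder(T... expectedRecords);
    public SinkAssertions<T> receivedInOrder(T... expectedRecords);
    public SinkAssertions<T> receivedExactlyInAnyOrder(T... expectedRecords);
    // Count assertions
    public SinkAssertions<T> receivedExactly(int count);
    public SinkAssertions<T> receivedAtLeast(int count);
    // Predicate assertions
    public SinkAssertions<T> allMatch(Predicate<T> predicate);
    public SinkAssertions<T> anyMatch(Predicate<T> predicate);
    // Access to the underlying test sink
    public PipelineTestSink<T> getSink();
}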
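The eventually* methods and within(...) imply assertions that wait for asynchronous pipeline output instead of checking once. A minimal sketch of such a waiting loop follows, assuming a fixed poll interval and treating eventuallyHas(n) as "at least n records"; the Eventually class is hypothetical and not part of the proposed API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating "eventually" semantics: re-check a
// condition until it holds, failing only once the deadline has passed.
final class Eventually {
    private Eventually() {}

    static void await(BooleanSupplier condition, Duration timeout, Duration pollInterval, String description) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Overflow-safe deadline check for System.nanoTime()
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(description + " not satisfied within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting for: " + description, e);
            }
        }
    }
}
```

Under these assumptions, eventuallyHas(n) could delegate to Eventually.await(() -> sink.size() >= n, timeout, interval, ...), with the timeout taken from within(...).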
Architecture Overview
Proposed Module Structure
flink-test-utils-parent/
└── flink-pipeline-testkit/
    ├── PipelineTestRunner.java
    ├── PipelineTestExecution.java
    ├── assertions/
    │   └── SinkAssertions.java
    ├── sources/
    │   ├── PipelineTestSource.java
    │   ├── ManualTestSource.java
    │   ├── FileTestSource.java
    │   ├── ManualSourceReader.java
    │   └── NoOp*.java
    ├── sinks/
    │   ├── PipelineTestSink.java
    │   └── CollectingPipelineSink.java
    ├── table/
    │   ├── TestManualDynamicTableSource.java
    │   ├── TestCollectDynamicTableSink.java
    │   └── TestTableFactory.java
    └── utils/
        └── SharedBuffer.java
When test code sends data via execution.send(), the record is placed in a SharedSourceBuffer from which the pipeline's source operators retrieve it. Similarly, when the pipeline writes output to test sinks, the output is stored in the SharedSinkBuffer, where test assertions can verify the results (see the example below).
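The shared-buffer approach relies on the job running in the same JVM as the test (for example, a local mini cluster), so source and sink instances only need to carry a lookup key rather than the data itself. Below is a minimal sketch of such a registry, assuming the per-run contextId from PipelineTestRunner is combined with the logical source or sink name; the method names are illustrative and are not the proposed SharedBuffer API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical JVM-wide buffer registry (a sketch, not the proposed SharedBuffer.java).
// Serializable source/sink instances only carry (contextId, name); the records
// stay in this static map, which works only while the job runs in the test's JVM.
final class SharedBuffer {
    private static final Map<String, BlockingQueue<Object>> QUEUES = new ConcurrentHashMap<>();

    private SharedBuffer() {}

    static String newContextId() {
        return UUID.randomUUID().toString();
    }

    private static String key(String contextId, String name) {
        return contextId + "/" + name;
    }

    // The test side and the operator side both call this with the same (contextId, name).
    static BlockingQueue<Object> get(String contextId, String name) {
        return QUEUES.computeIfAbsent(key(contextId, name), k -> new LinkedBlockingQueue<>());
    }

    // Cleanup after a run; a UUID contains no '/', so the prefix match is unambiguous.
    static void releaseContext(String contextId) {
        QUEUES.keySet().removeIf(k -> k.startsWith(contextId + "/"));
    }
}
```

Keying by contextId is what would let several pipeline tests run concurrently in one JVM without reading each other's records.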
Usage
How Flink Users Will Use This Framework
Flink users can write deterministic pipeline tests using this framework by defining their job's topology and executing it in a local, controlled test environment. The framework provides high-level abstractions for setting up the pipeline, feeding events (optionally with timestamps), emitting watermarks, and asserting output correctness.
Steps for Usage:
- Create a PipelineTestRunner
  - This serves as the entry point for defining the pipeline and test setup.
- Attach one or more ManualTestSource or FileTestSource instances
  - Manual sources allow runtime injection of events and watermarks.
  - File sources are suitable for static test inputs in JSON or CSV format.
  - For SQL pipelines, sources are instead declared as tables using the 'test-manual' connector options.
- Attach one or more CollectingPipelineSink instances
  - Collected outputs can be validated against expected values or custom logic.
  - For SQL pipelines, sinks are instead declared as tables using the 'test-collect' connector options.
- Define the pipeline using the standard StreamExecutionEnvironment (or StreamTableEnvironment for SQL)
  - The framework integrates with existing Flink operator chains.
- Run the test using execute() (or executeSql() for SQL pipelines)
  - Events are injected via the send(), sendWatermark(), and finishSource() methods.
  - Output can be asserted using fluent assertion APIs such as receivedAll() and eventuallyHas().
- Example use case scenarios:
  - Testing event-time windowing with out-of-order data
  - Validating join behavior with late records
  - Ensuring multi-source alignment
SQL Testing
Users can write end-to-end tests entirely in SQL while reusing the same in-memory buffers as the DataStream test kit.
Connectors:
- Source: connector='test-manual' (TestManualDynamicTableSourceFactory)
  - Options: name (required), the logical stream name
- Sink: connector='test-collect' (TestCollectDynamicTableSinkFactory)
  - Options: name (required)
- Changelog: INSERT-only for both (v1).
Examples
SQL Pipeline Test:
import java.time.Duration;
import org.apache.flink.types.Row;

PipelineTestRunner runner = PipelineTestRunner.builder()
        .withParallelism(1)
        .withTimeout(Duration.ofSeconds(30))
        .enableSqlTestConnectors()
        .build();

PipelineTestExecution exec = runner.executeSql(tEnv -> {
    String ctx = runner.getContextId(); // per-run context id, exposed by the runner
    // ts holds epoch milliseconds; a watermark must be declared on a TIMESTAMP(3)/TIMESTAMP_LTZ(3)
    // column, so a computed rowtime column is derived from ts
    tEnv.executeSql("""
        CREATE TEMPORARY TABLE orders (
            order_id STRING,
            ts BIGINT,
            rowtime AS TO_TIMESTAMP_LTZ(ts, 3),
            WATERMARK FOR rowtime AS rowtime
        ) WITH (
            'connector' = 'test-manual',
            'name' = 'orders'
        )
        """);
    // OUT is a reserved keyword in Flink SQL, so the table name is quoted
    tEnv.executeSql("""
        CREATE TEMPORARY TABLE `out` (order_id STRING) WITH (
            'connector' = 'test-collect',
            'name' = 'out'
        )
        """);
    tEnv.executeSql("INSERT INTO `out` SELECT order_id FROM orders");
});

long t0 = System.currentTimeMillis();
exec.send("orders", Row.of("o1", t0), t0);
exec.sendWatermark("orders", t0 + 60_000);
exec.finishSource("orders");
exec.waitForCompletion();
exec.assertSink("out").receivedExactlyInAnyOrder(Row.of("o1"));
Production Job (DataStream):
public class OrderProcessingJob {
    // Testable pipeline builder: sources and sinks are injected, so tests can pass test implementations
    public static void buildPipeline(
            StreamExecutionEnvironment env,
            Source<String, ?, ?> ordersSource,
            Source<String, ?, ?> inventorySource,
            Sink<String> processedOrdersSink,
            Sink<String> alertsSink,
            Sink<String> metricsSink) {
        // Orders stream with event-time watermarks, reduced per customer in 1-minute tumbling windows
        DataStream<Order> orders = env
                .fromSource(
                        ordersSource,
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, timestamp) -> parseTimestamp(event)),
                        "Orders Source")
                .map(json -> parseOrder(json))
                .keyBy(order -> order.customerId)
                .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
                .reduce((o1, o2) -> mergeOrders(o1, o2));

        // Inventory stream (no watermarks needed)
        DataStream<Inventory> inventory = env
                .fromSource(inventorySource, WatermarkStrategy.noWatermarks(), "Inventory Source")
                .map(json -> parseInventory(json));

        // Join and enrich
        DataStream<EnrichedOrder> enriched = orders
                .connect(inventory.broadcast())
                .process(new EnrichmentFunction());

        // Route to multiple sinks
        enriched.filter(o -> o.isValid()).map(o -> o.toJson()).sinkTo(processedOrdersSink);
        enriched.filter(o -> o.needsAlert()).map(o -> createAlert(o)).sinkTo(alertsSink);
        enriched.map(o -> o.toMetrics()).sinkTo(metricsSink);
    }
}
Testing with the Pipeline Test Framework (Basic Example)
@Test
public void testOrderProcessingVerboseStyle() throws Exception {
    // Step 1: Build the test runner
    PipelineTestRunner runner = PipelineTestRunner.builder()
            .withSource("orders", ManualTestSource.of(String.class))
            .withSource("inventory", ManualTestSource.of(String.class))
            .withSink("processed", CollectingPipelineSink.<String>create())
            .withSink("alerts", CollectingPipelineSink.<String>create())
            .withSink("metrics", CollectingPipelineSink.<String>create())
            .withParallelism(2)
            .withTimeout(Duration.ofMinutes(2))
            .build();

    // Step 2: Execute the pipeline and get the PipelineTestExecution
    PipelineTestExecution execution = runner.execute(env -> {
        OrderProcessingJob.buildPipeline(
                env,
                runner.getSource("orders"),
                runner.getSource("inventory"),
                runner.getSink("processed"),
                runner.getSink("alerts"),
                runner.getSink("metrics"));
    });

    // Step 3: Drive the running job through the PipelineTestExecution methods
    // === Sending Data to Multiple Sources ===
    execution.send("inventory", "product-1,100");
    execution.send("inventory", "product-2,0");
    execution.send("inventory", "product-3,50");

    // Send orders with timestamps for event-time processing
    long baseTime = System.currentTimeMillis();
    execution.send("orders", "order-1,customer-1,500.00," + baseTime, baseTime);
    execution.send("orders", "order-2,customer-1,1500.00," + (baseTime + 1000), baseTime + 1000);
    execution.send("orders", "order-3,customer-2,200.00," + (baseTime + 2000), baseTime + 2000);

    // === Watermark Management ===
    // Send a watermark to trigger the first window
    execution.sendWatermark("orders", baseTime + 60000);
    // Wait briefly for processing
    execution.waitFor(Duration.ofMillis(100));

    // Send late-arriving data
    execution.send("orders", "order-4,customer-3,300.00," + (baseTime + 5000), baseTime + 5000);
    execution.send("orders", "order-5,customer-1,10500.00," + (baseTime + 65000), baseTime + 65000);

    // Trigger the second window
    execution.sendWatermark("orders", baseTime + 120000);
    execution.send("inventory", "product-1,25");
    execution.send("inventory", "product-2,100");

    // === Source Completion ===
    execution.finishSource("orders");
    execution.finishSource("inventory");

    // === Wait for Job Completion ===
    // Option 1: Wait with the default timeout
    execution.waitForCompletion();
    // Option 2: Wait with a custom timeout
    // execution.waitForCompletion(Duration.ofSeconds(30));

    // === Assertions on Multiple Sinks ===
    SinkAssertions<String> processedAssertions = execution.assertSink("processed");
    SinkAssertions<String> alertAssertions = execution.assertSink("alerts");
    alertAssertions
            .receivedAtLeast(2)
            .received("ALERT: Order order-2 for customer customer-1 - Status: HIGH_VALUE, Value: 1500.00")
            .anyMatch(alert -> alert.contains("10500.00"));

    // Get assertions for the metrics sink
    SinkAssertions<String> metricsAssertions = execution.assertSink("metrics");
    // Complex assertions
    metricsAssertions
            .receivedExactly(5)
            .receivedAllInAnyOrder(
                    "order_count:1,total_value:500.00,customer:customer-1",
                    "order_count:1,total_value:1500.00,customer:customer-1",
                    "order_count:1,total_value:200.00,customer:customer-2",
                    "order_count:1,total_value:300.00,customer:customer-3",
                    "order_count:1,total_value:10500.00,customer:customer-1"
            );

    // === Direct Sink Access ===
    // Get direct access to the sink for custom checks
    PipelineTestSink<String> processedSink = processedAssertions.getSink();
    // Poll records with a timeout
    String firstRecord = processedSink.poll(Duration.ofSeconds(1));
    assertNotNull(firstRecord);
    List<String> allProcessed = processedSink.getRecords();
    assertEquals(5, allProcessed.size());
    assertFalse(processedSink.isEmpty());
    int recordCount = processedSink.size();
    assertEquals(5, recordCount);
    // Clear the sink if needed for the next test phase
    processedSink.clear();
    assertTrue(processedSink.isEmpty());

    // === Job Control ===
    // Cancel the job if needed (useful for infinite streams)
    // execution.cancel();
}
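The basic example derives baseTime from System.currentTimeMillis(), so whether order-1 and order-2 (both customer-1, at baseTime and baseTime + 1000) land in the same one-minute window depends on when the test runs. Flink aligns tumbling event-time windows to the epoch by default, and an event-time window fires once the watermark reaches its last millisecond (end - 1). The helper below mirrors that arithmetic so a test can choose an epoch-aligned base timestamp and compute expected window contents; TumblingWindowMath is hypothetical and assumes a zero window offset and zero allowed lateness.

```java
// Hypothetical test helper (not a Flink API) mirroring how epoch-aligned
// tumbling event-time windows are assigned and when they fire.
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    // Start of the window containing `timestamp` (window offset 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Lowest watermark that fires that window: its max timestamp, end - 1.
    static long firingWatermark(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    static boolean sameWindow(long a, long b, long size) {
        return windowStart(a, size) == windowStart(b, size);
    }

    // With zero allowed lateness, a record is late once the watermark
    // has already reached the max timestamp of its window.
    static boolean isLate(long timestamp, long currentWatermark, long size) {
        return firingWatermark(timestamp, size) <= currentWatermark;
    }
}
```

For example, with an epoch-aligned baseTime, isLate(baseTime + 5000, baseTime + 60000, 60000) is true, so a record such as order-4 would be dropped by a window without allowed lateness once that watermark has reached the window operator.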
Test Plan
Unit Tests
- Operator chaining and transformations (e.g., map, flatMap, filter)
- Event time and watermark handling
- Manual and file-based source/sink validation
- Basic assertion logic via SinkAssertions
References
FLIP-27, FLIP-238, FLIP-143, FLIP-191
Rejected Alternatives
- None


