Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Abstract
The Event Reporter system introduced in FLIP-481 is a powerful addition to Flink's observability stack. As this feature matures, the next logical step is to ensure its stability in demanding production environments. This FLIP proposes a focused enhancement to build upon that foundation: the introduction of a pluggable dispatching architecture.
This will allow operators to choose between the simple, existing synchronous delivery and a new, resilient asynchronous dispatch core. The asynchronous core will decouple event generation from reporter execution using a bounded in-memory queue, ensuring that slow sinks cannot impact JobManager performance. To further enhance its operational maturity, this proposal includes per-reporter monitoring metrics and a circuit breaker pattern for gracefully handling faulty sinks. This FLIP delivers these architectural improvements along with a simple FileEventsReporter, making the system more robust, transparent, and production-ready.
Motivation
FLIP-481 provided the community with a much-needed mechanism for capturing structured operational events. As this feature sees wider adoption with reporters writing to diverse sinks (network services, filesystems, etc.), we can proactively harden the system to ensure the JobManager's stability remains paramount.
While the current synchronous model is simple and effective for many use cases, an optional asynchronous model will provide an additional layer of isolation and resilience for large-scale deployments. This enhancement will ensure that even under heavy load or with slow external systems, the observability layer cannot compromise the stability of the core components it monitors. This allows operators to enable the feature with full confidence.
Public Interfaces
The public APIs for creating events (Event, EventBuilder, MetricGroup.addEvent) are unchanged. This proposal adds the following to the public contract:
- Configuration: events.dispatcher.type, events.dispatcher.queue.size, events.dispatcher.circuit-breaker.failures, events.dispatcher.circuit-breaker.cooldown
- Monitoring: events.droppedCount, events.dispatcher.queue.currentUsage, events.reporter.<reporterName>.latency, events.reporter.<reporterName>.failureCount
- Reporters: a new FileEventsReporter
The EventDispatcher interface is internal and is not part of the public contract.
Proposed Changes
This proposal is a targeted enhancement to the existing system. The public APIs for creating events (Event, EventBuilder, MetricGroup.addEvent) will remain unchanged.
3.1. Architectural Enhancement: Pluggable Event Dispatcher
We will introduce an internal EventDispatcher interface that the MetricRegistry will delegate to. This allows the dispatching strategy to be configured by the user. This FLIP introduces two initial implementations:
- SynchronousDispatcher (Default)
  - This dispatcher preserves the current behavior. It forwards events directly to all configured reporters on the caller's thread. This ensures guaranteed, blocking delivery and maintains full backward compatibility for users who rely on the existing simple behavior.
- AsynchronousDispatcher (Opt-in)
  - This dispatcher provides enhanced resilience and isolation. Its key features are:
    - Non-Blocking Ingestion: The MetricGroup.addEvent() implementation will be modified to perform a non-blocking offer() of the event to a bounded in-memory queue (ArrayBlockingQueue).
    - Dedicated Dispatcher Thread: A single, dedicated thread will consume events from the queue and pass them to the configured reporters.
    - Service Lifecycle: The dispatch service will be managed within the Flink component lifecycle, ensuring it starts and stops cleanly.
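The two strategies can be sketched roughly as follows. This is a minimal illustration, not the proposed implementation: SketchReporter, SketchDispatcher, SyncSketchDispatcher and AsyncSketchDispatcher are hypothetical names, events are reduced to plain strings, and the 50 ms poll interval and 5 s shutdown timeout are arbitrary assumptions.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for Flink's reporter and the internal dispatcher interface.
interface SketchReporter {
    void report(String event);
}

interface SketchDispatcher extends AutoCloseable {
    void dispatch(String event);

    @Override
    void close();
}

/** Forwards every event to all reporters on the caller's thread (current behavior). */
final class SyncSketchDispatcher implements SketchDispatcher {
    private final List<SketchReporter> reporters;

    SyncSketchDispatcher(List<SketchReporter> reporters) {
        this.reporters = reporters;
    }

    @Override
    public void dispatch(String event) {
        for (SketchReporter reporter : reporters) {
            reporter.report(event);
        }
    }

    @Override
    public void close() {}
}

/** Hands events to a bounded queue that a single dedicated thread drains. */
final class AsyncSketchDispatcher implements SketchDispatcher {
    private final List<SketchReporter> reporters;
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();
    private final Thread worker;
    private volatile boolean running = true;

    AsyncSketchDispatcher(List<SketchReporter> reporters, int capacity) {
        this.reporters = reporters;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(this::drain, "event-dispatcher-sketch");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    @Override
    public void dispatch(String event) {
        // offer() never blocks: it returns false immediately when the queue is full.
        if (!queue.offer(event)) {
            dropped.incrementAndGet(); // "Drop and Count"
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    private void drain() {
        try {
            // Keep draining after close() until the queue is empty.
            while (running || !queue.isEmpty()) {
                String event = queue.poll(50, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                for (SketchReporter reporter : reporters) {
                    try {
                        reporter.report(event);
                    } catch (RuntimeException e) {
                        // A failing reporter must not terminate the dispatcher thread.
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            worker.join(5_000); // bounded wait so shutdown cannot hang on a stuck reporter
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch the caller of dispatch() only ever pays for a queue insertion, so a slow reporter delays the worker thread rather than the thread that emitted the event.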
3.2. Operational Maturity (for AsynchronousDispatcher)
To ensure the asynchronous system is transparent and robust, we will introduce several operational features that apply only when the AsynchronousDispatcher is enabled:
- Backpressure Strategy: If the dispatch queue is full, events will be dropped, and a counter will be incremented. This "Drop and Count" strategy prioritizes JobManager stability.
- Monitoring Metrics:
  - Dispatcher Metrics: We will add metrics to monitor the health of the central dispatcher:
    - events.droppedCount (Counter)
    - events.dispatcher.queue.currentUsage (Gauge)
  - Per-Reporter Metrics: For granular insight, the dispatcher will measure the performance of each configured reporter:
    - events.reporter.<reporterName>.latency (Histogram)
    - events.reporter.<reporterName>.failureCount (Counter)
- Resilience: A circuit breaker will be implemented to temporarily stop sending events to a reporter that fails consecutively, preventing log spam and wasted resources.
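One possible shape for the per-reporter circuit breaker is sketched below. The class name ReporterCircuitBreaker, its methods, and the "allow a trial call once the cooldown has elapsed" behavior are assumptions made for illustration; the FLIP itself only specifies a consecutive-failure threshold and a cooldown. The sketch is not thread-safe, on the assumption that only the single dispatcher thread touches it.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical per-reporter circuit breaker; used from the dispatcher thread only. */
final class ReporterCircuitBreaker {
    private final int failureThreshold;
    private final long cooldownNanos;
    private final LongSupplier nanoClock; // injectable so tests can control time
    private int consecutiveFailures;
    private boolean open;
    private long openedAtNanos;

    ReporterCircuitBreaker(int failureThreshold, Duration cooldown, LongSupplier nanoClock) {
        this.failureThreshold = failureThreshold;
        this.cooldownNanos = cooldown.toNanos();
        this.nanoClock = nanoClock;
    }

    /** Returns true if an event may be sent to the reporter right now. */
    boolean allowRequest() {
        if (!open) {
            return true;
        }
        // Once the cooldown has elapsed, let calls through again as a trial.
        return nanoClock.getAsLong() - openedAtNanos >= cooldownNanos;
    }

    /** A successful call closes the breaker and resets the failure streak. */
    void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }

    /** A failed call extends the streak; at the threshold the breaker (re)opens. */
    void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAtNanos = nanoClock.getAsLong();
        }
    }
}
```

The dispatcher thread would check allowRequest() before invoking a reporter, skip the reporter while the breaker is open, and call recordSuccess() or recordFailure() afterwards. In production the clock could simply be System::nanoTime.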
3.3. Initial Implementation: A Reference FileEventsReporter
This FLIP includes a simple FileEventsReporter that writes events as single-line JSON to a file. This serves as a lightweight, dependency-free option that complements existing reporters like OpenTelemetry and SLF4J.
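As a rough sketch of what "single-line JSON" output could look like without any JSON library, consider the class below. The class name JsonLinesFileReporterSketch and the field names name, observedTsMillis and attributes are assumptions chosen for illustration; the actual reporter would implement Flink's event reporter interface and serialize whatever fields the Event type exposes.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;

/** Hypothetical reporter that appends one JSON object per line to a file. */
final class JsonLinesFileReporterSketch implements AutoCloseable {
    private final BufferedWriter writer;

    JsonLinesFileReporterSketch(Path file) throws IOException {
        this.writer = Files.newBufferedWriter(
                file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    void report(String name, long observedTsMillis, Map<String, String> attributes) {
        try {
            writer.write(toJsonLine(name, observedTsMillis, attributes));
            writer.write('\n');
            writer.flush(); // flush per event so a crash loses at most the current line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes one event as a JSON object that contains no raw line breaks. */
    static String toJsonLine(String name, long observedTsMillis, Map<String, String> attributes) {
        StringBuilder sb = new StringBuilder("{\"name\":");
        appendString(sb, name);
        sb.append(",\"observedTsMillis\":").append(observedTsMillis);
        sb.append(",\"attributes\":{");
        boolean first = true;
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            appendString(sb, entry.getKey());
            sb.append(':');
            appendString(sb, entry.getValue());
        }
        return sb.append("}}").toString();
    }

    /** Appends a quoted JSON string, escaping quotes, backslashes and control characters. */
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}
```

Escaping control characters is what keeps each event on exactly one line, so the file can be consumed line by line with standard tools.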
3.4. New Configuration Options
We will add the following keys to flink-conf.yaml:
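- events.dispatcher.type: (Default: sync, Options: sync, async-queued)
  - Selects the dispatching strategy. sync preserves the current blocking behavior, while async-queued enables the new resilient, queue-based dispatcher.
The following keys are only applicable when events.dispatcher.type is set to async-queued:
- events.dispatcher.queue.size: (Default: 1024)
  - Capacity of the bounded in-memory event queue. When the queue is full, new events are dropped and counted.
- events.dispatcher.circuit-breaker.failures: (Default: 5)
  - Number of consecutive failures of a reporter after which the dispatcher temporarily stops sending events to it.
- events.dispatcher.circuit-breaker.cooldown: (Default: 1m)
  - How long the dispatcher stops sending events to such a reporter before trying it again.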
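To make the keys and defaults concrete, the following sketch reads them from a plain key/value map. DispatcherOptionsSketch and its tiny duration parser are hypothetical; a real implementation would presumably declare these as Flink ConfigOptions and rely on Flink's own duration parsing, which accepts more formats than the few units handled here.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;

/** Hypothetical holder for the proposed options, with the defaults listed above. */
final class DispatcherOptionsSketch {
    final String type;
    final int queueSize;
    final int circuitBreakerFailures;
    final Duration circuitBreakerCooldown;

    private DispatcherOptionsSketch(String type, int queueSize, int failures, Duration cooldown) {
        this.type = type;
        this.queueSize = queueSize;
        this.circuitBreakerFailures = failures;
        this.circuitBreakerCooldown = cooldown;
    }

    static DispatcherOptionsSketch fromMap(Map<String, String> conf) {
        String type = conf.getOrDefault("events.dispatcher.type", "sync");
        if (!type.equals("sync") && !type.equals("async-queued")) {
            throw new IllegalArgumentException("Unknown events.dispatcher.type: " + type);
        }
        return new DispatcherOptionsSketch(
                type,
                Integer.parseInt(conf.getOrDefault("events.dispatcher.queue.size", "1024")),
                Integer.parseInt(conf.getOrDefault("events.dispatcher.circuit-breaker.failures", "5")),
                parseDuration(conf.getOrDefault("events.dispatcher.circuit-breaker.cooldown", "1m")));
    }

    /** Simplified parser for values such as "500ms", "30s", "1m" or "2h". */
    static Duration parseDuration(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < t.length() && Character.isDigit(t.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("Duration must start with a number: " + text);
        }
        long amount = Long.parseLong(t.substring(0, i));
        String unit = t.substring(i).trim();
        switch (unit) {
            case "ms": return Duration.ofMillis(amount);
            case "s": return Duration.ofSeconds(amount);
            case "m":
            case "min": return Duration.ofMinutes(amount);
            case "h": return Duration.ofHours(amount);
            default: throw new IllegalArgumentException("Unsupported duration unit: " + unit);
        }
    }
}
```

With an empty map the sketch yields the synchronous dispatcher and the listed defaults, which mirrors the opt-in nature of the asynchronous mode.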
Compatibility, Deprecation, and Migration Plan
- Compatibility: This enhancement is fully backward-compatible. By default, events.dispatcher.type is set to sync, which preserves the exact behavior of the existing system. The new asynchronous functionality is entirely opt-in, so existing jobs and custom reporters are not affected.
- Deprecation: No components are deprecated.
- Migration: No migration is needed.
Test Plan
The test suite will be expanded to validate the new logic:
- Unit Tests: Will cover the behavior of both the SynchronousDispatcher and the AsynchronousDispatcher.
  - For the AsynchronousDispatcher, tests will specifically cover the queue-full/drop behavior, metric updates, and circuit breaker logic.
- Integration Tests: A key scenario will be added using a "slow mock reporter" that introduces artificial latency. This test will verify that events are dropped when the queue fills and that the events.droppedCount metric works as expected, all while the main test thread remains unblocked.
- Concurrency Tests: Verify thread-safe event submission during concurrent addEvent() calls, especially during a shutdown race.
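One way the concurrent-submission check could be structured is sketched below. ConcurrentSubmitCheck is a hypothetical helper, not part of the proposal: several producer threads offer events to a bounded queue that nothing drains, and the check is that every event is either queued or counted as dropped. The shutdown race mentioned above is not covered by this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical harness for checking that concurrent offers never lose events silently. */
final class ConcurrentSubmitCheck {

    /** Returns {queued, dropped} after `threads` producers each offer `perThread` events. */
    static long[] run(int threads, int perThread, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicLong dropped = new AtomicLong();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int t = 0; t < threads; t++) {
                final int id = t;
                pool.execute(() -> {
                    try {
                        start.await(); // release all producers at once to maximize contention
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    for (int i = 0; i < perThread; i++) {
                        if (!queue.offer("event-" + id + "-" + i)) {
                            dropped.incrementAndGet();
                        }
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        } finally {
            pool.shutdownNow();
        }
        return new long[] {queue.size(), dropped.get()};
    }
}
```

Because no consumer runs, the queue must end up exactly full and the dropped counter must account for every remaining event, which gives the test a deterministic expectation despite the concurrency.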
Future Work
This FLIP provides a resilient foundation for future enhancements, including:
- Reporter-Specific Policies: Allowing different reporters to have unique backpressure strategies.
- Advanced FileEventsReporter: Enhancing the file reporter with size- and time-based rolling policies.
- Expanded Event Catalog: Adding more high-value event types to the system.