Proposers
Approvers
- Nishith Agarwal : [APPROVED/REQUESTED_INFO/REJECTED]
- Vinoth Chandar : [APPROVED/REQUESTED_INFO/REJECTED]
- Prashant Wason : [APPROVED/REQUESTED_INFO/REJECTED]
Status
Current state:
Current State | |
---|---|
UNDER DISCUSSION | |
IN PROGRESS | |
ABANDONED | |
COMPLETED | |
INACTIVE | |
Discussion thread: here
JIRA: here
Released: <Hudi Version>
Abstract
Hudi enables the creation, modification and management of tables on DFS using an engine-agnostic client library. With the flexibility Hudi offers and its easy-to-use client API, applications ingesting data can manage tables at scale with ACID guarantees between writers and readers. Hudi uses a distributed processing engine, such as Spark, to spread the workload across multiple executors and achieve scale. This RFC proposes a framework for collecting metrics from Hudi running on Spark executors, to answer questions such as (a) whether the application experiences bottlenecks, and if so, where, and (b) whether outlier hosts are degrading the overall performance of the application ingesting data into Hudi, especially at scale when hundreds of executors are in play.
Background
Using a distributed processing engine such as Spark, Hudi scales the workload horizontally. When investigating an underperforming application that ingests data into a Hudi dataset, identifying the bottlenecks requires collecting and understanding some lower-level storage metrics. When these metrics are normalized and compared against (a) the dataset's historical performance and/or (b) datasets of similar size, scale and characteristics, we can visualize the underlying patterns, rule out some causes and zero in on others.
Hudi Observability metrics as a Hudi table
Publishing the observability metrics collected from Hudi to a Kafka stream, and then ingesting them as a Hudi table, would make this data searchable and would be a powerful addition to the Hudi toolkit. Like any downstream Hudi user, we can then easily build dashboards to visualize and analyze Hudi internal data.
As a first step in the implementation, Hudi observability metrics will also be published to the existing Hudi Metrics infrastructure, to be made available as a Graphite dashboard.
Implementation
Metrics Collection framework
The power of this framework comes from its simplicity. On the executor, stats can be reported using a unique "counterName". Executors can report timer/span information, counts, or any other value as a long integer. Because stats do not need to be pre-registered by name, executors can report any information as a key-value pair. For example, each executor could add its host tag to the stat name, so that when the stats are reported to the dashboard, outliers can be identified across the executors within a job, or across all executors of all Hudi jobs.
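A minimal sketch of a counter registry that accepts any name, assuming `java.util.concurrent.atomic.LongAdder` as the counter type (it already provides `add(long)` and `increment()`). `FreeFormCounters`, `getTaggedCounter` and the `<name>.<hostId>` naming convention are hypothetical, not part of Hudi.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: counters are created on first use, so executors can report
// any stat name without pre-registering it. Every value is a long.
class FreeFormCounters {
  private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

  // Returns the counter for this name, creating it on first use.
  LongAdder getCounter(String counterName) {
    return counters.computeIfAbsent(counterName, name -> new LongAdder());
  }

  // Appends a host/executor tag as the last name segment so per-host values can be
  // compared on the dashboard (the "<name>.<hostId>" convention is an assumption).
  LongAdder getTaggedCounter(String counterName, String hostId) {
    return getCounter(counterName + "." + hostId);
  }

  // Point-in-time copy of all counters as plain key-value pairs, sorted by name.
  Map<String, Long> snapshot() {
    Map<String, Long> out = new TreeMap<>();
    counters.forEach((name, adder) -> out.put(name, adder.sum()));
    return out;
  }
}
```

For example, `counters.getTaggedCounter("observability.parquetWriteTimeUsec", hostId).add(elapsedUsec)` yields one series per host under a shared prefix, which is what makes cross-executor comparison possible later.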
Reporting stats from the executor vs the driver
In a stateless online system that continuously receives and processes requests, with no state maintained outside the executor/container, stats must be reported immediately from the executor/container, because nothing outside it will retain them.
In a distributed batch processing system, we have the luxury of collecting the stats from individual executors/containers, consolidating them at the driver and reporting them to the backend storage system in one stretch.
The existing metrics infrastructure in Hudi collects metrics throughout the various stages of the job and maintains them in a metrics registry. The collected stats are actually reported to Graphite in the shutdown hook of the Metrics class (when the JVM exits).
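The collect-then-report-at-exit pattern can be sketched with a plain registry and `Runtime.addShutdownHook`. `ShutdownReportingRegistry` is hypothetical, and sinks are modeled as `BiConsumer<String, Long>` callbacks standing in for a Graphite (or Kafka) reporter; a guard ensures that a manual flush followed by the hook reports only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical sketch: values are collected during the job and pushed to the sinks
// only once, either by an explicit flush or by the JVM shutdown hook.
class ShutdownReportingRegistry {
  private final Map<String, Long> gauges = new TreeMap<>();
  private final List<BiConsumer<String, Long>> sinks = new ArrayList<>();
  private boolean reported = false;

  ShutdownReportingRegistry() {
    // Report whatever has been collected when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread(this::reportAll));
  }

  synchronized void setGauge(String name, long value) {
    gauges.put(name, value);
  }

  synchronized void addSink(BiConsumer<String, Long> sink) {
    sinks.add(sink);
  }

  // Sends every collected value to every sink; later calls are no-ops.
  synchronized void reportAll() {
    if (reported) {
      return;
    }
    reported = true;
    for (BiConsumer<String, Long> sink : sinks) {
      gauges.forEach(sink);
    }
  }
}
```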
Reporting from Executors vs. Driver | Report metrics from the Driver | Report metrics from N Executors |
---|---|---|
Number of backend connections (to Graphite Carbon, the metrics collector; to the Kafka topic for Hive table reporting) | 1 | If each executor runs K tasks that report stats, K * N connections are opened and closed. |
When does the reporting happen? | At the end of the batch job. | Right after each task completes. |
Consolidation of stats | Stats can be consolidated/normalized on the driver. | Dashboard queries must consolidate the stats at the time of publishing to the dashboard. |
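To make the connection-count row concrete, here is the arithmetic for an assumed job with N = 200 executors, each running K = 50 tasks that report stats (illustrative numbers, not measurements):

```latex
C_{\text{driver}} = 1
\qquad
C_{\text{executors}} = K \cdot N = 50 \times 200 = 10{,}000
```

Each of those executor-side connections is opened and closed per task, so the backend load grows with both the job's parallelism and its task count, while driver-side reporting stays constant.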
On the Executor
When an executor performs a task belonging to a specific stage, it will report observability stats in a serializable HoodieObservabilityStats structure, which is a container of <metricName, Counter>. In the first iteration of the implementation, our goal is to collect stats around the Hoodie write stage that could reveal performance differences between executors caused by bottlenecks (imbalanced distribution of work, memory-related bottlenecks and/or slow or unhealthy hosts).
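A minimal sketch of the serializable <metricName, Counter> container. The class names follow the RFC, but the fields, `merge()` and `values()` are assumptions; each instance is assumed to be owned by a single task, so no synchronization is used.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: a long-valued counter owned by a single task (not thread-safe).
class Counter implements Serializable {
  private static final long serialVersionUID = 1L;
  private long value;

  void add(long delta) { value += delta; }
  void increment() { value++; }
  long getValue() { return value; }
}

// Sketch only: serializable map of stat name -> Counter that travels with task results.
class HoodieObservabilityStats implements Serializable {
  private static final long serialVersionUID = 1L;
  // TreeMap is Serializable and keeps stat names sorted for reporting.
  private final Map<String, Counter> counters = new TreeMap<>();

  // Creates the counter on first use, so stat names need no pre-registration.
  Counter getCounter(String statName) {
    return counters.computeIfAbsent(statName, name -> new Counter());
  }

  // Folds another task's stats into this instance (e.g. when consolidating on the driver).
  void merge(HoodieObservabilityStats other) {
    other.counters.forEach((name, counter) -> getCounter(name).add(counter.getValue()));
  }

  // Copy of the current values, keyed by stat name.
  Map<String, Long> values() {
    Map<String, Long> out = new TreeMap<>();
    counters.forEach((name, counter) -> out.put(name, counter.getValue()));
    return out;
  }
}
```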
HoodieWriter stage
The WriteStatus class will be extended to include HoodieObservabilityStats, which maintains a collection mapping each stat name to a Counter. Hudi tasks running on executors can populate/update the stats using the following APIs:
- getCounter("CounterName").add(value)
- getCounter("CounterName").increment()
Hudi tasks running on Spark executors will continue to report WriteStatus back to the driver as usual, now carrying the additional observability stats.
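A minimal sketch of how a write task might populate the stats and hand them back through its result object. `SketchWriteStatus` and `SketchWriteTask` are hypothetical stand-ins (the real WriteStatus carries much more), the stat names are illustrative, and `LongAdder` is used as the counter because it already offers the `add(value)` and `increment()` calls listed above.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, trimmed-down stand-in for WriteStatus with an assumed stats field.
class SketchWriteStatus implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String fileId;
  private long totalRecords;
  // Assumed new field: per-task observability stats keyed by stat name.
  private final Map<String, LongAdder> observabilityStats = new HashMap<>();

  SketchWriteStatus(String fileId) { this.fileId = fileId; }

  LongAdder getCounter(String statName) {
    return observabilityStats.computeIfAbsent(statName, name -> new LongAdder());
  }

  void markSuccess() { totalRecords++; }
  long getTotalRecords() { return totalRecords; }
  String getFileId() { return fileId; }

  // Current value of a stat, or 0 if it was never reported.
  long getStat(String statName) {
    LongAdder counter = observabilityStats.get(statName);
    return counter == null ? 0L : counter.sum();
  }
}

// Hypothetical task body: the real write is elided; stats are updated alongside it.
class SketchWriteTask {
  static SketchWriteStatus write(String fileId, List<String> records) {
    SketchWriteStatus status = new SketchWriteStatus(fileId);
    long startNanos = System.nanoTime();
    for (String record : records) {
      // ... write the record to the data file here ...
      status.markSuccess();
      status.getCounter("observability.write.recordsWritten").increment();
      status.getCounter("observability.write.payloadBytes")
          .add(record.getBytes(StandardCharsets.UTF_8).length);
    }
    // One amortized timing value per task rather than one per record.
    status.getCounter("observability.write.writeTimeUsec")
        .add((System.nanoTime() - startNanos) / 1_000);
    return status;
  }
}
```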
HoodieIndexing stage
Similar to the write stage, during the index lookup stage the record lookup operation is distributed to the executors. With the global bloom index, the objective is to check whether a given set of records is present in a particular <partitionPath, fileId>. KeyLookupResult carries the results of the operation back to the driver, and will be extended with HoodieObservabilityStats to carry the HoodieBloomIndex-related observability stats.
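A minimal sketch of a lookup task that returns bloom-index stats on its result. `SketchKeyLookupResult` and `SketchKeyLookup` are hypothetical stand-ins for KeyLookupResult and the executor-side lookup; the key sets are passed in rather than read from files, candidate keys are assumed distinct, and the stat names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, trimmed-down KeyLookupResult: the lookup outcome plus assumed stats.
class SketchKeyLookupResult {
  final String partitionPath;
  final String fileId;
  final List<String> matchingRecordKeys;
  final Map<String, Long> observabilityStats;

  SketchKeyLookupResult(String partitionPath, String fileId,
                        List<String> matchingRecordKeys, Map<String, Long> observabilityStats) {
    this.partitionPath = partitionPath;
    this.fileId = fileId;
    this.matchingRecordKeys = Collections.unmodifiableList(matchingRecordKeys);
    this.observabilityStats = Collections.unmodifiableMap(observabilityStats);
  }
}

class SketchKeyLookup {
  // candidateKeys: distinct keys that passed this file's bloom filter.
  // keysInFile: keys actually stored in the file (read from the file in practice).
  static SketchKeyLookupResult lookup(String partitionPath, String fileId,
                                      List<String> candidateKeys, Set<String> keysInFile) {
    List<String> matches = new ArrayList<>();
    for (String key : candidateKeys) {
      if (keysInFile.contains(key)) {
        matches.add(key);
      }
    }
    Map<String, Long> stats = new TreeMap<>();
    stats.put("candidateKeys", (long) candidateKeys.size());
    stats.put("matchedKeys", (long) matches.size());
    // Keys the bloom filter let through that the file does not actually contain.
    stats.put("falsePositives", (long) (candidateKeys.size() - matches.size()));
    return new SketchKeyLookupResult(partitionPath, fileId, matches, stats);
  }
}
```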
On the Spark Driver
The general model is for the driver to collect and consolidate the HoodieObservabilityStats reported by the executors for each stage, and to create gauges for the collected metrics in the metrics layer. When the JVM shutdown hook for the Metrics class is invoked, all stats collected for the job will be reported to the Graphite (and/or Kafka) metric sinks.
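A minimal sketch of the driver-side consolidation, assuming each task's stats arrive as a `Map<String, Long>` (for example, read out of WriteStatus). `DriverStatsConsolidator` and the `.sum`/`.max` gauge suffixes are hypothetical; keeping the maximum alongside the total is one way to preserve a hint of per-task outliers after consolidation. In Hudi the results would be registered as gauges in the metrics registry; here they are returned as a plain map.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: folds per-task stats into gauge values for one stage.
class DriverStatsConsolidator {
  static Map<String, Long> consolidate(List<Map<String, Long>> perTaskStats) {
    Map<String, Long> gauges = new TreeMap<>();
    for (Map<String, Long> taskStats : perTaskStats) {
      for (Map.Entry<String, Long> e : taskStats.entrySet()) {
        // Total across all tasks.
        gauges.merge(e.getKey() + ".sum", e.getValue(), Long::sum);
        // Largest single-task value, useful for spotting a slow task.
        gauges.merge(e.getKey() + ".max", e.getValue(), Long::max);
      }
    }
    return gauges;
  }
}
```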
HoodieWriter stage
At commit time, the HoodieObservabilityStats extracted from the WriteStatus objects of the writer stage are consolidated into HoodieObservabilityMetrics and reported using the MetricsGraphiteReporter upon JVM exit.
In MVP1, a new MetricsKafkaReporter class will be added to report the metrics to Kafka, to be consumed as a searchable Hive table.
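The RFC does not define the record layout for MetricsKafkaReporter. The sketch below assumes one JSON record per metric, with hypothetical `dataset`, `commitTime`, `metric` and `value` fields that would become columns of the downstream table. Only the formatting is shown; the send itself would go through the Kafka client (`KafkaProducer.send(new ProducerRecord<>(topic, key, value))`), which is omitted so the block has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turns consolidated gauges into one JSON string per metric.
class SketchKafkaRecordFormatter {
  // Escapes only backslashes and double quotes; a real reporter would use a JSON library.
  private static String esc(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  static List<String> toRecords(String dataset, String commitTime, Map<String, Long> gauges) {
    List<String> records = new ArrayList<>();
    for (Map.Entry<String, Long> e : gauges.entrySet()) {
      records.add(String.format(
          "{\"dataset\":\"%s\",\"commitTime\":\"%s\",\"metric\":\"%s\",\"value\":%d}",
          esc(dataset), esc(commitTime), esc(e.getKey()), e.getValue()));
    }
    return records;
  }
}
```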
HoodieBloomIndex stage
During the HoodieBloomIndex stage, the executors return the observability stats on KeyLookupResult, which reports the matching records for each partitionPath/fileId; KeyLookupResult will be extended to include HoodieObservabilityStats. The observability stats will be collected, consolidated and reported to the Metrics layer when the stage completes. The actual reporting of the stats to the Graphite/Kafka sink will happen at JVM shutdown, when the Metrics shutdown hook is triggered.
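As an illustration of stage-level consolidation for the bloom index, the sketch below totals per-file lookup stats and derives a false-positive rate in parts per million, so the derived value still fits the long-valued stat model. `BloomIndexStageMetrics`, the `candidateKeys`/`falsePositives` input names and the derived metric are assumptions, not something the RFC specifies.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: consolidates per-file bloom lookup stats at stage completion.
class BloomIndexStageMetrics {
  static Map<String, Long> consolidate(List<Map<String, Long>> lookupStats) {
    long candidates = 0;
    long falsePositives = 0;
    for (Map<String, Long> s : lookupStats) {
      candidates += s.getOrDefault("candidateKeys", 0L);
      falsePositives += s.getOrDefault("falsePositives", 0L);
    }
    Map<String, Long> gauges = new TreeMap<>();
    gauges.put("observability.bloomIndex.candidateKeys", candidates);
    gauges.put("observability.bloomIndex.falsePositives", falsePositives);
    // Rate expressed in parts per million to stay an integer; 0 when nothing was looked up.
    gauges.put("observability.bloomIndex.falsePositivePpm",
        candidates == 0 ? 0L : falsePositives * 1_000_000L / candidates);
    return gauges;
  }
}
```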
Distributed Registry
The Spark execution engine offers AccumulatorV2, which enables distributed registry functionality: individual executors report stats to a registry on the driver, with custom implementations that merge/consolidate the executor-level stats before they are reported (to a dashboard such as Graphite, or to Kafka). To avoid performance issues with stats reporting, executors will consolidate samples before reporting them. For example, when collecting per-record write times, if an executor writes a million records to a file, we would report one set of amortized stats for the write operation (at the Spark task level) rather than a per-record write-time sample for every record.
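The amortization described above can be sketched as a small summary object: each task records every per-record write time into count/sum/min/max, and only that summary leaves the executor. `AmortizedTimerStats` is hypothetical. In Spark, a subclass of `org.apache.spark.util.AccumulatorV2` implements `isZero`, `copy`, `reset`, `add`, `merge` and `value`, and could delegate `add`/`merge` to a class like this one; that wrapper is omitted here so the sketch runs without Spark on the classpath.

```java
import java.io.Serializable;

// Hypothetical sketch: one mergeable summary per task instead of one sample per record.
class AmortizedTimerStats implements Serializable {
  private static final long serialVersionUID = 1L;
  private long count;
  private long totalNanos;
  private long minNanos = Long.MAX_VALUE; // sentinel while empty
  private long maxNanos = Long.MIN_VALUE; // sentinel while empty

  // Called once per record on the executor; O(1) memory regardless of record count.
  void record(long nanos) {
    count++;
    totalNanos += nanos;
    minNanos = Math.min(minNanos, nanos);
    maxNanos = Math.max(maxNanos, nanos);
  }

  // Combines two summaries; merging an empty summary leaves this one unchanged.
  void merge(AmortizedTimerStats other) {
    count += other.count;
    totalNanos += other.totalNanos;
    minNanos = Math.min(minNanos, other.minNanos);
    maxNanos = Math.max(maxNanos, other.maxNanos);
  }

  boolean isZero() { return count == 0; }
  long getCount() { return count; }
  long getMinNanos() { return minNanos; }
  long getMaxNanos() { return maxNanos; }
  long getMeanNanos() { return count == 0 ? 0L : totalNanos / count; }
}
```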
Power of the framework
From a developer's perspective, suppose we suspect that one or more of the executors/containers used for a dataset are slow or having issues. To report the stats to Graphite/Kafka, the Hudi code running on the executor only needs to perform the following step:
getCounter("observability.<dataset>.normalizedParquetWriteTimePerMBInUsec.<hostId>").add(writeTime)
On the observability dashboard, a chart that tracks these patterns across runs (and/or datasets) would highlight the host outliers (wireframe example).
With a dashboard that can filter on Hudi dataset, ingestion run and host information, stage bottlenecks and host/executor outliers can be identified.
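One simple way to turn per-host values such as normalizedParquetWriteTimePerMBInUsec into an outlier list is to compare each host against the median across hosts. The median-ratio rule, the factor, and `HostOutlierDetector` are illustrative assumptions; the RFC leaves the detection method to the dashboard.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: flags hosts whose value exceeds `factor` times the median.
class HostOutlierDetector {
  static List<String> outliers(Map<String, Long> valueByHost, double factor) {
    List<Long> values = new ArrayList<>(valueByHost.values());
    List<String> flagged = new ArrayList<>();
    if (values.isEmpty()) {
      return flagged;
    }
    Collections.sort(values);
    int n = values.size();
    double median = (n % 2 == 1)
        ? values.get(n / 2)
        : (values.get(n / 2 - 1) + values.get(n / 2)) / 2.0;
    // Iterate in host-name order so the output is deterministic.
    for (Map.Entry<String, Long> e : new TreeMap<>(valueByHost).entrySet()) {
      if (e.getValue() > factor * median) {
        flagged.add(e.getKey());
      }
    }
    return flagged;
  }
}
```

A median is used rather than a mean so that a single very slow host does not drag the baseline up and hide itself.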
Rollout/Adoption Plan
Once the proposed solution is implemented, the observability metrics collected from Hudi using the framework described above will be available as a Graphite dashboard and will also be published to a Kafka stream. When a pipeline is set up to ingest the metrics from Kafka into a Hudi table, the data will be available for search and visualization, capturing insights specific to a dataset as well as insights across all datasets ingested into Hudi.
Test Plan
Unit tests and Test suite integration