Motivation
Pinot supports nearline and batch ingestion modes. Many use cases rely on Pinot's ability to consume in realtime from the appropriate streams. However, there is currently no measure of how fresh the data is when a query is served. This proposal tackles that issue.
Background and Assumptions
The freshness of a stream can be impacted by various factors:
1. Delay on the producer side (application or network) in sending the event out after it is generated.
2. Delay in the stream service in delivering the message to Pinot.
3. Delay in the Pinot layers in consuming and indexing the message.
We would like to focus on #3 above. However, due to limitations around where and how meaningful timestamps can be obtained, we may at times end up measuring #2 as well.
Design
- Obtaining an “ingestion” timestamp for messages consumed from the realtime stream:
- The stream APIs are updated to return the timestamp for a message as provided by the stream service. This is expected to be the time the service received the message. For Kafka clients >= v11, the ConsumerRecord API exposes this timestamp.
- Updating metadata to expose the ingestion and indexing timestamps
- SegmentMetadata is updated to reflect the max ingestion timestamp, based on the stream-provided timestamp described above. In addition, the indexing timestamp (i.e., the time the last record/message was indexed for a given segment) is tracked in SegmentMetadata. This can be used if the stream fails to provide an ingestion timestamp, though it is much less reliable than the ingestion timestamp.
- To make the ingestion timestamp available to the MutableSegment/RealtimeSegment, the index method of the MutableSegment interface is extended to take an additional RowMetadata argument. RowMetadata is designed to be extensible to include other metadata, but currently carries just the ingestion timestamp.
- Obtaining the ingestion/freshness timestamp at query time:
- At query time, each Server returns the number of consuming segments it was required to query as well as the minimum ingestion/indexing timestamp across those segments (see the sketch after this list).
- In the gather phase, the Broker picks the minimum of the timestamps reported across servers. This is treated as the overall freshness indicator for the query. A “freshnessLag” metric, the delta between the current time and this minimum freshness time, is emitted by the broker whenever consuming segments are queried.
- Users can indicate a tolerable freshnessLag SLA in the table config. This can be used to set up alerts when the freshnessLag metric exceeds that threshold.
- Users should account for any expected delays in ingestion (for example, due to the periodic nature of the job that publishes events) when setting the SLA.
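To make the per-server step concrete, below is a minimal sketch of how a Server could derive the values it reports. The helper names (getLatestIngestionTimestamp, getLastIndexedTimestamp, and the response setters) are assumptions for illustration, not the exact Pinot APIs.

// Sketch of the per-server computation; names are illustrative.
long minFreshnessTimeMs = Long.MAX_VALUE;
int numConsumingSegmentsQueried = 0;

for (IndexSegment segment : segmentsMatchedByQuery) {
  if (!(segment instanceof MutableSegment)) {
    continue;  // only consuming (mutable) segments contribute to freshness
  }
  numConsumingSegmentsQueried++;
  SegmentMetadata metadata = segment.getSegmentMetadata();
  // Prefer the stream-provided ingestion time; fall back to the local indexing time.
  long ingestionTimeMs = metadata.getLatestIngestionTimestamp();
  long freshnessTimeMs = ingestionTimeMs > 0 ? ingestionTimeMs : metadata.getLastIndexedTimestamp();
  minFreshnessTimeMs = Math.min(minFreshnessTimeMs, freshnessTimeMs);
}

// Both values are attached to the server response and aggregated by the broker in the gather phase.
serverResponseMetadata.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
serverResponseMetadata.setMinConsumingFreshnessTimeMs(minFreshnessTimeMs);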
Discussion and Alternatives
The above proposal hinges on two assumptions: that the Broker’s routing table typically includes all consuming segments when servicing a query, AND that users are interested in the query-time impact of freshness.
However, there are cases where these assumptions won’t hold:
- Consuming segments are pruned by the broker. In this case, we won’t report on freshness (or might report it based only on the consuming segments included in the query set). This is an explicit design choice.
- Segments move from Consuming to committed/online state. The proposal does not make provision for persisting the data through to ImmutableSegments. This is an explicit design choice at this stage to keep the implementation simple. It is based on the expectation that the number of queries that catch segments in this transition state will be low and thus will not distort the overall notion of query freshness. (The metric is intended to be a broad indicator of issues rather than a way to pick out the specific queries that were affected.)
Why not simply measure lag?
Kafka-based implementations expose consumer metrics that include an indicator for consumption lag. For Kafka, this manifests as an offset-based indicator and does not reliably reflect freshness in terms of the wall-clock time the user is interested in. For a queue with a high ingestion rate, the offsets might indicate a “lag” while in reality there is no issue with consumption; for a queue with a low ingestion rate, a low offset lag can still coincide with a real freshness issue.
Why do we need to track this through to indexing a message?
We would like the freshness metric to provide as holistic a view of the consumption status as possible. This means that if a message has been consumed but has not been indexed due to errors or bugs, we would like that to be reflected. Issue 4070 makes a case for stopping consumption in such cases, which is a stronger stance on dealing with the potential causes that lead to these situations.
Why do we need to plug the metric into the query layer?
While issues with consumption can be measured in different ways (for example, by simply relying on the “LLC_PARTITION_CONSUMING” metric emitted by the Servers), it isn’t easy to capture the impact on the user when such issues are detected. The main goal of this proposal is to be able to capture user impact when the service has issues consuming or indexing messages.
Interface updates
Audience-Public interface updates:
MessageBatch: New method to extract per-message metadata
Internal interface updates:
MutableSegment: index() method updated to take RowMetadata*
SegmentMetadata: New methods added to return indexing and ingestion timestamps
BrokerResponse: New methods to expose freshness timestamp and number of consuming segments.
Interface Additions:
*RowMetadata: Exposes metadata associated with a record/row. Currently includes the ingestion timestamp.
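For illustration, the shapes of the touched interfaces might look roughly as follows. This is a sketch only: method names, signatures, and nullability are assumptions, and each type would live in its own file.

// Sketch of the interface changes; names and signatures are illustrative, not the exact Pinot APIs.
// (Each type lives in its own file; they are shown together here for brevity.)

// RowMetadata: per-row metadata carried from the stream into the mutable segment.
public interface RowMetadata {
  // Ingestion timestamp (epoch millis) assigned by the stream service, or a negative value if unknown.
  long getIngestionTimeMs();
}

// MessageBatch: the metadata accessor is a 'default' method to stay backwards compatible.
public interface MessageBatch<T> {
  int getMessageCount();
  T getMessageAtIndex(int index);

  // Returns null when the stream plugin does not expose per-message metadata.
  default RowMetadata getMetadataAtIndex(int index) {
    return null;
  }
}

// MutableSegment: index() now also accepts the (possibly null) row metadata.
// GenericRow is Pinot's existing row container.
public interface MutableSegment {
  boolean index(GenericRow row, RowMetadata rowMetadata);
}

// SegmentMetadata additionally gains accessors along the lines of
// getLatestIngestionTimestamp() and getLastIndexedTimestamp() (illustrative names).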
Metric Additions
BrokerMeter.freshnessLagMs: a metric that indicates the lag (in milliseconds) seen by a query that covers consuming segments.
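As a rough sketch, the broker’s gather phase could compute and emit this meter along the following lines. The response accessors, the metric helper, and the exact enum constant are assumptions for illustration.

// Sketch of the broker-side aggregation and metric emission; names are illustrative.
long minFreshnessTimeMs = Long.MAX_VALUE;
long numConsumingSegmentsQueried = 0;

for (ServerResponse response : serverResponses) {
  numConsumingSegmentsQueried += response.getNumConsumingSegmentsQueried();
  minFreshnessTimeMs = Math.min(minFreshnessTimeMs, response.getMinConsumingFreshnessTimeMs());
}

brokerResponse.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
if (numConsumingSegmentsQueried > 0 && minFreshnessTimeMs != Long.MAX_VALUE) {
  brokerResponse.setMinConsumingFreshnessTimeMs(minFreshnessTimeMs);
  long freshnessLagMs = System.currentTimeMillis() - minFreshnessTimeMs;
  // Emit the "freshnessLagMs" meter for this table (assumed metric helper and enum name).
  brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.FRESHNESS_LAG_MS, freshnessLagMs);
}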
Backward compatibility considerations
The updates to methods and interfaces are completely backwards compatible. The MessageBatch API addition is made through a ‘default’ method, so users implementing this API are not forced to provide an implementation right away. However, if they would like to use the freshness feature, they should hook in the logic appropriate for their stream, for example along the lines of the sketch below.
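For example, a Kafka-backed MessageBatch could override the default as follows. ConsumerRecord#timestamp() is part of the public Kafka clients API; the surrounding class and the RowMetadata shape are assumptions consistent with the sketch in the Interface updates section.

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of a Kafka-backed MessageBatch that surfaces the broker-assigned timestamp.
// The class is hypothetical; only ConsumerRecord#timestamp() is the real Kafka API.
public class KafkaMessageBatch implements MessageBatch<byte[]> {
  private final List<ConsumerRecord<byte[], byte[]>> _records;

  public KafkaMessageBatch(List<ConsumerRecord<byte[], byte[]>> records) {
    _records = records;
  }

  @Override
  public int getMessageCount() {
    return _records.size();
  }

  @Override
  public byte[] getMessageAtIndex(int index) {
    return _records.get(index).value();
  }

  @Override
  public RowMetadata getMetadataAtIndex(int index) {
    // Timestamp assigned by the Kafka broker (or producer) when the record was appended/created.
    long ingestionTimeMs = _records.get(index).timestamp();
    // RowMetadata has a single method in the sketch above, so a lambda suffices here.
    return () -> ingestionTimeMs;
  }
}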