 Ting Chen and Chinmay Soman

Problem Statement:

In case of Pinot real-time tables, the current Stream Low Level Consumption (a.k.a. LLC) protocol requires the Pinot Controller to synchronously upload the segment being committed to a deep storage system (behind the PinotFS interface - eg: NFS / HDFS). Fundamentally, this requirement makes LLC not suitable when a Pinot installation has no deep storage or the deep storage is down for an extended period of time. 

This is a big concern because of the following reasons:

  • Makes Pinot deployment more complicated - we need to deploy a deep storage system wherever Pinot is deployed (especially complicated in a Multi-Zone, Multi-Region architecture).
  • Cost: Pinot has to store all the relevant segments on the local disk for query execution. In addition, Pinot cluster already replicates these segments internally. Storing another copy of these segments in the deep storage adds to the total storage cost. Note: The deep storage itself will have its own internal replication, adding to the total cost.
  • Availability concerns: The deep storage has to be at least as available as Pinot. External deep storage can become unavailable to Pinot for reasons outside our control, with outages lasting for hours. During such an outage, Pinot realtime tables can lag behind for hours, which is unacceptable for data freshness requirements. This places a restriction on the deep storage technology (eg: HDFS architecture doesn't lend itself to being multi-zone aware and hence has availability concerns).
  • Ingestion latency: Making a synchronous call to upload the segment is an overhead and can potentially limit the ingestion speed and hence latency.

Proposed Solution

Principles of a better solution:

  • No synchronous call to an external system
  • Failure isolation: Reduce correlated failures between independent systems (eg: Pinot and HDFS)
  • Cost efficiency: reduce the amount of data duplication
  • Operational ease: the resulting system should have fewer moving parts. We could also simplify the handshake between the controller and the servers.

The proposed solution is in fact simple - leverage Pinot’s replication mechanism to alleviate the strong dependency on a deep storage system. By making a change to the segment commit protocol, we can remove the requirement for a synchronous upload. However, please note - the aim is not to completely get rid of the deep storage system but to use the deep storage differently. 

Current segment completion protocol

Please read this section for understanding the current LLC segment completion protocol: Consuming and Indexing rows in Realtime#Segmentcompletionprotocol

In general, there are 2 important phases in this protocol:

1) Choose a commit leader: one of the replicas is "designated" as a segment commit leader. This replica/server is then responsible for "persisting" this segment.

2) Persist the Segment: In the current design, the commit leader sends the newly created segment to the Pinot Controller which in turn uploads it to deep storage using PinotFS API.

The Persist the Segment phase in its current form requires a dependency on, and synchronous calls to, an external storage system like NFS/HDFS/S3. We argue that allowing customizable policies for storing the persisted segment will make Pinot LLC easier to operate in different installation environments. Some possible persisting policies other than the current one could be:

  • Use the segment copy in the committer server as the sole backup copy.
  • Use the segment copy in the committer server as well as one more copy asynchronously saved in an external storage system.

This doc focuses on modifying the phase 2 of the protocol. 

Modified segment completion protocol

Currently the ‘commit-leader’ makes a synchronous ‘commit’ call to the Pinot Controller to upload the segment being committed to a deep storage system. If this upload succeeds then the segment is deemed as committed and can be brought online.

New server side segment completion steps

  1. The commit server sends the segment metadata to the controller with segment uri set to be the configured (if any) deep storage location.
    1. Note: When the segment uri is set, the segment upload in step (2) has not yet occurred. Here we use a technique of Presumed Success: in our experience the deep storage is up most of the time, so the upload in step (2) is very likely to succeed. Even if the upload is not successful, there are backup download mechanisms (such as download from a peer server). Committing the metadata before uploading the segment has the advantage that if the metadata commit fails, there will be no segment data in deep storage.
  2. If the commit to the controller in step (1) is successful, the server performs a best-effort upload of the segment to the deep storage, i.e., the server does not need to wait for the segment upload to succeed before proceeding.
  3. Otherwise, the present commit failure handling kicks in.
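
To make the ordering of the steps above concrete, here is a minimal sketch of a presumed-success commit. It is an illustration only, not the Pinot implementation: `commit_segment`, `commit_metadata` and `upload_segment` are hypothetical names, and the two callables stand in for the controller call and the deep storage client.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CommitResult:
    committed: bool
    # Future of the background upload, or None if no upload was started.
    upload_future: Optional[Future] = None


def _best_effort(upload_segment, segment_name, uri) -> bool:
    """Run the upload and swallow failures; a later repair pass can retry."""
    try:
        upload_segment(segment_name, uri)
        return True
    except Exception:
        return False


def commit_segment(
    segment_name: str,
    deep_store_uri: Optional[str],
    commit_metadata: Callable[[str, Optional[str]], bool],  # hypothetical controller call
    upload_segment: Callable[[str, str], None],             # hypothetical deep store client
    executor: ThreadPoolExecutor,
) -> CommitResult:
    # Step 1: commit the metadata with the intended deep storage uri (may be None).
    if not commit_metadata(segment_name, deep_store_uri):
        # Step 3: the existing commit failure handling takes over; nothing was uploaded.
        return CommitResult(committed=False)
    # Step 2: best-effort upload in the background; the commit path does not wait on it.
    future = None
    if deep_store_uri is not None:
        future = executor.submit(_best_effort, upload_segment, segment_name, deep_store_uri)
    return CommitResult(committed=True, upload_future=future)
```

Because the metadata commit comes first, a failed commit never leaves an orphaned segment file in deep storage, which is the advantage noted in step (1).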

The following diagrams show how the new protocol behaves under various settings and failure scenarios.  

RealtimeValidationManager changes

Use the RealtimeValidationManager to fix LLC segments that have only a single copy. Currently the validation manager only fixes the most recent 2 segments of each partition. With this change, it will scan the Zk data of all segments; if it finds a segment that does not have a deep storage copy, it will instruct a server to upload the segment to the deep storage and then update the segment location url with the deep storage uri. 
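
The repair pass described above could look roughly like the following sketch. The data shapes are assumptions made for illustration: segment Zk metadata is modelled as a dict with a `download_uri` field, and `request_upload` is a hypothetical call that asks one server to upload a segment and returns the resulting deep storage uri (or None on failure).

```python
from typing import Callable, Dict, List, Optional


def repair_missing_deep_store_copies(
    segment_metadata: Dict[str, Dict[str, Optional[str]]],  # segment name -> Zk-like metadata
    hosting_servers: Dict[str, List[str]],                  # segment name -> servers with a copy
    request_upload: Callable[[str, str], Optional[str]],    # hypothetical: (server, segment) -> uri
) -> List[str]:
    """Scan all segments and repair those that have no deep storage copy."""
    repaired = []
    for name, meta in segment_metadata.items():
        if meta.get("download_uri"):
            continue  # a deep storage copy already exists
        for server in hosting_servers.get(name, []):
            uri = request_upload(server, name)
            if uri:
                # Record the deep storage location so later downloads can use it.
                meta["download_uri"] = uri
                repaired.append(name)
                break
    return repaired
```

Segments that no server could upload stay unrepaired and are picked up again on the next validation run.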

Segment Download

In our new design, the segment upload to deep storage is asynchronous and best effort. As a result, the server can not depend only on deep storage. Instead we allow the server to download the segment from peer servers (although it is not preferred).  To discover the servers hosting a given segment, we utilize the external view of the table to find out the latest servers hosting the segment. It has the advantages of keeping the server list in sync with table state for events like table re-balance without extra writing/reading cost.  The complete segment download process works as follows:

  1. Download the segment from the deep store uri found in the property store of the zk server.
    1. Note: the download is done with retries and exponential backoff to allow for the time the commit server needs for the segment upload.
  2. If (1) failed, get the external view of the table (from Controller) to find out the ONLINE servers hosting the segments.
  3. Download the segment from the first server in the list. If that fails, move to the next server in the list. The download format is a Gzip'd file of the segment on the source server. 
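
The download order above can be sketched as follows. This is an illustration under stated assumptions: the external view is modelled as a dict of segment name to {server: state}, `fetch_from_store` and `fetch_from_peer` are hypothetical callables that return the segment bytes or None on failure, and the retries of step (1) are left out for brevity. Sorting the server list is only there to make the sketch deterministic.

```python
from typing import Callable, Dict, List, Optional, Tuple


def online_servers(external_view: Dict[str, Dict[str, str]], segment: str) -> List[str]:
    """Servers that the external view reports as ONLINE for the segment."""
    states = external_view.get(segment, {})
    return sorted(server for server, state in states.items() if state == "ONLINE")


def download_segment(
    segment: str,
    deep_store_uri: Optional[str],
    external_view: Dict[str, Dict[str, str]],
    fetch_from_store: Callable[[str], Optional[bytes]],      # hypothetical deep store client
    fetch_from_peer: Callable[[str, str], Optional[bytes]],  # hypothetical server download API
) -> Tuple[bytes, str]:
    """Return (segment bytes, source); the source is 'deep-store' or a server name."""
    # Step 1: prefer the deep store uri recorded in the segment metadata.
    if deep_store_uri:
        data = fetch_from_store(deep_store_uri)
        if data is not None:
            return data, "deep-store"
    # Steps 2 and 3: fall back to the ONLINE peers, one at a time.
    for server in online_servers(external_view, segment):
        data = fetch_from_peer(server, segment)
        if data is not None:
            return data, server
    raise RuntimeError("no source could serve segment " + segment)
```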

Failure handling

  1. What happens when the segment upload fails but the preceding metadata commit succeeds?
    • In this case, if a server needs to download the segment, it needs to download from the commit server which has a copy of the data.
    • If users want to minimize the chances of downloading from peer servers: the segment completion mode can be set as DEFAULT instead of DOWNLOAD.
    • In the background, RealtimeValidationManager will fix the upload failure by periodically asking the server to upload missing segments.
  2. What happens if another server gets a "download" but the committer has not gotten to ONLINE state yet?  
    • To account for the fact that the metadata commit happens before the segment upload, another server should do retries (with exponential backoff) when downloading.
    • The retries with wait can greatly reduce the issues caused by the above race condition.
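
As an illustration of the retry behaviour described above, the sketch below retries a download with exponentially growing waits. The attempt count, initial delay and factor are assumed values chosen for the example, not settings taken from Pinot, and `fetch` is a hypothetical callable returning the segment bytes or None.

```python
import time
from typing import Callable, List, Optional


def backoff_delays(attempts: int, initial_seconds: float = 1.0, factor: float = 2.0) -> List[float]:
    """Waits between consecutive attempts: initial, initial*factor, ..."""
    return [initial_seconds * factor ** i for i in range(attempts - 1)]


def download_with_retries(
    fetch: Callable[[], Optional[bytes]],
    attempts: int = 5,
    initial_seconds: float = 1.0,
    factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,  # injectable so tests need not wait
) -> Optional[bytes]:
    """Call fetch() up to `attempts` times, waiting longer after each failure."""
    delays = backoff_delays(attempts, initial_seconds, factor)
    for i in range(attempts):
        data = fetch()
        if data is not None:
            return data
        if i < len(delays):
            sleep(delays[i])  # give the committer time to finish its upload
    return None
```

With the assumed defaults, a server waits 1, 2, 4 and 8 seconds between its five attempts, which gives the committer a window of about 15 seconds before the caller falls back to another source.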

Implementation Tasks (listed in rough phase order)

  1. Interface and API change
    1. Add to Pinot Server a new API for segment download (a performance test is needed here to measure the impact on server query performance). 
  2. Server changes
    1. Segment completion protocol: during segment commit, the commit server asynchronously and optionally uploads built segments to a configured external store. It will wait a short amount of time for the upload to finish. Regardless of the upload result, it will move on to send the segment metadata (with the segment uri location if available) to the controller.
    2. Consuming to online Helix state transition: refactor the server download procedure so that it (1) first tries to download segments from the configured segment store if available; otherwise (2) discovers the servers hosting the segment via the external view of the table and then downloads from a hosting server.
  3. Controller changes
    1. Segment completion protocol: controller skips uploading segments – this can be done without controller code change initially because Pinot server can just stop calling LLCSegmentCompletionHandlers.segmentUpload(). More details next section.
  4. RealtimeValidationManager
    1. During segment validation, for any segment without external storage uri, ask one server to upload it to the store and update the segment store uri in Helix.

Production (Seamless) Transition

  1. Let servers download segments directly from external storage instead of from the controller. 
    1. This requires Pinot servers to send external storage segment uri to the controller.
  2. Enable splitCommit in both controller and server. 
  3. In LLRealtimeSegmentDataManager.doSplitCommit(),
    1. server skips uploading the segment file (based on a config flag)
    2. segment commitStart() and commitEnd() remain the same – we could do a further optimization here to combine the two calls but leave them as the current status for now. 
    3. With the above changes, the controller will not upload segments any more because LLCSegmentCompletionHandlers.segmentUpload() is no longer called. Interestingly, skipping this call does not need any change on the controller side because the Pinot controller does not change its internal state during segmentUpload().
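
The effect of the config flag on the split commit can be sketched as below. This is a simplified model and not the code in LLRealtimeSegmentDataManager: `RecordingController`, `do_split_commit` and the `skip_controller_upload` flag are hypothetical names, and the stand-in controller only records which calls it received.

```python
from typing import List, Optional


class RecordingController:
    """Stand-in for the controller that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def commit_start(self, segment: str) -> bool:
        self.calls.append("commitStart")
        return True

    def segment_upload(self, segment: str) -> str:
        self.calls.append("segmentUpload")
        return "controller://" + segment  # made-up location, for illustration only

    def commit_end(self, segment: str, location: Optional[str]) -> bool:
        self.calls.append("commitEnd")
        return True


def do_split_commit(
    controller: RecordingController,
    segment: str,
    external_uri: Optional[str],
    skip_controller_upload: bool,  # hypothetical config flag
) -> bool:
    if not controller.commit_start(segment):
        return False
    if skip_controller_upload:
        # New behaviour: report the external storage uri, no upload via the controller.
        location = external_uri
    else:
        # Current behaviour: the segment file goes through the controller.
        location = controller.segment_upload(segment)
    return controller.commit_end(segment, location)
```

With the flag set, the controller sees only commitStart and commitEnd, which is why no controller-side change is required for the transition.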

Appendix 

Alternative designs for Segment Download

Segment Download

In Pinot low level consumer data ingestion, a server may need to download a segment instead of building it on its own. Currently the server finds the download location by reading the download url of the segment in zk: the download url can point either to the controller (for localFS) or to deep storage.

In our new design, the segment upload to deep storage is asynchronous and best effort. As a result, the server can not depend only on deep storage. Instead we allow the server to download the segment from peer servers (although it is not preferred).  There are two options to discover the servers hosting a given segment:

  1. Store the server list in segment zk metadata for the server to read.
  2. Use the external view of the table to find out the latest servers hosting the segment (proposed by @jackie-jiang and also mentioned by Kishore G).

While (1) seems straightforward, (2) has the advantages of keeping the server list in sync with table state for events like table re-balance without extra writing/reading cost.  The complete segment download process works as follows:

  1. Try to download the segment from the download url in segment Zk.
  2. If (1) failed, get the external view of the table (from Controller) to find out the ONLINE servers hosting the segments.
  3. Download the segment from the first server in the list. If that fails, move to the next server in the list. The download format is a Gzip'd file of the segment on the source server. 
