Ting Chen and Chinmay Soman

Problem Statement:

For Pinot real-time tables, the current Stream Low Level Consumption (a.k.a. LLC) protocol requires the Pinot Controller to synchronously upload the segment being committed to a deep storage system (behind the PinotFS interface - eg: NFS / HDFS). Fundamentally, this requirement makes LLC unsuitable when a Pinot installation has no deep storage or the deep storage is down for an extended period of time.

This is a big concern because of the following reasons:

  • Makes Pinot deployment more complicated - we need to deploy a deep storage system wherever Pinot is deployed (especially complicated from a Multi-Zone, Multi-Region architecture).
  • Cost: Pinot has to store all the relevant segments on the local disk for query execution. In addition, Pinot cluster already replicates these segments internally. Storing another copy of these segments in the deep storage adds to the total storage cost. Note: The deep storage itself will have its own internal replication, adding to the total cost.
  • Availability concerns: The deep storage has to be at least as available as Pinot itself. External deep storage can be unavailable to Pinot for hours due to causes outside our control. Once such an outage happens, Pinot realtime tables can lag behind for hours, which is unacceptable for data freshness requirements. This places a restriction on the deep storage technology (eg: the HDFS architecture doesn't lend itself to being multi-zone aware and hence has availability concerns).
  • Ingestion latency: Making a synchronous call to upload the segment is an overhead and can potentially limit the ingestion speed and hence latency.

Proposed Solution

Principles of a better solution:

  • No synchronous call to an external system
  • Failure isolation: Reduce correlation failures between independent systems (eg: Pinot and HDFS)
  • Cost efficiency: reduce amount of data duplication

The proposed solution is in fact simple - leverage Pinot’s replication mechanism to alleviate the strong dependency on a deep storage system. By making a change to the segment commit protocol, we can remove this requirement. However, please note - the aim is not to completely get rid of the deep storage system but use the deep storage differently. 

Current segment completion protocol

Please read this section for understanding the current LLC segment completion protocol: Consuming and Indexing rows in Realtime#Segmentcompletionprotocol

In general, there are 2 important phases in this protocol:

1) Choose a commit leader: one of the replicas is "designated" as a segment commit leader. This replica/server is then responsible for "persisting" this segment.

2) Persist the Segment: In the current design, the commit leader sends the newly created segment to the Pinot Controller which in turn uploads it to deep storage using PinotFS API.

The Persist Segment phase in its current form requires a dependency on, and synchronous calls to, an external storage system like NFS/HDFS/S3. We argue that allowing customizable policies for storing the persisted segment will make Pinot LLC easier to operate in different installation environments. Some possible persisting policies other than the current one could be:

  • Use the segment copy in the committer server as the sole backup copy.
  • Use the segment copy in the committer server as well as one more copy asynchronously saved in an external storage system.

This doc focuses on modifying the phase 2 of the protocol. 

Modified segment completion protocol

Currently the ‘commit-leader’ makes a synchronous ‘commit’ call to the Pinot Controller to upload the segment being committed to a deep storage system. If this upload succeeds then the segment is deemed as committed and can be brought online.

New server side segment completion steps

  1. The commit server uploads the segment to a configurable deep store location* and waits for a timeout period for the upload to finish
    1. If the upload finishes successfully, a segment location URI I will be returned by the uploader.
    2. If the upload fails or times out, a NULL location string will be returned. 
    3. *If no deep store is configured, NULL will be returned as in (2) above.
  2. The commit server proceeds to perform segment metadata commit step of the split commit protocol.
    1. The segmentLocation used in the metadata commit is either I (when the segment upload succeeds) or a special indicator (when the upload fails).
  3. If the metadata commit fails, the present commit failure handling kicks in.
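The steps above can be sketched as follows. This is an illustrative sketch only; the class, method, and constant names (BestEffortCommitSketch, tryUpload, PEER_LOCATION) are hypothetical and not the actual Pinot code:

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

public class BestEffortCommitSketch {
  // Hypothetical indicator recorded in the metadata commit when no
  // deep-store copy of the segment exists.
  public static final String PEER_LOCATION = "peer://";

  // Step 1: best-effort upload bounded by a timeout; returns the deep-store
  // URI on success, or null on failure/timeout (or if no deep store is
  // configured, the caller simply passes null through).
  public static String tryUpload(Supplier<String> uploader, long timeoutMs) {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
      Future<String> f = exec.submit((Callable<String>) uploader::get);
      return f.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      return null; // upload failed or timed out
    } finally {
      exec.shutdownNow();
    }
  }

  // Step 2: the segmentLocation used in the subsequent metadata commit.
  public static String segmentLocation(String deepStoreUri) {
    return deepStoreUri != null ? deepStoreUri : PEER_LOCATION;
  }

  public static void main(String[] args) {
    // Upload succeeds: the metadata commit records the deep-store URI.
    String ok = tryUpload(() -> "hdfs://segments/table__0__42", 1000);
    System.out.println(segmentLocation(ok));
    // Deep store down: the metadata commit records the special indicator.
    String failed = tryUpload(() -> { throw new RuntimeException("deep store down"); }, 1000);
    System.out.println(segmentLocation(failed)); // prints peer://
  }
}
```

Note that the metadata commit proceeds in both branches; only the recorded location differs, which is what decouples segment completion from deep-store availability.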

The following diagrams show how the new protocol behaves under various settings and failure scenarios.  

RealtimeValidationManager changes

Use the RealtimeValidationManager to fix LLC segments that only have a single copy. Currently the validation manager only fixes the most recent 2 segments of each partition. With this change, it will scan all segments' ZK data; if it finds a segment that does not have a deep storage copy, it will instruct a server to upload the segment to the deep storage and then update the segment's location URL with the deep storage URI. We need to evaluate the performance penalty of the additional ZK data access by the validation manager, and deploy optimization techniques to reduce the ZK access rate:

  1. Run a periodic job to obtain segment ZK data instead of letting the RealtimeValidationManager access it every time it runs; and
  2. If commit servers fail to upload the segment, they will report the failures and record them in zk.
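The validation pass described above might look like the following sketch. All names are hypothetical; the real code would read segment ZK metadata and send a Helix message to a hosting server:

```java
import java.util.*;

public class ValidationScanSketch {
  // Minimal stand-in for a segment's ZK metadata.
  public static class SegmentZkMeta {
    public final String name;
    public final String downloadUrl; // null or "peer://" means no deep-store copy
    public SegmentZkMeta(String name, String downloadUrl) {
      this.name = name;
      this.downloadUrl = downloadUrl;
    }
  }

  // Returns the segments for which a deep-store upload should be scheduled.
  // The caller would then instruct a hosting server to upload each one and
  // update the segment's download URL in ZK afterwards.
  public static List<String> findSegmentsMissingDeepStoreCopy(List<SegmentZkMeta> all) {
    List<String> missing = new ArrayList<>();
    for (SegmentZkMeta m : all) {
      if (m.downloadUrl == null || m.downloadUrl.startsWith("peer://")) {
        missing.add(m.name);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<SegmentZkMeta> metas = Arrays.asList(
        new SegmentZkMeta("t__0__1", "hdfs://segments/t__0__1"),
        new SegmentZkMeta("t__0__2", null));
    System.out.println(findSegmentsMissingDeepStoreCopy(metas)); // [t__0__2]
  }
}
```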

Segment Download

In our new design, the segment upload to deep storage is done on a best-effort basis. As a result, even if the deep storage is not available, we allow a server to download the segment from peer servers (although this is not preferred). To discover the servers hosting a given segment, we utilize the external view of the table to find the latest servers hosting the segment. This has the advantage of keeping the server list in sync with the table state for events like table rebalance, without extra write/read cost. The complete segment download process works as follows:

  1. Download the segment from the deep store uri found (if any) in the property store of the zk server.
    1. Note: the download is done with retries and exponential backoff to allow for the time the commit server needs to finish the segment upload.
  2. If (1) failed, get the external view of the table (from Controller) to find out the ONLINE servers hosting the segments.
  3. Download the segment from a randomly chosen ONLINE server. The download format is Gzip'd file of the segment on the source server. 
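A minimal sketch of the fallback logic above, assuming a generic fetch callback (all names hypothetical; the real code uses the configured segment fetchers):

```java
import java.util.*;
import java.util.function.Function;

public class SegmentDownloadSketch {
  // Tries the deep-store URI first (with retries; exponential backoff elided
  // for brevity), then falls back to a randomly chosen ONLINE peer taken
  // from the table's external view.
  public static boolean downloadSegment(String deepStoreUri, List<String> onlinePeers,
                                        Function<String, Boolean> fetch, int maxRetries) {
    // Step 1: deep store, if a URI was found in the ZK property store.
    if (deepStoreUri != null) {
      for (int attempt = 0; attempt < maxRetries; attempt++) {
        if (fetch.apply(deepStoreUri)) {
          return true;
        }
      }
    }
    // Steps 2-3: pick a random ONLINE server hosting the segment.
    if (!onlinePeers.isEmpty()) {
      String peer = onlinePeers.get(new Random().nextInt(onlinePeers.size()));
      return fetch.apply(peer);
    }
    return false;
  }

  public static void main(String[] args) {
    // Deep store unavailable (fetch fails on hdfs://), peer download succeeds.
    boolean ok = downloadSegment("hdfs://segments/t__0__2",
        Collections.singletonList("http://server-1:8097/tables/t/segments/t__0__2"),
        uri -> uri.startsWith("http"), 3);
    System.out.println(ok); // prints true
  }
}
```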

API changes

  1. A new api for segment download from a Pinot server (via server Admin api port)
    • URI path:  /tables/{tableName}/segments/{segmentName}
    • Usage: Download a realtime or offline table segment as a zipped tar file.
    • Code location:  TablesResource

Config change

  1. Enable best-effort segment upload in SplitSegmentCommitter and download segments from peer servers.

New table config: Add a new optional string field peerSegmentDownloadScheme to the SegmentsValidationAndRetentionConfig in the TableConfig. The value can be http or https.
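For illustration, the table config addition might look like the fragment below, assuming the field is surfaced under the segmentsConfig JSON section (which is backed by SegmentsValidationAndRetentionConfig); the table name is a placeholder and other required fields are omitted:

```json
{
  "tableName": "myTable_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "peerSegmentDownloadScheme": "http"
  }
}
```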

During segment completion phase,

  • SplitSegmentCommitter  will check this value. If it exists, the segment committer will be able to finish segment commit successfully even if the upload to the segment store fails. The committer will report to the controller that the segment is available in peer servers.
  • When Pinot servers in LLRealtimeSegmentDataManager fail to download segments from the segment store during the goOnlineFromConsuming() transition, they also check this field's value. If it is set, they initialize a PeerServerSegmentFetcher to
    1. First discover the URI u of a server hosting the segment.
    2. Construct the complete URI using the configured scheme (http or https) and use the appropriate segment fetcher to download it.

Note this is a table level config. We will test the new download behavior in realtime tables in incremental fashion. Once fully proven, this config can be upgraded to server level config. 

Notes and Caveats:

  1. The new table config also allows a server to download offline segments from peers. However, one cannot simply use peer download for offline segments without an extra check, unlike realtime segments, because an offline segment can be refreshed or changed. A dangerous race condition can occur for offline segments: suppose a segment has been updated with a newer version while servers A and B hold old versions. Both get notified of the newer version; they may try to fetch the segment and fail, eventually fetch from each other, and end up thinking that they have the newest version of the segment. (Example given by Subbu Subramaniam)

    While there are some ways to fix this (e.g. in the segment update message, send the crc of the new version), we need to vet them well before adopting any of them.

Failure cases and handling

  1. The segment upload fails but the preceding metadata commit succeeds
    • In this case, a server can failover to download from the commit server which has a copy of the data.
    • If users want to minimize the chances of downloading from peer servers: the segment completion mode can be set as DEFAULT instead of DOWNLOAD.
    • In the background, the RealtimeValidationManager will fix the upload failure by periodically asking a server to upload missing segments.
  2. The segment upload fails and the commit server crashes but the preceding metadata commit succeeds
    • The non-committer server can not download from the committer server
    • In DEFAULT segment completion mode, the non-committer server can still try to finish the segment.
    • In DOWNLOAD segment completion mode, the non-committer server will get into ERROR state for the segment.
    • Wait for the RealtimeValidationManager to fix the segment.
  3. The segment upload succeeded but the commit server crashes
    • The non-commit servers can download from the segment store.
  4. The segment upload succeeded but the controller crashes
    • Can be handled similar to the current failure handling mechanism.
    • If another server was asked to commit and upload the same segment again, let PinotFS handle the segment overwrite.
  5. Another server gets a "download" but the committer has not gotten to ONLINE state yet?  
    • To account for the fact that the metadata commit happens before the segment upload, another server should do retries (with exponential backoff) when downloading.
    • The retries with wait can greatly reduce the issues caused by the above race condition.

Implementation Tasks (listed in rough phase order)

  1. Interface and API change
    1. A new Pinot Server API in TablesResource.java for segment download (PR 4914). 
      1. Start with a straightforward approach to first gzip the segment directory for each request and send it back.
      2. Needs a performance test on the impact to server query performance during segment download.
    2. A new Pinot Server API in TablesResource.java for uploading a segment to a configured deep store. (PR Pending).
      1. Used by the RealtimeValidationManager to ask servers to upload segments missing a deep-store copy.
  2. Server changes for segment completion (PR 4914)
    1. Add a new config parameter enableSegmentUploadToController to IndexLoadingConfig.java (default true for backward compatibility) to control whether commit servers need to upload segments to controller.
      1. when set as true, the segment completion behavior is exactly the same as the current code.
      2. when set as false, server will not upload built segment to the controller (see controller change below).
    2. Add a new config segment.store.root.dir to HelixInstanceDataManagerConfig.java to config the deep store root dir if necessary. 
    3. In SplitSegmentCommitter.java, if enableSegmentUploadToController is set as false, skip the synchronous segmentCommitUpload of the segment to the controller during split commit.
    4. In LLRealtimeSegmentDataManager.java, after the segment commit is done with a successful metadata upload in (c) (which implies enableSegmentUploadToController is false),
      1. If there is a configured deep store (by checking whether segment.store.root.dir is set), the server does a best-effort upload of the built segment to the configured external store, i.e., with no need to retry even if the upload fails.
      2. Otherwise just continue.
    5. Consuming-to-online Helix state transition: refactor the server download procedure so that it
      1. first tries to download the segment from the configured segment store if available (with retries and backoff to avoid the race condition with upload servers); otherwise
      2. uses a new PeerServerSegmentFetcher (configured in Pinot servers via SegmentFetcherFactory) to discover the servers hosting the segment via the external view of the table and then download from a hosting server.
  3. Controller changes for segment completion (PR 4914)
    1. Add a new query parameter enableSegmentUpload to LLCSegmentCompletionHandlers.java's segmentCommitEndWithMetadata() to indicate if the server uploads segment to the controller during segment completion (default true to maintain backward compatibility)

      1. If the value is false, there is no need for the controller to move the segment file to its permanent location in a split commit.
      2. The change in (i) implies that the controller now accepts segment commit metadata with an empty (but not null) deep storage URI.
    2. Related to (a), add a new param uploadToControllerEnabled to SegmentCompletionProtocol.java's Params class.
    3. In PinotLLCRealtimeSegmentManager.java's updateCommittingSegmentZKMetadata(), when setting the download URL for the segment, use the descriptor's segment location instead of the VIP address.
      1. A related change here is to update the segment location field of the committing segment's descriptor after segment moving during a split commit.
  4. RealtimeValidationManager (PR Pending)
    1. During segment validation, for any segment without external storage uri, ask one server to upload it to the store and update the segment store uri in Helix.

Production (Seamless) Transition

  1. Let servers download segments directly from external storage instead of from the controller.
    1. This requires Pinot servers to send external storage segment uri to the controller.
  2. Enable splitCommit in both controller and server. 
  3. In LLRealtimeSegmentDataManager.doSplitCommit(),
    1. server skips uploading the segment file (based on a config flag)
    2. segment commitStart() and commitEnd() remain the same; we could further optimize by combining the two calls, but we leave them as they are for now.
    3. With the above changes, the controller will not upload the segment anymore because LLCSegmentCompletionHandlers.segmentUpload() is no longer called. Interestingly, skipping this call does not require any change on the controller side because the Pinot controller does not change its internal state during segmentUpload().



  1. It is not clear to me what problem we are trying to solve here.  From the opening paragraph, I assume that the use case is that the deep store may be down at the time segment is completed, and we don't want to stop consumption in that case. (We are somehow OK with offline segments not getting uploaded to the cluster)

    Please specify the length of time you expect the deep store to be down. Is this a temporary failure we are talking about? (minutes) Or down for maintenance (may be hours)? Or, a serious data center failure (maybe days)? Or we just want to get rid of deep store in the realtime path (failed forever)? Depending on that, the solution may be different.

    1. Added the most common failure scenarios we expect. After talking to our HDFS team: HDFS can be down (and it has happened before) for hours.

  2. The motivation is to remove Pinot's strict dependence on deep store for realtime tables. Here's why this is important:

    • If a deep store is unavailable for > 5 mins this will be considered as an outage (at least within our eco-system). We have seen deep store to be unavailable for 10s of mins to hours. 
    • Yes, I am proposing to get rid of deep store in the realtime path. 
    • For offline tables, we can still have a deep store system since the dependency is less critical. In fact, it could even be sitting in a different region than Pinot and that will still work.
  3. Meeting minutes Sep 30, 2019. Need to handle these cases:

    (1) have the lead committer server commit to HDFS and wait for a while before uploading metadata

    (2) committer always adds its own uri during segment commit end

    (3) followers update their own url during online transition

    (4) validation manager periodically polls the metadata and reminds servers to upload to hdfs if it is not there (uses helix msg)

        also cleans up servers that are not there in the idealstate

    (5) What happens when hdfs upload succeeds but metadata upload fails?

    (6) while updating metadata from server, use two URIs instead of one URI. Need to do this in a backward compatible fashion

    (7) XX Relocation manager and rebalance need to work right.

            replace metadata with new server during swap. but we call Helix rebalance.

            can we release without having this rebalance?

            No, we are ok, see point 8 below

    (8) Servers will add themselves to the segment metadata during the ONLINE transition if they are NOT there in it.

  4. Consider adding a new download scheme called (say) "peer". So, a URI can be constructed as : "peer://path/to/server/download/" and added to the map https://github.com/apache/incubator-pinot/blob/master/pinot-common/src/main/java/org/apache/pinot/common/segment/fetcher/SegmentFetcherFactory.java#L49

    The advantages of this approach:

    1. In https://github.com/apache/incubator-pinot/blob/master/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java we do not need to do a null check and load from peer.
    2. We don't need to worry about null checks any other place (e.g. some metadata validator, or the realtime segment upload code to handle a special null case, not knowing whether it is a buggy server, or is this really  null).
    3. We can add other schemes later if we want to.

    Yes, we need to add a prioritizer which will prioritize the URIs to use for any segment. This can be added in the fetcher-and-loader class.