...

The proposed solution is in fact quite simple: leverage Pinot's replication mechanism to alleviate the strong dependency on a deep storage system. By making a change to the segment commit protocol, we can remove this requirement. However, please note that the aim is not to completely get rid of the deep storage system. We probably still need to leverage deep storage for backup/archival purposes, but we will use it differently.

Current segment completion protocol

...

  • Use the segment copy in the committer server as the sole backup copy.
  • Use the segment copy in the committer server as well as one more copy asynchronously saved in an external storage system.

The change to commit leader selection is optional for the purpose of deep storage decoupling. We will discuss in detail in the next section how the above persisting options fit into the overall completion protocol. This doc focuses on modifying phase 2 of the protocol.

Modified segment completion protocol

...

Use the RealtimeValidationManager to fix LLC segments that only have a single copy. Currently the validation manager only fixes the most recent 2 segments of each partition. With this change, it will scan all segments' ZK data; if it finds a segment that does not have a deep storage copy, it will instruct a server to upload the segment to deep storage and then update the segment location URL with the deep storage URI.
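To make the intended behavior concrete, here is a minimal sketch of such a validation pass. The helper interfaces defined inline (SegmentZkView, ServerUploadRequester) are illustrative placeholders, not actual Pinot classes; the real logic would live inside the RealtimeValidationManager.

import java.util.List;

public class DeepStoreCopyFixerSketch {

  // Assumed view over the segment ZK metadata we need; not a real Pinot API.
  interface SegmentZkView {
    List<String> getAllLlcSegmentNames(String tableNameWithType);
    String getDownloadUrl(String tableNameWithType, String segmentName); // null/empty => no deep-store copy yet
    void setDownloadUrl(String tableNameWithType, String segmentName, String deepStoreUri);
  }

  // Assumed hook that asks a hosting server to upload a segment and returns the deep-store URI.
  interface ServerUploadRequester {
    String requestUploadToDeepStore(String tableNameWithType, String segmentName);
  }

  private final SegmentZkView zk;
  private final ServerUploadRequester uploadRequester;

  public DeepStoreCopyFixerSketch(SegmentZkView zk, ServerUploadRequester uploadRequester) {
    this.zk = zk;
    this.uploadRequester = uploadRequester;
  }

  // Scan every LLC segment (not just the latest two) and repair any missing deep-store copy.
  public void fixMissingDeepStoreCopies(String tableNameWithType) {
    for (String segmentName : zk.getAllLlcSegmentNames(tableNameWithType)) {
      String downloadUrl = zk.getDownloadUrl(tableNameWithType, segmentName);
      if (downloadUrl == null || downloadUrl.isEmpty()) {
        // Instruct a hosting server to upload the segment, then record the deep-store URI in ZK.
        String deepStoreUri = uploadRequester.requestUploadToDeepStore(tableNameWithType, segmentName);
        zk.setDownloadUrl(tableNameWithType, segmentName, deepStoreUri);
      }
    }
  }
}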

Segment Download

In Pinot low level consumer data ingestion, a server may need to download a segment instead of building it on its own. Currently the server finds the download location by reading the segment's download URL from ZK: the URL can point either to the controller (for local FS) or to deep storage. In our new design, the segment upload to deep storage is asynchronous and best effort. As a result, the server cannot depend only on deep storage. Instead we allow the server to download the segment from peer servers (although this is not the preferred path). There are two options to discover the servers hosting a given segment:

...

, we utilize the external view of the table to find out the latest servers hosting the segment

...

While (1) seems straightforward, (2) has the advantage of keeping the server list in sync with table state for events like table rebalance, without extra writing/reading cost. The complete segment download process works as follows (a sketch follows the list):

  1. Download the segment from the deep store URI found in the property store of the ZK server.
    1. Note: the download is done with retries and exponential backoff to allow for the time the committing server needs to upload the segment.
  2. If (1) fails, get the external view of the table (from the Controller) to find the ONLINE servers hosting the segment.
  3. Download the segment from one of these servers. If that fails, move to the next server in the list. The download format is a gzip'd file of the segment on the source server.
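Below is a minimal sketch of this download flow. The retry count, backoff values, and helper interfaces (Fetcher, ExternalViewReader) are assumptions made for illustration; the actual server would use its existing segment fetchers and Helix external view APIs.

import java.io.File;
import java.util.List;

public class SegmentDownloadSketch {

  // Assumed transport that downloads the (gzipped) segment from a URI; throws on failure.
  interface Fetcher {
    void fetch(String uri, File destination) throws Exception;
  }

  // Assumed reader over the table's external view plus a per-server download URI.
  interface ExternalViewReader {
    List<String> getOnlineServers(String tableNameWithType, String segmentName);
    String getPeerDownloadUri(String server, String segmentName);
  }

  private static final int MAX_DEEP_STORE_ATTEMPTS = 5;   // illustrative value
  private static final long INITIAL_BACKOFF_MS = 1_000L;  // illustrative value

  private final Fetcher fetcher;
  private final ExternalViewReader externalView;

  public SegmentDownloadSketch(Fetcher fetcher, ExternalViewReader externalView) {
    this.fetcher = fetcher;
    this.externalView = externalView;
  }

  public void download(String tableNameWithType, String segmentName, String deepStoreUri, File dest)
      throws Exception {
    // 1. Try deep storage first, with retries and exponential backoff to give the committing
    //    server time to finish its asynchronous, best-effort upload.
    long backoffMs = INITIAL_BACKOFF_MS;
    for (int attempt = 0; attempt < MAX_DEEP_STORE_ATTEMPTS; attempt++) {
      try {
        fetcher.fetch(deepStoreUri, dest);
        return;
      } catch (Exception e) {
        Thread.sleep(backoffMs);
        backoffMs *= 2;
      }
    }
    // 2. Fall back to the servers listed as ONLINE in the table's external view.
    for (String server : externalView.getOnlineServers(tableNameWithType, segmentName)) {
      try {
        // 3. Download from this peer; on failure, move on to the next server in the list.
        fetcher.fetch(externalView.getPeerDownloadUri(server, segmentName), dest);
        return;
      } catch (Exception e) {
        // try next server
      }
    }
    throw new IllegalStateException("Could not download segment " + segmentName + " from deep store or peers");
  }
}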

...

  1. Let servers download the segment directly from external storage instead of from the controller. 
    1. This requires Pinot servers to send the external storage segment URI to the controller.
  2. Enable splitCommit in both the controller and the server. 
  3. In LLRealtimeSegmentDataManager.doSplitCommit(),
    1. the server skips uploading the segment file (based on a config flag) – see the sketch after this list.
    2. segment commitStart() and commitEnd() remain the same – we could further optimize here by combining the two calls, but we leave them as they are for now. 
    3. With the above changes, the controller will no longer upload the segment because LLCSegmentCompletionHandlers.segmentUpload() is not called any more. Interestingly, skipping this call does not need any change on the controller side because the Pinot controller does not change its internal state during segmentUpload().
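As a rough illustration of step 3, the sketch below shows how the upload could be skipped behind a config flag. The flag, method shape, and CompletionProtocol interface are assumptions for illustration and do not mirror the real LLRealtimeSegmentDataManager code.

import java.io.File;

public class SplitCommitSkipUploadSketch {

  // Assumed view of the controller-side segment completion protocol calls.
  interface CompletionProtocol {
    boolean commitStart(String segmentName);
    // Corresponds to the upload handled by LLCSegmentCompletionHandlers.segmentUpload().
    String segmentUpload(String segmentName, File segmentTarGz) throws Exception;
    boolean commitEnd(String segmentName, String segmentLocation);
  }

  private final CompletionProtocol protocol;
  private final boolean skipSegmentUpload; // assumed new server config flag

  public SplitCommitSkipUploadSketch(CompletionProtocol protocol, boolean skipSegmentUpload) {
    this.protocol = protocol;
    this.skipSegmentUpload = skipSegmentUpload;
  }

  public boolean doSplitCommit(String segmentName, File segmentTarGz) throws Exception {
    if (!protocol.commitStart(segmentName)) {
      return false;
    }
    String segmentLocation = null;
    if (!skipSegmentUpload) {
      // Existing behaviour: upload the segment file via the controller and commit that location.
      segmentLocation = protocol.segmentUpload(segmentName, segmentTarGz);
    }
    // New behaviour when the flag is set: skip the upload entirely; the deep-store location is
    // filled in later by the asynchronous upload plus the validation-manager fix-up.
    return protocol.commitEnd(segmentName, segmentLocation);
  }
}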

Appendix 

Alternative designs

...

1) No coordination between Pinot servers

We can simplify the coordination required for selecting a segment commit leader by making a small change:

  • Add new fields to the Segment metadata:
    • 'endOffset': specifies at what Kafka offset the segment is deemed complete and ready to become ONLINE.
    • Modify "segmentLocation" to be a list of Strings. This enables multiple owners for the corresponding segment. 
  • When the Pinot controller creates a new segment, it calculates the end offset based on the number of rows we want the segment to contain. Note that the row count is simply used as guidance. It is not necessary for the offsets to be contiguous. The end offset is a way for all the replicas to agree in a deterministic fashion.
  • Modify the server so that it will stop ingesting when it reaches this 'endOffset'.
  • The Pinot Server should still call the ‘consumed’ REST API. Each such Pinot server becomes a "commit leader". In other words, in this new model there will be multiple "commit leaders" for each segment.

<TODO: Add diagram depicting the new flow>

The effect of this change is to introduce a deterministic flush trigger for all the Pinot replicas, so that all the replicas construct the same segment from the input stream without the need for an external coordination mechanism. In other words, all the replicas will know exactly when to stop consumption based on this endOffset.
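For illustration, a minimal sketch of such an endOffset-bounded consumption loop is shown below; the StreamConsumer and SegmentBuilder abstractions are hypothetical stand-ins for Pinot's actual stream consumer and mutable segment, not real APIs.

public class EndOffsetConsumerSketch {

  // Assumed stream consumer: returns the next record's offset, or -1 if nothing is available yet.
  interface StreamConsumer {
    long pollNextOffset();
  }

  // Assumed mutable segment that indexes the record at a given offset.
  interface SegmentBuilder {
    void indexRecordAt(long offset);
  }

  // Consume until the deterministic endOffset from segment metadata is reached, so every
  // replica builds an identical segment without any extra coordination.
  public void consumeUntilEndOffset(StreamConsumer consumer, SegmentBuilder builder, long endOffset)
      throws InterruptedException {
    while (true) {
      long offset = consumer.pollNextOffset();
      if (offset < 0) {
        Thread.sleep(100L); // nothing available yet; keep polling
        continue;
      }
      if (offset >= endOffset) {
        break; // segment complete; the replica now calls the 'consumed' REST API as a commit leader
      }
      builder.indexRecordAt(offset);
    }
  }
}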

Pros:

  • No additional coordination needed between servers and controller, hence simplifying the segment commit FSM (Finite State Machine)
  • Potentially speed up ingestion throughput (since the existing coordination has additional RPC overhead).

Cons:

  • This will only work when we want to flush based on the number of rows within our input streams. This strategy will not work with a time-based flush approach. This might be OK since even if the current segment isn't flushed, the in-memory state can still be queried by the broker.
  • One edge case here is if the segment is not flushed for a long time (low QPS input stream). In such a situation, if we restart the servers then we can lose the latest data (since the last committed segment). One way to get around this is to do periodic checkpointing of server’s in-memory state onto local disk (even if the segment is not committed yet).

...
