...

The proposed solution is in fact quite simple: leverage Pinot's replication mechanism to alleviate the strong dependency on a deep storage system. By making a change to the segment commit protocol, we can remove this requirement. However, please note: the aim is not to completely get rid of the deep storage system. We will likely still need deep storage for backup and archival purposes.

Current segment completion protocol

...

Please read the following section to understand the current LLC segment completion protocol: Consuming and Indexing rows in Realtime#Segmentcompletionprotocol

...

The change to commit leader selection is optional for the purpose of deep storage decoupling. The next section discusses in detail how the persisting options above fit into the overall completion protocol.

Modified segment completion protocol

...

Currently the ‘commit-leader’ makes a synchronous ‘commit’ call to the Pinot Controller to upload the segment being committed to a deep storage system. Only if this upload succeeds is the segment deemed committed and brought online.
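As a minimal sketch of why this couples segment completion to deep storage availability, the current synchronous flow reduces to the following (class and method names here are illustrative, not the actual Pinot classes):

```java
// Hypothetical sketch of the current synchronous commit: the committed state
// is reached only if the controller's upload to deep storage succeeds, which
// is exactly the strong dependency this design removes.
public class SyncCommitSketch {

    enum CommitResult { COMMIT_SUCCESS, COMMIT_FAILURE }

    /** The controller uploads the segment to deep storage synchronously. */
    static CommitResult commit(boolean deepStoreUploadSucceeded) {
        // The segment can be brought ONLINE only after a successful upload;
        // a deep storage outage therefore blocks segment completion entirely.
        return deepStoreUploadSucceeded ? CommitResult.COMMIT_SUCCESS
                                        : CommitResult.COMMIT_FAILURE;
    }

    public static void main(String[] args) {
        System.out.println(commit(true));  // COMMIT_SUCCESS
        System.out.println(commit(false)); // COMMIT_FAILURE: outage blocks the commit
    }
}
```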

...

  • The commit leader becomes the owner of the segment being committed. It makes this segment downloadable via a new HTTP endpoint.
  • During the segment completion protocol, the committing server uploads the segment to the configured deep storage. It then waits a short period of time (say 5 seconds) before sending the segment metadata to the controller. The wait maximizes the chance that other servers can download the segment from deep storage instead of from the commit server. The deep storage URI is added to the segment location list in Zookeeper.
  • Other server replicas keep updating this segment location list during their transition to ONLINE: this keeps the segment location information up to date and allows operations like Rebalance() to find the data in the right places.
  • When the controller asks a server to download the segment for whatever reason, the server checks the segment location list. It first tries to download from a deep storage URI stored in ZK as segment metadata, to minimize the impact on other servers. If there is no such URI, it goes down the list of servers to download the segment from. How the server list is discovered is described in more detail later.
  • All existing error-handling routines remain unchanged. The next section lists diagrams showing the critical scenarios of (1) the happy path, (2) controller failure and (3) a slow server downloading from deep storage or a peer server.
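The committer-side steps above can be sketched as follows. This is a sketch under stated assumptions: the class and method names (CommitterSketch, buildSegmentLocations) and the URIs are illustrative, not the actual Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the modified committer flow: best-effort deep storage
// upload, a short wait, then a metadata commit carrying the location list.
public class CommitterSketch {

    /**
     * Builds the segment location list to be written to ZK: the committer's own
     * peer-download endpoint is always present; the deep storage URI is added
     * only when the best-effort upload succeeded.
     */
    static List<String> buildSegmentLocations(String committerPeerUri, String deepStoreUri) {
        List<String> locations = new ArrayList<>();
        locations.add(committerPeerUri);   // new HTTP endpoint on the commit leader
        if (deepStoreUri != null) {
            locations.add(deepStoreUri);   // best-effort deep storage copy
        }
        return locations;
    }

    public static void main(String[] args) throws InterruptedException {
        // 1. Best-effort upload to deep storage (simulated here as a success).
        String deepStoreUri = "hdfs://deep-store/myTable/myTable__0__7"; // assumed URI
        // 2. Short wait (e.g. 5 s in the proposal) so replicas favor the deep
        //    storage copy; shortened here so the sketch runs quickly.
        Thread.sleep(50);
        // 3. Send segment metadata, including the location list, to the controller.
        List<String> locations = buildSegmentLocations(
                "http://server-1:8097/segments/myTable__0__7", deepStoreUri);
        System.out.println("locations written to ZK: " + locations);
    }
}
```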

b) Use the RealtimeValidationManager to fix LLC segments that have only a single copy. Currently the validation manager only fixes the most recent 2 segments of each partition. With this change, it will scan all segments' ZK metadata; if it finds a segment without a deep storage copy, it will instruct a server to upload the segment to deep storage and then update the segment location URL with the deep storage URI.
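The scan described above can be sketched as follows; the map stands in for per-segment ZK metadata, and all names (ValidationScanSketch, segmentsMissingDeepStoreCopy) are illustrative rather than the actual RealtimeValidationManager code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the periodic validation scan that repairs segments
// lacking a deep storage copy.
public class ValidationScanSketch {

    /**
     * Scans every segment's ZK metadata and returns the segments with no deep
     * storage copy. For each such segment the real manager would instruct a
     * hosting server to upload it, then update the segment location URL in ZK
     * with the resulting deep storage URI.
     */
    static List<String> segmentsMissingDeepStoreCopy(Map<String, String> segmentToDeepStoreUri) {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, String> entry : segmentToDeepStoreUri.entrySet()) {
            if (entry.getValue() == null) {   // no deep storage URI recorded in ZK
                missing.add(entry.getKey());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> zkMetadata = new LinkedHashMap<>();
        zkMetadata.put("myTable__0__1", "hdfs://deep-store/myTable__0__1"); // safe
        zkMetadata.put("myTable__0__2", null); // only peer copies exist
        System.out.println(segmentsMissingDeepStoreCopy(zkMetadata)); // [myTable__0__2]
    }
}
```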

Segment Download

In Pinot low level consumer data ingestion, a server may need to download a segment instead of building it on its own. Currently the server finds the download location by reading the segment's download URL from ZK: the download URL can point either to the controller (for local FS) or to deep storage.

In our new design, the segment upload to deep storage is asynchronous and best-effort. As a result, the server cannot depend solely on deep storage. Instead, we allow the server to download the segment from peer servers. There are two options to discover the servers hosting a given segment:

  1. Store the server list in segment ZK metadata for the server to read.
  2. Use the external view of the table to find the latest servers hosting the segment (proposed by @jackie-jiang and also mentioned by Kishore G).

While (1) seems straightforward, (2) has the advantage of keeping the server list in sync with the table state for events like table rebalance, without extra write/read cost. The complete segment download process works as follows:

  1. Try to download the segment from the download URL in the segment's ZK metadata.
  2. If (1) fails, get the external view of the table (from the controller) to find the ONLINE servers hosting the segment.
  3. Download the segment from one of those servers. If that fails, move to the next server in the list. The download format is a gzip'd file of the segment on the source server.
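The fallback order above can be sketched as a small piece of logic. This is a sketch, not the actual Pinot segment fetcher: the names (DownloadFallbackSketch, downloadSegment) are illustrative, and the tryDownload predicate stands in for the real HTTP fetch.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of the download fallback: ZK download URL first, then
// the ONLINE peer servers from the table's external view, in order.
public class DownloadFallbackSketch {

    /**
     * Returns the first source the segment could be fetched from, or empty if
     * every source fails.
     */
    static Optional<String> downloadSegment(String zkDownloadUrl,
                                            List<String> onlineServers,
                                            Predicate<String> tryDownload) {
        // 1. Try the download URL recorded in the segment's ZK metadata
        //    (controller URI for local FS, or a deep storage URI).
        if (zkDownloadUrl != null && tryDownload.test(zkDownloadUrl)) {
            return Optional.of(zkDownloadUrl);
        }
        // 2-3. Fall back to the ONLINE servers from the external view,
        //      moving down the list until one download succeeds.
        for (String server : onlineServers) {
            if (tryDownload.test(server)) {
                return Optional.of(server);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Simulate: no deep storage copy yet, first peer unreachable, second up.
        Optional<String> source = downloadSegment(
                null,
                List.of("server-1:8097", "server-2:8097"), // assumed host:port values
                s -> s.equals("server-2:8097"));
        System.out.println("downloaded from: " + source.orElse("none")); // server-2:8097
    }
}
```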

Failure handling

  •  What if the deep storage upload succeeds but the subsequent metadata upload to the controller fails?

...

  1. Interface and API change
    1. Segment location is changed from a single URI string to a string containing a list of URIs.
    2. Server adds a new API to allow segment download.
  2. Segment completion protocol change
    1. Server directly uploads segments to a configured HDFS store.
  3. Server and controller FSM change
  4. Controller FSM change
  5. RealtimeValidationManager modification

...