 Ting Chen and Chinmay Soman

Problem Statement:

For Pinot real-time tables, the current Stream Low Level Consumption (LLC) protocol requires the Pinot Controller to synchronously upload the segment being committed to a deep storage system (behind the PinotFS interface, e.g. NFS or HDFS). This requirement makes LLC unsuitable when a Pinot installation has no deep storage, or when the deep storage is down for an extended period of time.

This is a major concern for the following reasons:

  • Deployment complexity: a deep storage system must be deployed wherever Pinot is deployed, which is especially complicated in a multi-zone, multi-region architecture.
  • Cost: Pinot has to store all the relevant segments on local disk for query execution, and the Pinot cluster already replicates these segments internally. Storing another copy of these segments in deep storage adds to the total storage cost. Note: the deep storage has its own internal replication, which adds further to the total cost.
  • Availability: the deep storage has to be at least as available as Pinot. External deep storage can become unavailable to Pinot for hours, for reasons outside Pinot's control. During such an outage, Pinot realtime tables can lag behind for hours, which is unacceptable for data freshness requirements. This also restricts the choice of deep storage technology (e.g. the HDFS architecture does not lend itself to being multi-zone aware and hence has availability concerns).
  • Ingestion latency: the synchronous call to upload the segment is an overhead that can limit ingestion speed and hence increase latency.

Proposed Solution

Principles of a better solution:

  • No synchronous call to an external system
  • Failure isolation: reduce correlated failures between independent systems (e.g. Pinot and HDFS)
  • Cost efficiency: reduce the amount of data duplication
  • Operational ease: the resulting system should have fewer moving parts. We could also simplify the handshake between the controller and the servers.

The proposed solution is quite simple: leverage Pinot’s replication mechanism to remove the strong dependency on a deep storage system, by changing the segment commit protocol. Note, however, that the aim is not to get rid of the deep storage system completely. We probably still need deep storage for backup and archival purposes.

Current segment completion protocol:

For the current LLC segment completion protocol, see the "Segment completion protocol" section of the page "Consuming and Indexing rows in Realtime".

In general, there are 2 important phases in this protocol:

1) Choose a commit leader: one of the replicas is "designated" as a segment commit leader. This replica/server is then responsible for "persisting" this segment.

2) Persist the Segment: In the current design, the commit leader sends the newly created segment to the Pinot Controller, which in turn uploads it to deep storage using the PinotFS API.

In its current form, the Persist the Segment phase requires a dependency on, and synchronous calls to, an external storage system such as NFS, HDFS or S3. We argue that allowing customizable policies for storing the persisted segment will make Pinot LLC easier to operate in different installation environments. Possible persisting policies other than the current one include:

  • Use the segment copy in the committer server as the sole backup copy.
  • Use the segment copy in the committer server as well as one more copy asynchronously saved in an external storage system.

The change to commit leader selection is optional for the purpose of deep storage decoupling. The next section discusses in detail how the above persisting options fit into the overall completion protocol.

Modified segment completion protocol:

Currently the ‘commit-leader’ makes a synchronous ‘commit’ call to the Pinot Controller to upload the segment being committed to a deep storage system. If this upload succeeds, the segment is deemed committed and can be brought online.

Our new approach relies on "segmentLocation" becoming a list of HTTP end points. Any one location should be sufficient to download the segment; if one location is unavailable, we can retry using the next one. How the segment is persisted, and when it can transition to the ONLINE state, is described below:

a) Persist the segment both to a Pinot server (synchronously) and, optionally, to a configured deep storage (asynchronously):

  • The commit leader becomes the owner of the segment being committed. It makes this segment downloadable using a new HTTP end point.
  • During the segment completion protocol, the committer server uploads the segment to the configured deep storage. It waits a short period of time (say 5 seconds) and then sends the segment metadata to the controller. The wait maximizes the chance that other servers can download the segment from deep storage instead of from the committer server. The deep storage URI is added to the segment location list in Zookeeper.
  • Other server replicas keep updating this 'segment location' as they transition to ONLINE. This keeps the segment location information up to date and allows operations like Rebalance() to find the data in the right places.
  • When the controller asks a server to download the segment for whatever reason, the server checks the segment location list. It first tries to download the segment from a deep storage URI to minimize the impact on servers. If there is no such URI, it goes down the list of servers to download the segment.
  • All existing error handling routines remain unchanged. The next section lists diagrams showing the critical scenarios: (1) happy path, (2) controller failure and (3) a slow server downloading from deep storage or a peer server.
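
The committer-side steps above could be sketched as follows. This is a minimal Python sketch under stated assumptions, not Pinot code: `commit_segment`, `upload_to_deep_storage`, the example URIs and the order of the returned list (deep storage first, committer server last) are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as UploadTimeout
from typing import Callable, List


def commit_segment(
    segment_name: str,
    server_uri: str,
    upload_to_deep_storage: Callable[[str], str],
    wait_seconds: float = 5.0,
) -> List[str]:
    """Return the segment location list the committer would send to the controller.

    Hypothetical helper: the deep storage upload runs in the background and the
    committer waits at most `wait_seconds` for it before committing metadata.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    upload = executor.submit(upload_to_deep_storage, segment_name)
    locations: List[str] = []
    try:
        # Short wait so that other replicas can prefer the deep storage copy.
        locations.append(upload.result(timeout=wait_seconds))
    except UploadTimeout:
        pass  # Upload still running; commit with the server copy only.
    except Exception:
        pass  # Deep storage is optional; an upload failure does not block the commit.
    finally:
        executor.shutdown(wait=False)
    # The committer's own download end point is always a valid location.
    locations.append(server_uri)
    return locations
```

If the upload has not finished within the wait, the segment is committed with the server copy only, and the deep storage URI has to be added to the location list later.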

b) Use the RealtimeValidationManager to fix LLC segments that have only a single copy. Currently the validation manager only fixes the most recent 2 segments of each partition. With this change, it will scan the ZK data of all segments; if it finds a segment that does not have a deep storage copy, it will instruct a server to upload the segment to the deep storage and then update the segment location with the deep storage URI.
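
The scan described in (b) might look roughly like this. It is a hedged Python sketch, not the RealtimeValidationManager implementation: the dictionary standing in for the ZK data, `upload_from_server`, and the rule that any non-HTTP URI counts as deep storage are assumptions made for illustration.

```python
from typing import Callable, Dict, List
from urllib.parse import urlparse

# Assumption: peer server locations use http/https; anything else is deep storage.
PEER_SCHEMES = {"http", "https"}


def has_deep_storage_copy(locations: List[str]) -> bool:
    return any(urlparse(uri).scheme not in PEER_SCHEMES for uri in locations)


def repair_single_copy_segments(
    zk_segment_locations: Dict[str, List[str]],
    upload_from_server: Callable[[str, str], str],
) -> List[str]:
    """Scan all segments and add a deep storage URI where none is recorded.

    `upload_from_server(segment, server_uri)` is a hypothetical call that asks
    the server at `server_uri` to upload the segment and returns the new URI.
    """
    repaired = []
    for segment, locations in zk_segment_locations.items():
        if not locations or has_deep_storage_copy(locations):
            continue
        deep_uri = upload_from_server(segment, locations[0])
        # Put the deep storage URI first so that downloads prefer it.
        zk_segment_locations[segment] = [deep_uri] + locations
        repaired.append(segment)
    return repaired
```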

c) Failure handling

  • What if the deep storage upload succeeds but the subsequent metadata upload to the controller fails?

    When this happens, the server commit also fails and the segment completion protocol retries the commit. During the retry, the leader server can check whether the segment already exists in deep storage. If so, it does not re-upload the segment.
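
A retry that skips the re-upload could be sketched as below. This is an illustrative Python sketch only: `commit_with_retry`, the in-memory `deep_storage` dictionary and the `send_metadata` callback are hypothetical stand-ins for the real deep storage and controller calls.

```python
from typing import Callable, Dict


def commit_with_retry(
    segment_name: str,
    segment_bytes: bytes,
    deep_storage: Dict[str, bytes],
    send_metadata: Callable[[str], bool],
    max_attempts: int = 3,
) -> bool:
    """Retry the commit without re-uploading a segment that is already stored.

    `deep_storage` is an in-memory stand-in for the real deep storage, and
    `send_metadata` stands in for the metadata call to the controller.
    """
    for _ in range(max_attempts):
        if segment_name not in deep_storage:
            # Upload only when no earlier attempt already stored the segment.
            deep_storage[segment_name] = segment_bytes
        if send_metadata(segment_name):
            return True
    return False
```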

d) New interaction diagrams on segment completion protocol:

Compared with the existing protocol, the main differences are:

  1. The server no longer uploads segment data to the controller. Instead, during the commit step, only segment metadata is uploaded to the controller (to be written into ZK).
  2. The server can submit an asynchronous task to upload the built segment to a configured deep storage.
  3. When a slow server is asked to download a segment, it goes through the segmentLocation list for the segment. It first tries to download from the deep storage. If that is not available, it downloads from the servers.
  4. To facilitate download from servers, Pinot servers will have a new REST API for segment download.
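
The download order in item 3 can be illustrated with a small Python sketch. It is not Pinot code: `download_segment`, the `fetch` callback and the convention that peer servers are identified by an http/https scheme are assumptions.

```python
from typing import Callable, List
from urllib.parse import urlparse

# Assumption: peer server locations use http/https; anything else is deep storage.
PEER_SCHEMES = {"http", "https"}


def download_segment(locations: List[str], fetch: Callable[[str], bytes]) -> bytes:
    """Try deep storage URIs first, then peer servers, until one succeeds."""
    # sorted() is stable, so the relative order within each group is kept.
    ordered = sorted(locations, key=lambda uri: urlparse(uri).scheme in PEER_SCHEMES)
    errors = []
    for uri in ordered:
        try:
            return fetch(uri)
        except Exception as exc:  # Any failure moves on to the next location.
            errors.append(f"{uri}: {exc}")
    raise IOError("segment unavailable from all locations: " + "; ".join(errors))
```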

                          


Pros:

  • Simple to implement
  • Minimal changes to the current controller/server FSMs used in the LLC segment protocols. The most noticeable change is in the controller FSM: the Committer Uploading state becomes obsolete, because the committer no longer uploads the segment to the controller, and should be generalized to a state reflecting that the segment is being persisted.

Cons:


Dependency changes

  1. RealtimeSegmentRelocator

    This class moves completed segments to other servers. After a segment is moved, the segmentLocation of the LLC segment should also be updated.
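
One possible shape for that update, as a hedged Python sketch: `update_locations_after_move` and the example URIs are hypothetical, and appending the new server at the end of the list is an assumption.

```python
from typing import List


def update_locations_after_move(
    locations: List[str], old_server_uri: str, new_server_uri: str
) -> List[str]:
    """Replace the old server's end point with the new one, keeping other URIs."""
    updated = [uri for uri in locations if uri not in (old_server_uri, new_server_uri)]
    updated.append(new_server_uri)
    return updated
```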

Implementation Phase

  1. Interface and API change
    1. Segment location is changed from a single URI string to a string containing a list of URIs.
    2. Server adds a new API to allow segment download.
  2. Segment completion protocol change
  3. Server and controller FSM change
  4. RealtimeValidationManager modification

Appendix 

Alternative designs:

1) No coordination between Pinot servers

We can simplify the coordination required for selecting a segment commit leader by making a small change:

  • Add new fields to the Segment metadata:
    • 'endOffset': specifies the Kafka offset at which the segment is deemed complete and ready to become ONLINE.
    • Modify "segmentLocation" to be a list of String. This enables multiple owners for the corresponding segment.
  • When the Pinot controller creates a new segment, it calculates the end offset based on the number of rows we want the segment to contain. Note that the row count is only used as guidance; the offsets need not be contiguous. The end offset is a way for all the replicas to agree in a deterministic fashion.
  • Modify the server so that it will stop ingesting when it reaches this 'endOffset'.
  • The Pinot Server should still call the ‘consumed’ REST API. Each such Pinot server becomes a "commit leader". In other words, in this new model there will be multiple "commit leaders" for each segment.

<TODO: Add diagram depicting the new flow>

The effect of this change is to introduce a deterministic flush trigger for all the Pinot replicas, so that all the replicas construct the same segment from the input stream without the need for an external coordination mechanism. In other words, all the replicas know exactly when to stop consuming, based on this endOffset.
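
The deterministic flush trigger can be illustrated with a short Python sketch. It is not Pinot code: `compute_end_offset` and `consume_segment` are hypothetical names, and treating endOffset as exclusive is an assumption.

```python
from typing import Iterable, List, Tuple


def compute_end_offset(start_offset: int, target_rows: int) -> int:
    """Controller side: pick the end offset from the desired row count."""
    return start_offset + target_rows


def consume_segment(
    messages: Iterable[Tuple[int, str]], start_offset: int, end_offset: int
) -> List[str]:
    """Server side: collect rows with start_offset <= offset < end_offset.

    Assumption: end_offset is exclusive. Offsets may have gaps, so the segment
    can hold fewer rows than the target.
    """
    rows = []
    for offset, row in messages:
        if offset >= end_offset:
            break  # Deterministic flush trigger shared by all replicas.
        if offset >= start_offset:
            rows.append(row)
    return rows
```

Two replicas that read the same partition with the same start and end offsets build identical row lists, even when the stream has offset gaps.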

Pros:

  • No additional coordination is needed between the servers and the controller, which simplifies the segment commit FSM (Finite State Machine).
  • Potentially higher ingestion throughput (since the existing coordination has additional RPC overhead).

Cons:

  • This only works when we flush based on the number of rows in the input stream. It does not work with a time-based flush. This might be OK, since even if the current segment isn’t flushed, the in-memory state can still be queried by the broker.
  • One edge case is a segment that is not flushed for a long time (low QPS input stream). In such a situation, restarting the servers can lose the latest data (everything since the last committed segment). One way around this is to periodically checkpoint the server’s in-memory state onto local disk, even if the segment is not committed yet.

Note: This modification is optional. We can keep the current approach of choosing a commit leader and persisting the corresponding segment, as described in the modified segment completion protocol above.







