Pinot has supported realtime ingestion since inception. Low Level consumers were added in 2016/2017.


Pinot consumes rows from streaming data (such as Kafka) and serves queries on the data consumed thus far.

Two modes of consumption are supported in Pinot:

High Level Consumers

TODO: Add a design description of how High Level consumers work

Low Level Consumers

Problem Statement

Using High Level consumers has the following problems:

  1. Each consumer needs to consume from all partitions. So, if we run one consumer in a host, we are limited by the capacity of that host to consume all partitions of the topic, no matter what the ingestion rate is. A stream may provide a way by which multiple hosts can consume the same topic, with each host receiving a subset of the messages. However, in this mode the stream may duplicate rows across the machines when the machines go down and come back up. Pinot cannot afford that.
  2. A stream consumer has no control over the messages that are received. As a result, the consumers may have more or less the same segments, but not exactly the same. This makes capacity expansion etc. operationally heavy (e.g. start a consumer and wait 5 days before enabling it to serve queries). Having equivalent segments allows us to store the segments in the controller (like the offline segments) and download them onto a new server during capacity expansion, drastically reducing the operational time and effort. If we have common realtime segments across servers, it allows the brokers to use different routing algorithms (like we do with offline segments). Otherwise, the broker needs to route a query to exactly one realtime server so that we do not see duplicate data in results.


Design of Low Level (Partition Level) consumption in Pinot

When a table is created, the controller determines the number of partitions for the table, and “creates” one segment per partition, spraying these segments evenly across all the tagged servers. The following steps are done as a part of creating each segment:

  • Segment metadata is created in Zookeeper. The segments are named as tableName__partitionNumber__segmentSeqNumber__Timestamp. For example: myTable__6__0__20180801T1647Z
  • Segment metadata is set with the segment completion criteria – the number of rows. The controller computes this number by dividing the rows threshold set in table configuration by the total number of segments of the table on the same server.
  • Segment metadata is set with the offset from which to consume. Controller determines the offset by querying the stream.
  • Table Idealstate is set with these segment names and the appropriate server instances in which the segments are hosted. The state is set to CONSUMING
  • Depending on the number of replicas set, each partition could be consumed in multiple servers.
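The naming convention above can be sketched as a pair of helpers. This is a minimal illustration, not Pinot's implementation: the function and class names are hypothetical, and the timestamp is assumed to be formatted in UTC to minute precision, as the example name suggests.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class LLCSegmentName(NamedTuple):
    """Hypothetical holder for the four parts of an LLC segment name."""
    table_name: str
    partition: int
    sequence: int
    created: str  # e.g. "20180801T1647Z"


def format_segment_name(table_name, partition, sequence, when):
    # Assumption: the timestamp is UTC, truncated to the minute.
    stamp = when.astimezone(timezone.utc).strftime("%Y%m%dT%H%MZ")
    return f"{table_name}__{partition}__{sequence}__{stamp}"


def parse_segment_name(name):
    # Split from the right so a table name containing "__" is still handled.
    table_name, partition, sequence, created = name.rsplit("__", 3)
    return LLCSegmentName(table_name, int(partition), int(sequence), created)
```

Splitting from the right keeps the partition number and sequence number recoverable even if the table name itself contains a double underscore.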


When a server completes consuming the segment and reaches the end-criteria (either time or number of rows as per segment metadata), the server goes through a segment completion protocol sequence (described in diagrams below). The controller orchestrates all the replicas to reach the same consumption level, and allows one replica to “commit” a segment. The “commit” step involves:

  • The server uploads the completed segment to the controller
  • The controller updates that segment's metadata to mark it completed, and writes the end offset, end time, etc. in the metadata
  • The controller creates a new segment for the same partition (e.g. myTable__6__1__20180801T1805Z) and sets the metadata exactly like before, with the consumption offsets adjusted to reflect the end offset of the previous segmentSeqNumber.
  • The controller updates the IdealState to change the state of the completing segment to ONLINE, and add the new segment in CONSUMING state


In the first implementation, the end-criteria in the segment metadata simply point to the table config. Keeping the end-criteria in the segment metadata leaves room for more sophisticated end-criteria later, perhaps based on traffic or other conditions that vary on a per-segment basis. The end-criteria could be number of rows, or time. If number of rows is specified, then the controller divides the specified number by the number of segments (of that table) on the same server, and sets the appropriate row threshold in the segment metadata. The consuming server simply obeys what is in segment metadata for row threshold.
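The row-threshold arithmetic described above amounts to a single division. The sketch below assumes integer division and a floor of one row; neither detail is stated in this document, and the function name is hypothetical.

```python
def segment_row_threshold(table_row_threshold, segments_on_server):
    """Per-segment row threshold that the controller writes to segment metadata.

    Assumptions (not from the design): integer division, and at least one row
    per segment so that a consuming segment can always make progress.
    """
    if segments_on_server <= 0:
        raise ValueError("a server must host at least one consuming segment")
    return max(1, table_row_threshold // segments_on_server)
```

For example, with a table-level threshold of 5,000,000 rows and four consuming segments of that table on one server, each segment would be completed after 1,250,000 rows.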


We change the broker to introduce a new routing strategy that prefers ONLINE to CONSUMING segments, and ensures that there is at most one segment in CONSUMING state on a per partition basis in the segments that a query is to be routed to.
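One way to read this routing rule is sketched below. It is an illustration under assumptions rather than the broker's actual code: the input shape (segment name mapped to per-server states) and the function name are hypothetical, and a real broker would balance load across replicas instead of always picking the first server.

```python
from collections import defaultdict


def select_segments(segment_states):
    """Pick one server per segment, preferring ONLINE replicas.

    segment_states: {segment_name: {server: "ONLINE" | "CONSUMING" | ...}}
    Returns {segment_name: server}, with at most one CONSUMING segment
    per stream partition.
    """
    by_partition = defaultdict(list)
    for name in segment_states:
        _table, partition, sequence, _stamp = name.rsplit("__", 3)
        by_partition[int(partition)].append((int(sequence), name))

    routing = {}
    for segments in by_partition.values():
        for _sequence, name in sorted(segments):
            replicas = segment_states[name]
            online = sorted(s for s, state in replicas.items() if state == "ONLINE")
            if online:
                routing[name] = online[0]
                continue
            consuming = sorted(s for s, state in replicas.items() if state == "CONSUMING")
            if consuming:
                routing[name] = consuming[0]
                break  # later segments of this partition would duplicate rows
    return routing
```

Walking each partition in sequence order and stopping at the first segment served from a CONSUMING replica is what keeps a query from seeing the same stream rows twice.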

Important tuning parameters for Realtime Pinot

  • replicasPerPartition: This number indicates how many replicas are needed for each partition to be consumed from the stream
  • realtime.segment.flush.threshold.size: This parameter should be set to the total number of rows of a topic that a realtime consuming server can hold in memory. Default value is 5M. If the value is set to 0, then the number of rows is automatically adjusted such that the size of the segment generated is as per the setting realtime.segment.flush.desired.size
  • realtime.segment.flush.desired.size: Default value is “200M”. The setting is used only if realtime.segment.flush.threshold.size is set to 0
  • realtime.segment.flush.threshold.size.llc: This parameter overrides realtime.segment.flush.threshold.size. Useful when migrating live from HLC to LLC
  • pinot.server.instance.realtime.alloc.offheap: Default is false. Set it to true if you want off-heap allocation for dictionaries and no-dictionary columns
  • pinot.server.instance.realtime.alloc.offheap.direct: Default is false. Set it to true if you want off-heap allocation from DirectMemory (as opposed to MMAP)
  • pinot.server.instance.realtime.max.parallel.segment.builds: Default is 0 (meaning infinite). Set it to a number if you want to limit number of segment builds. Segment builds take up heap memory, so it is useful to have a max setting and limit the number of simultaneous segment builds on a single server instance JVM
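The interaction between the three flush settings above can be summarized in a few lines. This is a sketch of the precedence as described in the list, with a hypothetical function name and return shape; the defaults are the ones quoted above.

```python
DEFAULT_FLUSH_ROWS = 5_000_000   # default of realtime.segment.flush.threshold.size
DEFAULT_DESIRED_SIZE = "200M"    # default of realtime.segment.flush.desired.size


def resolve_flush_threshold(stream_configs):
    """Return ("rows", n) or ("size", "<desired size>") for an LLC table."""
    rows = stream_configs.get(
        "realtime.segment.flush.threshold.size.llc",
        stream_configs.get("realtime.segment.flush.threshold.size", DEFAULT_FLUSH_ROWS),
    )
    if int(rows) == 0:
        # Row count is auto-tuned to hit the desired segment size instead.
        size = stream_configs.get("realtime.segment.flush.desired.size", DEFAULT_DESIRED_SIZE)
        return ("size", size)
    return ("rows", int(rows))
```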

Migrating use cases from HLC to LLC (No downtime)

Use cases can be migrated to use LLC instead of HLC while continuing to consume and serve data as well. Some use cases may need more attention, but the following steps should work in general. Overall the steps are:

  1. Prepare servers to consume both HLC and LLC streams
  2. Turn on LLC consumption in servers
  3. Turn off HLC consumption in servers

Preparation


  • Set the new configurations as desired (replicasPerPartition, realtime.segment.flush.threshold.size.llc, realtime.segment.flush.threshold.time.llc). Note that the “.llc” versions of the configs are to be used only if you want to do a live migration of an existing table from HLC to LLC and need to have different thresholds for LLC than HLC.
  • Set loadMode of segments to MMAP
  • Set configurations to use offheap (either direct or MMAP) for dictionaries and no-dictionary items (realtime.alloc.offheap, realtime.alloc.offheap.direct)
  • If your stream is Kafka, add stream.kafka.broker.list configurations for per-partition consumers
  • Increase the heap size (doubling it may be useful) since we will be consuming both HLC and LLC on the same machines now. Restart the servers

Consuming streams via HLC as well as LLC

Configure two consumers but keep routing to be KafkaHighLevel.


  • Change the stream.<your stream here>.consumer.type setting to be highLevel,simple. This starts both LLC and HLC consumers on the nodes.
  • Change stream.<your stream here>.consumer.prop.auto.offset.reset to have the value largest (otherwise, LLC consumers will start consuming from the beginning, which may interfere with HLC operations)
  • Check memory consumption and query response times.
  • Set the broker routingTableBuilderName to be KafkaHighLevel so that queries are not routed to LLC until consumption is caught up.
  • Apply the table config
  • Restart brokers and servers
  • Wait for retention period of the table.

Disable HLC


  • Change the stream.<your stream here>.consumer.type setting to be “simple”
  • Remove the routingTableBuilderName setting. This will start routing queries to LLC segments
  • Apply the table configs and restart the brokers and servers
  • The HLC segments will slowly age out on their own.
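The two configuration phases of the live migration can be sketched as edits to a table config held as a dictionary. The setting names and values come from the steps above; the nesting under tableIndexConfig.streamConfigs and routing is an assumption about the table config layout, and the function names are hypothetical.

```python
import copy


def enable_dual_consumption(table_config, stream="kafka"):
    """Phase 1: run HLC and LLC consumers side by side, keep routing on HLC."""
    cfg = copy.deepcopy(table_config)
    # Assumed location of stream settings inside the table config.
    stream_cfg = cfg.setdefault("tableIndexConfig", {}).setdefault("streamConfigs", {})
    stream_cfg[f"stream.{stream}.consumer.type"] = "highLevel,simple"
    stream_cfg[f"stream.{stream}.consumer.prop.auto.offset.reset"] = "largest"
    # Assumed location of the routing setting inside the table config.
    cfg.setdefault("routing", {})["routingTableBuilderName"] = "KafkaHighLevel"
    return cfg


def disable_hlc(table_config, stream="kafka"):
    """Phase 2: keep only LLC consumers and let the broker route to LLC segments."""
    cfg = copy.deepcopy(table_config)
    stream_cfg = cfg.setdefault("tableIndexConfig", {}).setdefault("streamConfigs", {})
    stream_cfg[f"stream.{stream}.consumer.type"] = "simple"
    cfg.get("routing", {}).pop("routingTableBuilderName", None)
    return cfg
```

Both helpers return a modified copy, so the original config can be kept for rollback. Applying the config and restarting brokers and servers remain separate operational steps.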

Migrating use cases from HLC to LLC (with Downtime)

If it is all right to take downtime, then the easiest way is to disable the table, do the last step of the previous migration steps, and restart the table once the consumption has caught up.

Details of LLC Design

LLC Zookeeper setup

The segment state model is extended to have a CONSUMING state



The segment metadata is extended to contain a state (to indicate whether the segment is still consuming, or has completed). The idealstate has the new CONSUMING state for the most recent segment in each partition


Server side state machine for a segment


One possible implementation of the server side partition consumer:



Controller side state machine for a completing segment


Segment completion protocol

The ladder diagrams in the document have been drawn using mscgen.

When a pinot server reaches the end-criteria for a segment (either number of rows to consume or a time limit, or any other as indicated in segment metadata/configuration), it begins a segment completion protocol sequence with the lead controller. All the replicas of the segment execute the protocol. Exactly one of them gets to commit the segment with the lead controller.
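As a rough mental model of this orchestration, the controller can be pictured as a small per-segment state machine that collects the offsets reported by replicas and elects one committer. The sketch below is heavily simplified and is not the actual protocol: the class name, the response names other than HOLD, and the choice to wait for every replica before deciding are all assumptions made for illustration.

```python
class CompletionFsm:
    """Toy controller-side state machine for one completing segment."""

    def __init__(self, num_replicas):
        self.num_replicas = num_replicas
        self.offsets = {}           # server -> last reported offset
        self.committer = None
        self.winning_offset = None

    def segment_consumed(self, server, offset):
        """Handle a replica reporting that it reached its end-criteria."""
        self.offsets[server] = offset
        if self.committer is None:
            if len(self.offsets) < self.num_replicas:
                return "HOLD"       # wait until every replica has reported
            self.winning_offset = max(self.offsets.values())
            # Assumption: ties are broken by server name.
            self.committer = min(
                s for s, o in self.offsets.items() if o == self.winning_offset
            )
        if offset < self.winning_offset:
            return "CATCHUP"        # consume up to the winning offset first
        return "COMMIT" if server == self.committer else "HOLD"
```

Replicas that receive HOLD or CATCHUP are expected to call in again, so all of them converge on the same end offset while only the elected replica commits.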

The following sections demonstrate how this protocol works under happy path, and various failure scenarios

Segment Creation

The segment is created by adding a new segment metadata node in zookeeper and then updating the idealstate to set that segment in CONSUMING state. This should result in a (helix) state transition being sent to the server instance for that segment. The server then starts to consume from the offset as indicated in the segment metadata. The consumption stops when the end-criteria (as set in metadata, or as configured) is reached. Typically this is a row or time limit. Here is how the segment creation happens (at the time the table is created, or when the previous segment of a partition completes):

Segment Creation

Segment Commit: Happy Path 1

Segment Commit Happy Path 1

Segment Commit: Happy Path 2

Server takes too long to build a segment

Server takes too long to build a segment

In this case, we had a bug where some segments just took too long to build. Since then we have introduced a lease request from the server to the controller, periodically asking the controller for more time (essentially indicating to the controller that the server has not died, but is working on committing the segment).

Designated committer fails

Designated committer fails

Controller failure during commit

Controller failure during commit

Multiple failures during commit

Multiple failures during commit

Split Commit

In order to support deep storage better, we decided to (with a configured option) split the segment commit into three steps. In the earlier diagrams the segment commit stage was a single step where the server issues a POST operation on /segmentCommit URI to the lead controller. The lead controller then does three steps:

  1. Verify (and mark) the committing server in the SegmentCompletionHandler FSM (by sending a segmentCommitStart event to the FSM)
  2. Upload the file to the correct place in the segment store
  3. Commit the segment metadata (by sending a segmentCommitEnd event to the FSM). This process uses the storage path of the segment and other segment metadata


In the split commit, the pinot server issues three independent operations to the controller:

  1. Invokes the /segmentCommitStart URI on the lead controller to do the first step above
  2. Invokes the /segmentUpload URI on any of the controllers to store the segment.
  3. Invokes the /segmentCommitEnd URI on the lead controller to commit the segment metadata.

If any of the steps fail, the pinot server does not continue with the process. The failure mode is handled the same way as the ladder diagrams for a single commit.
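The server side of the split commit can be sketched as three calls that stop at the first failure. The three URIs are the ones listed above; the function name, the injected post callable, and the request parameters are hypothetical placeholders for whatever HTTP client and payload the server uses.

```python
def split_commit(post, lead_controller, any_controller, segment_name, segment_file):
    """Run the three split-commit steps in order.

    post(controller, uri, params) -> bool is an injected callable (assumed).
    Returns None on success, or the URI of the step that failed.
    """
    steps = [
        (lead_controller, "/segmentCommitStart", {"name": segment_name}),
        (any_controller, "/segmentUpload", {"name": segment_name, "file": segment_file}),
        (lead_controller, "/segmentCommitEnd", {"name": segment_name}),
    ]
    for controller, uri, params in steps:
        if not post(controller, uri, params):
            return uri  # do not continue with the remaining steps
    return None
```

Only the first and last steps must reach the lead controller; the upload can go to any controller, which is what makes the split useful when segments are large.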

Here is the ladder diagram for a split commit.

LLC Extensions

Over time, based on deployment experience, the following improvements have been made to the Realtime Low Level Consumers in Pinot:

Self-healing consumption

Permanent errors in Server during consumption

It is possible that pinot servers face intermittent problems consuming a segment. A common one is an intermittent issue with the stream (or network connectivity to the stream source). Pinot servers attempt to differentiate between such temporary and permanent exceptions. They retry a few times on temporary exceptions and then mark them as permanent.

Upon encountering a permanent exception the server informs the controller via a segmentStoppedConsuming message, indicating the reason due to which the segment stopped consuming. The controller, upon receipt of this message, marks the segment to be in OFFLINE state in the Idealstate. As a result, Helix walks the server through a CONSUMING to OFFLINE state transition, and the server safely releases all resources associated with the consuming segment.
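The retry-then-give-up behaviour described above might look like the following on the server. This is a sketch under assumptions: the exception classes, the retry count of three, and the injected callables are hypothetical; only the segmentStoppedConsuming message name comes from this document.

```python
class TransientStreamError(Exception):
    """Hypothetical: a temporary stream or network problem."""


class PermanentStreamError(Exception):
    """Hypothetical: an error that retrying cannot fix."""


def consume_batch_with_retries(fetch, notify_controller, max_attempts=3):
    """Fetch one batch, retrying transient errors a few times.

    fetch() returns a batch of rows; notify_controller(message, reason)
    sends a message to the controller. Both are injected (assumed).
    Returns the batch, or None if consumption has to stop.
    """
    last_error = None
    for _attempt in range(max_attempts):
        try:
            return fetch()
        except TransientStreamError as error:
            last_error = error      # retry
        except PermanentStreamError as error:
            last_error = error
            break                   # no point in retrying
    # Retries exhausted: treat the failure as permanent and tell the controller.
    notify_controller("segmentStoppedConsuming", str(last_error))
    return None
```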


In the picture above, Server-1 encountered a permanent exception and took action. If Server-2 and Server-3 did not encounter any permanent exceptions or were able to recover with local retries, then this consuming partition will have one less replica until such time as it is completed. When one of the other replicas completes the segment, Server-1 will get a Helix state transition from OFFLINE to ONLINE, download the segment and be able to return to normal redundancy for that segment. Note that the state machine and consumer for the next segment in the same stream partition are independent, and Server-1 will attempt to start consuming the next segment from that segment's starting offset, since it will get a Helix state transition from OFFLINE to CONSUMING for that segment.

Another scenario could be that Server-2 and Server-3 also face the same issue as Server-1 did (say, because the stream source had an issue). In this case, they will go through the same action as Server-1 and effectively the partition will be marked as OFFLINE for all three replicas. This state will leave the table unable to consume that partition (or more partitions, if such errors occurred across multiple partitions). Stream events starting from the starting offset of that partition will not be present in the responses to queries.

Controller errors during segment completion

Recall from the earlier ladder diagrams that the controller does the following steps when a segment is completed (or, committed):

  1. Mark the ZK segment metadata for the completing segment as DONE (as opposed to IN_PROGRESS). This is done in an atomic fashion so that this can be marked as DONE only once. Any other server segment completion action attempting to re-do this action will not succeed and commit will fail on the server side.
  2. Create a new segment ZK Metadata for the next segment in sequence. For example, if the previous segment name was mytable__17__427__20190225T1537Z (indicating that the segment belonged to the table mytable, created while consuming from stream partition 17, was the 427th segment in sequence on that partition, and started consuming roughly around 15:37 UTC on Feb 25, 2019), then the next segment should be named something like mytable__17__428__20190225T2127Z (assuming that the previous one completed around 21:27 UTC). The controller creates metadata for this segment (with the starting offset, etc.) in zookeeper (PROPERTYSTORE) and marks the status as IN_PROGRESS.
  3. Update the ideal state to change the state of mytable__17__427__20190225T1537Z from CONSUMING to ONLINE and add a new segment in the idealstate for mytable__17__428__20190225T2127Z in CONSUMING state, on the same instances on which the 427th segment existed. This is done as one atomic update to the idealstate, considering other updates may also happen to it (e.g. another stream partition could be completing at the same time)

It is possible that the controller is killed between these steps (e.g. during an upgrade, or due to other errors such as OOM), so a segment could be left in a dangling state, never to recover again.
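The three controller steps can be sketched against an in-memory stand-in for Zookeeper. The sketch is illustrative only: the class and function names are hypothetical, and the plain dictionary check stands in for the atomic (versioned) write that a real implementation would need.

```python
class InMemoryStore:
    """Hypothetical stand-in for segment ZK metadata and the ideal state."""

    def __init__(self):
        self.segment_status = {}   # segment name -> "IN_PROGRESS" | "DONE"
        self.ideal_state = {}      # segment name -> {server: state}


def commit_segment(store, committing, next_segment, servers):
    # Step 1: mark the completing segment DONE, but only once.
    if store.segment_status.get(committing) != "IN_PROGRESS":
        return False               # someone else already committed it
    store.segment_status[committing] = "DONE"

    # Step 2: create metadata for the next segment in sequence.
    store.segment_status[next_segment] = "IN_PROGRESS"

    # Step 3: one combined ideal state update for both segments.
    store.ideal_state.update({
        committing: {server: "ONLINE" for server in servers},
        next_segment: {server: "CONSUMING" for server in servers},
    })
    return True
```

A controller crash after step 1 or step 2 leaves exactly the dangling states that the healing algorithm looks for: a DONE segment with no successor metadata, or successor metadata that never reached the ideal state.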

New Stream partitions added

Streams may increase the number of partitions (say, because of organic growth in ingestion rate, or other resource re-balance by the stream). In this case, the table will be left with no consumer for the new partitions, and data returned in the response will be partial.

Healing algorithm

The (lead) controller runs a periodic task (RealtimeSegmentValidationManager) that walks over realtime segments of all tables periodically (time period configurable) and looks for tables that may have some partitions in a state where:

  • All replicas of the latest segment are OFFLINE
  • Some segments are in a dangling state, with either step 2 or step 3 not done.

The task then proceeds to fix these segments by:

  • Completing the partially done transactions
  • Creating a new CONSUMING segment if all replicas are in OFFLINE state. So, if segment 427 has all replicas in OFFLINE, segment 428 will be created with the same start offset as 427, attempting to not lose data. It is possible (depending on how late the healer runs) that Kafka has already lost this offset (due to retention limits), in which case it finds the earliest offset available and uses that offset (logging a warning and bumping a metric so the admin can be alerted for lost data).

The task also polls the stream to see if the number of partitions has increased (decrease in number of partitions is not supported), and adds new consumers for the new partitions, drawing from the servers tagged for that table.
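The repair of a partition whose replicas are all OFFLINE can be sketched as follows. The function name, argument list and returned dictionary are hypothetical; the logic follows the description above, including the fallback to the earliest offset still available in the stream.

```python
def repair_partition(latest_segment, replica_states, start_offset,
                     earliest_available_offset, now_stamp):
    """Plan a replacement CONSUMING segment, or return None if no repair is needed.

    replica_states: {server: state} for the latest segment of the partition.
    now_stamp: creation timestamp for the new segment name, e.g. "20190225T2127Z".
    """
    if not replica_states or any(s != "OFFLINE" for s in replica_states.values()):
        return None  # at least one replica is still consuming or has completed

    table, partition, sequence, _stamp = latest_segment.rsplit("__", 3)
    new_name = f"{table}__{partition}__{int(sequence) + 1}__{now_stamp}"

    # If the stream has already dropped the old start offset, data is lost and
    # the new segment has to start from the earliest offset still available.
    data_lost = start_offset < earliest_available_offset
    return {
        "segment": new_name,
        "start_offset": max(start_offset, earliest_available_offset),
        "data_lost": data_lost,   # caller would log a warning and bump a metric
        "servers": sorted(replica_states),
    }
```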

Both segment completion and healing algorithm look for changes in server instances for a table, and adjust the ideal state so that new consuming segments are only allocated on the current set of tagged instances for a table. Therefore in order to expand capacity, the operator only needs to tag additional instances and over time, segments will be balanced across all instances (there is also an option to trigger re-balance right away, but it is not necessary in this case).

Caveat

We need to be aware that there is a race condition between segment completion and segment repair. It is possible that a segment completes around the same time as the healer runs. A few conditions are possible here:

  1. The segment metadata commit may be partially done, but the healer may try to fix it, adding a new segment (with perhaps a new name because the time is slightly off). So, we may get into a situation where there are two consumers for the same partition, consuming from the same starting offset. Once this cycle starts, it is hard to break it and duplicate data will be returned for those stream partitions. Worse, the servers could run out of memory, etc. To avoid this, we check the timestamp of the znode of the completing segment metadata before fixing that segment. If that znode was modified less than MAX_SEGMENT_COMPLETION_TIME (currently set to 2 mins) ago, we skip fixing the segment. One of two things should happen: either the committing thread completes all three steps, or the next run of the healer fixes things.
  2. The committing thread could encounter a GC or otherwise be very slow. It could happen that step 1 is done at time T, and step 3 at time T+10 mins. In that case, it is also possible that the healer runs during this time, and fixes the segment. In this case, we will run into a similar situation as above, with duplicate data being returned for that stream partition. To minimize the chances of this happening, we check the time before the idealstate update in step 3. If the duration is already past MAX_SEGMENT_COMPLETION_TIME, then we abort the segment metadata commit and let the healer handle this.

Even with these checks in place, race conditions remain possible. We therefore strongly suggest that installations of Pinot monitor long GC cycles (which could lead to other issues as well, so they are well worth monitoring) and take action.
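The two time checks described above reduce to comparing elapsed time against the same constant from both sides. In this sketch the function names and the use of seconds are assumptions; the two-minute value is the one quoted above.

```python
MAX_SEGMENT_COMPLETION_TIME_SECS = 2 * 60


def healer_should_skip(znode_modified_secs, now_secs):
    """Healer side: leave recently touched segment metadata alone."""
    return now_secs - znode_modified_secs < MAX_SEGMENT_COMPLETION_TIME_SECS


def committer_should_abort(commit_started_secs, now_secs):
    """Committer side: give up before the ideal state update if too slow."""
    return now_secs - commit_started_secs > MAX_SEGMENT_COMPLETION_TIME_SECS
```

The checks narrow the window in which both the committer and the healer act on the same segment, but a pause between the committer's check and its ideal state write can still slip through, which is why GC monitoring remains important.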

Segments taking too long to build

A server may fail (or be taken down for an upgrade) during segment completion. In order to accommodate this condition, the state machine in the controller picks a new committer if it has not heard from the previous committer. However, it could well be that the segment just takes too long to build (the segment is huge, needs multiple indexes to be built, hits GC cycles, and so on). Further, it is possible that every replica takes long to build a segment (just because of the characteristics of the data in the stream). This can lead to a situation where a server gains committer status, builds a segment, tries to commit it, but the commit fails because the controller has selected another committer in the meantime, so the server goes back to HOLD state. Eventually, the same server gets committer status again (because other replicas also went through the same phenomenon), and the cycle goes on forever.

To alleviate this, two mechanisms have been added:

  • A server saves the segment it has built, so that if it ever gets committer status for the same segment (with the same end offset, of course) then it can re-use the previously built segment. The commit would pass in this case.
  • The controller exposes an API called extendBuildTime() that the server can invoke, to let the controller know that the server is still alive, building the segment. The server periodically calls this API so that the controller does not give up on it (until a pre-set maximum).
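The second mechanism behaves like a lease with a hard upper bound. The sketch below models only the controller's bookkeeping; the class name, the time units and the exact extension rule are assumptions, and only the extendBuildTime name is taken from the description above.

```python
class CommitterLease:
    """Toy model of the time the controller grants a committer to build a segment."""

    def __init__(self, now, initial_secs, max_total_secs):
        self.expires = now + initial_secs
        self.hard_limit = now + max_total_secs   # the pre-set maximum

    def extend_build_time(self, now, extra_secs):
        """Server says it is still building; returns False if it called too late."""
        if now > self.expires:
            return False    # controller may already have picked another committer
        self.expires = min(max(self.expires, now + extra_secs), self.hard_limit)
        return True

    def is_expired(self, now):
        return now > self.expires
```

Capping the lease at a hard limit means a server that is genuinely stuck cannot hold committer status forever by continuing to ask for more time.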

Off-heap memory allocation for consuming segments

Segments, once completed, are loaded in off-heap memory (either mmap or direct allocation). However, for the consuming segment, most data structures were maintained on heap. Note that the consuming segments need the following data structures (that vary in some way as more rows are ingested):

  1. Forward index: An array of dictionary IDs (for dictionary columns) or an array of actual values (for no dictionary columns). An array of arrays of dictionary IDs for multi-valued columns. This increases as number of rows increase.
  2. Dictionary: A bi-map that can map from a dictionary ID to an actual value as well as from a value to a dictionary ID. This increases initially but stabilizes after some time when column values are repeated.
  3. Inverted index: RoaringBitmap index for each unique value of a column. In general, this will increase somewhat as new rows are ingested, but good compression may be achieved minimizing such increase.
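How the three structures relate for a single dictionary-encoded, single-valued column can be illustrated with ordinary on-heap Python containers. This is a conceptual sketch only: the class names are hypothetical, plain sets stand in for RoaringBitmap, and nothing here is off-heap.

```python
class MutableDictionary:
    """Bi-map between column values and dense dictionary IDs."""

    def __init__(self):
        self._id_of = {}     # value -> dictionary ID
        self._values = []    # dictionary ID -> value

    def index(self, value):
        dict_id = self._id_of.get(value)
        if dict_id is None:
            dict_id = len(self._values)
            self._id_of[value] = dict_id
            self._values.append(value)
        return dict_id

    def value(self, dict_id):
        return self._values[dict_id]


class ConsumingColumn:
    """One dictionary-encoded, single-valued column of a consuming segment."""

    def __init__(self):
        self.dictionary = MutableDictionary()
        self.forward_index = []    # row ID -> dictionary ID
        self.inverted_index = {}   # dictionary ID -> set of row IDs

    def add(self, value):
        row_id = len(self.forward_index)
        dict_id = self.dictionary.index(value)
        self.forward_index.append(dict_id)
        self.inverted_index.setdefault(dict_id, set()).add(row_id)
        return row_id
```

The forward index grows by one entry per row, while the dictionary grows only when a new distinct value appears, which matches the growth patterns described in the list above.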

While using plain old Java data structures keeps the code simple, maintaining these objects on heap could result in an unpredictable (and unmeasurable) increase in heap size, leading to long GC cycles, etc.

Other than inverted index, everything else is now off-heap.

Forward indices

All forward indices are allocated when a segment starts to consume, since we know the maximum number of entries in the array. This works for all dictionary encoded columns, and fixed-width no-dictionary columns. (See the section below on contributions for other column types). For multi-value columns we assume a certain maximum number of values per row, and allocate accordingly. The algorithms for storing the values ensure that we make use of the memory efficiently, so that one row having more than the maximum does not cause us to run out of memory if another row compensates for it.

Dictionaries

Off-heap dictionaries are tricky since they need a bi-map that is off-heap. 

Auto-tuning of segment sizes

Learning from previous segments

Separation of consuming and completed segments

The memory usage characteristics of consuming segments are very different from those of completed segments. Even with off-heap allocation where we may memory-map areas and use only some parts of it, the resident memory grows as new rows are consumed. Also, the server needs CPU 

Contributions Welcome

The following will be good to have, contributions are invited:

  1. A tool to display the status of realtime table in a table or in a server. This tool or some derivative of the tool can be used later for automatic diagnosis and more self-healing techniques or simply for identifying probable root cause for issues:
    • What does the zk metadata say about consuming  segments? 
    • What are the current consuming offsets of the segments in each of the replicas?
    • What is the ingestion rate of the stream partitions?
    • What is the average segment size and how long does it normally take to make segments for this table?
    • How many permanent exceptions, and when did they occur?
    • How much memory are the servers using? Breakdown by indexes will be useful
  2. Support for variable byte no-dictionary columns in consuming segment. The implementation should be able to re-use the class OffHeapMutableByteArrayStore
  3. Support for off-heap inverted index for consuming segments
  4. Reducing heap usage while building completed segments. Currently, the segment builder is designed to read incoming data row by row, and build dictionaries in a hash table before translating them to the on-disk format of a dictionary. We can by-pass these steps since we already have the segment in columnar format (realtime consumers ingest rows but store in a columnar format for serving queries). Initial prototype has shown significant reduction in heap usage during segment builds. If we reduce heap usage (better yet, move completely to off-heap based segment completion) more segments can be packed into a single host, saving hardware cost. If a higher latency can be tolerated, these hosts could use SSDs and map off-heap memory from files (Pinot already provides primitives for doing these)
  5. Servers should attempt to complete the segment if they run out of memory while consuming it. Ideally, we should be able to allocate a quota for each partition of a table, and the consuming segments should stop when the quota is reached. This will be useful in multi-tenant environments where we do not want one table to consume too much memory at the expense of another table.
  6. We should use minions to merge realtime segments. This is very useful for tables that end up having a large number of partitions ingesting data very slowly in each partition. Typically these are administrative mistakes (or perhaps constraints) but the result in Pinot is that there are too many small segments and too much memory allocated that is unused.
  7. CPU isolation algorithms allocating some CPU for consuming data, and leaving some for query serving.
  8. Smart allocation of a new consuming segment (created either when new stream partitions are detected or a previous segment is completed) depending on the amount of CPU and memory the instances have, for consumption as well as serving queries.

