Pinot has supported realtime ingestion since inception. Low Level Consumers were added in 2016/2017. There is more to be done in this area; see the "Contributions" section below.

Pinot consumes rows from streaming data (such as Kafka) and serves queries on the data consumed thus far.

Two modes of consumption are supported in Pinot:

High Level Consumers

TODO: Add a design description of how High Level consumers work

Low Level Consumers

Problem Statement

Using High Level consumers has the following problems:

  1. Each consumer needs to consume from all partitions. So, if we run one consumer on a host, we are limited by the capacity of that host to consume all partitions of the topic, no matter what the ingestion rate is. A stream may provide a way for multiple hosts to consume the same topic, with each host receiving a subset of the messages. However, in this mode the stream may duplicate rows across machines when machines go down and come back up. Pinot cannot afford that.
  2. A stream consumer has no control over which messages it receives. As a result, the consumers may end up with more or less the same segments, but not exactly the same. This makes capacity expansion, etc. operationally heavy (e.g. start a consumer and wait 5 days before enabling it to serve queries). Having equivalent segments allows us to store the segments in the controller (like the offline segments) and download them onto a new server during capacity expansion, drastically reducing the operational time and effort. If we have common realtime segments across servers, the brokers can use different routing algorithms (like we do with offline segments). Otherwise, the broker needs to route a query to exactly one realtime server so that we do not see duplicate data in results.

Design of Low Level (Partition Level) consumption in Pinot

In this example, the replication factor for the table is set to two, so each of the three partitions is consumed by exactly two real-time servers. There are six PartitionConsumer instances, distributed across three servers (so each server gets two PartitionConsumers). Note that we could have had two servers, and each server would then get three PartitionConsumers.

Pinot assumes that the underlying stream has messages that are ordered according to their arrival within a partition, and that each message is located at a specific offset in the stream.

The metadata for a MutableSegment (in Zookeeper) has the offset in the stream from which consumption should start for that segment. This starting offset value applies to all replicas of the MutableSegment. The Pinot controller sets the value of the starting offset in the segment metadata in Zookeeper at the time the segment is created (which is either when the table is first created, or when the previous segment in that partition is committed).

The consumed rows are stored uncompressed in in-memory data structures in the MutableSegment. These rows are lost if/when the server restarts (e.g. during a software roll-out). In such cases, the PartitionConsumers restart their consumption from the starting offset of that MutableSegment.

Every so often, a PartitionConsumer turns a MutableSegment into an ImmutableSegment by going through the following steps (committing):

  1. Pause consumption
  2. Execute steps of the segment completion protocol (described below) to decide which replica commits the segment.
  3. Build an ImmutableSegment (same format as segments of Offline table)
  4. Commit the segment to the controller (In this step, the controller creates the next segment in the partition)
  5. Await signal for the next segment, and resume consumption, indexing rows into a new MutableSegment.

When a table is created, the controller determines the number of partitions for the table, and “creates” one segment per partition, spraying these segments evenly across all the tagged servers. The following steps are done as a part of creating each segment:

  • Segment metadata is created in Zookeeper. The segments are named as tableName__partitionNumber__segmentSeqNumber__Timestamp. For example: myTable__6__0__20180801T1647Z
  • Segment metadata is set with the segment completion criteria – the number of rows. The controller computes this number by dividing the rows threshold set in table configuration by the total number of segments of the table on the same server.
  • Segment metadata is set with the offset from which to consume. The controller determines the offset by querying the stream.
  • The table IdealState is set with these segment names and the server instances on which the segments are hosted. The state is set to CONSUMING.
  • Depending on the number of replicas set, each partition could be consumed in multiple servers.
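The naming convention above can be illustrated with a small value class that parses a segment name and derives the name of the next segment in the same partition. This is a minimal sketch: `LlcSegmentName` and its methods are hypothetical (not Pinot's own class), and it assumes table names never contain the `__` separator.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical value class for names of the form tableName__partitionNumber__segmentSeqNumber__Timestamp.
// Assumes the table name itself never contains "__".
final class LlcSegmentName {
  private static final String SEPARATOR = "__";
  // Minute-precision UTC timestamp, e.g. 20180801T1647Z
  private static final DateTimeFormatter TIMESTAMP =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm'Z'").withZone(ZoneOffset.UTC);

  final String tableName;
  final int partition;
  final int sequenceNumber;
  final String timestamp;

  LlcSegmentName(String tableName, int partition, int sequenceNumber, String timestamp) {
    this.tableName = tableName;
    this.partition = partition;
    this.sequenceNumber = sequenceNumber;
    this.timestamp = timestamp;
  }

  static LlcSegmentName parse(String name) {
    String[] parts = name.split(SEPARATOR);
    if (parts.length != 4) {
      throw new IllegalArgumentException("Not an LLC segment name: " + name);
    }
    return new LlcSegmentName(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), parts[3]);
  }

  // The next segment in the same partition: sequence number + 1, stamped with its creation time.
  LlcSegmentName next(ZonedDateTime createdAt) {
    return new LlcSegmentName(tableName, partition, sequenceNumber + 1, TIMESTAMP.format(createdAt));
  }

  @Override
  public String toString() {
    return String.join(SEPARATOR, tableName, String.valueOf(partition),
        String.valueOf(sequenceNumber), timestamp);
  }
}
```

For example, parsing myTable__6__0__20180801T1647Z and calling `next` with 18:05 UTC on the same day yields myTable__6__1__20180801T1805Z.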

When a server completes consuming the segment and reaches the end-criteria (either time or number of rows as per segment metadata), the server goes through a segment completion protocol sequence (described in diagrams below). The controller orchestrates all the replicas to reach the same consumption level, and allows one replica to “commit” a segment. The “commit” step involves:

  • The server uploads the completed segment to the controller
  • The controller updates that segment's metadata to mark it completed, and writes the end offset, end time, etc. in the metadata
  • The controller creates a new segment for the same partition (e.g. myTable__6__1__20180801T1805Z) and sets the metadata exactly like before, with the consumption offset adjusted to the end offset of the previous segment.
  • The controller updates the IdealState to change the state of the completing segment to ONLINE, and adds the new segment in CONSUMING state

As a first implementation, the end-criteria in the segment metadata is taken from the table config. Keeping it in the metadata allows us to implement a fancier end-criteria at some point, perhaps based on traffic or other conditions, something that varies on a per-segment basis. The end-criteria could be number of rows, or time. If number of rows is specified, then the controller divides the specified number by the number of segments (of that table) on the same server, and sets the resulting row threshold in the segment metadata. The consuming server simply obeys the row threshold in the segment metadata.
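The per-segment row threshold described above is a simple division. A minimal sketch, with a hypothetical helper name, assuming the table-level threshold and the number of this table's segments on the server are already known:

```java
// Hypothetical helper: derives the per-segment row threshold written into segment metadata.
final class RowThresholds {
  private RowThresholds() {}

  // tableRowThreshold: rows threshold from the table configuration.
  // numSegmentsOnServer: number of this table's consuming segments hosted on the same server.
  static int perSegmentThreshold(int tableRowThreshold, int numSegmentsOnServer) {
    if (numSegmentsOnServer <= 0) {
      throw new IllegalArgumentException("numSegmentsOnServer must be positive");
    }
    return tableRowThreshold / numSegmentsOnServer; // integer division rounds down
  }
}
```

For example, a table threshold of 5M rows with two of the table's consuming segments on a server gives each of those segments a threshold of 2.5M rows.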

We change the broker to introduce a new routing strategy that prefers ONLINE to CONSUMING segments, and ensures that there is at most one segment in CONSUMING state on a per partition basis in the segments that a query is to be routed to.
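The routing rule above (prefer ONLINE replicas, and route to at most one CONSUMING segment per partition) can be sketched as follows. All type and method names are hypothetical; picking the lowest-sequence CONSUMING segment and the alphabetically first replica are assumptions that keep the sketch deterministic, whereas a real broker would also balance load across replicas.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

enum ReplicaState { ONLINE, CONSUMING, OFFLINE }

// Hypothetical view of one segment: its partition, sequence number and the state on each server.
record SegmentView(String name, int partition, int sequenceNumber, Map<String, ReplicaState> replicas) {}

final class LlcRoutingSketch {
  // Returns segment name -> server chosen for one query.
  static Map<String, String> route(List<SegmentView> segments) {
    Map<String, String> routing = new TreeMap<>();
    Map<Integer, SegmentView> consumingPerPartition = new HashMap<>();
    for (SegmentView s : segments) {
      Optional<String> online = firstServerIn(s, ReplicaState.ONLINE);
      if (online.isPresent()) {
        routing.put(s.name(), online.get()); // prefer an ONLINE replica
      } else if (firstServerIn(s, ReplicaState.CONSUMING).isPresent()) {
        // At most one CONSUMING segment per partition (assumed: the lowest sequence number)
        consumingPerPartition.merge(s.partition(), s,
            (a, b) -> a.sequenceNumber() <= b.sequenceNumber() ? a : b);
      }
    }
    for (SegmentView s : consumingPerPartition.values()) {
      routing.put(s.name(), firstServerIn(s, ReplicaState.CONSUMING).get());
    }
    return routing;
  }

  // Alphabetically first server holding the segment in the given state.
  private static Optional<String> firstServerIn(SegmentView s, ReplicaState state) {
    return s.replicas().entrySet().stream()
        .filter(e -> e.getValue() == state)
        .map(Map.Entry::getKey)
        .sorted()
        .findFirst();
  }
}
```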

Important tuning parameters for Realtime Pinot

  • replicasPerPartition: This number indicates how many replicas are needed for each partition to be consumed from the stream
  • realtime.segment.flush.threshold.size: This parameter should be set to the total number of rows of a topic that a realtime consuming server can hold in memory. Default value is 5M. If the value is set to 0, then the number of rows is automatically adjusted such that the size of the segment generated is as per the setting realtime.segment.flush.desired.size
  • realtime.segment.flush.desired.size: Default value is “200M”. The setting is used only if realtime.segment.flush.threshold.size is set to 0
  • This parameter overrides realtime.segment.flush.threshold.size. Useful when migrating live from HLC to LLC
  • pinot.server.instance.realtime.alloc.offheap: Default is false. Set it to true if you want off-heap allocation for dictionaries and no-dictionary columns
  • Default is false. Set it to true if you want off-heap allocation from DirectMemory (as opposed to MMAP)
  • pinot.server.instance.realtime.max.parallel.segment.builds: Default is 0 (meaning unlimited). Set it to a number if you want to limit the number of simultaneous segment builds. Segment builds take up heap memory, so it is useful to have a max setting that limits the number of simultaneous segment builds on a single server instance JVM
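How the two flush settings interact can be sketched as below. Only the two keys and their defaults come from the list above; the class, the record, and the assumption that the settings arrive as a flat map of string keys to string values are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of how realtime.segment.flush.threshold.size and
// realtime.segment.flush.desired.size interact.
final class FlushThresholdSketch {
  static final String ROWS_KEY = "realtime.segment.flush.threshold.size";
  static final String DESIRED_SIZE_KEY = "realtime.segment.flush.desired.size";
  static final int DEFAULT_ROWS = 5_000_000;          // "Default value is 5M"
  static final String DEFAULT_DESIRED_SIZE = "200M"; // "Default value is 200M"

  // rowThreshold == 0 means the row count is auto-tuned to reach desiredSegmentSize.
  record FlushCriterion(int rowThreshold, String desiredSegmentSize) {
    boolean isSizeBased() {
      return rowThreshold == 0;
    }
  }

  static FlushCriterion resolve(Map<String, String> config) {
    int rows = Integer.parseInt(config.getOrDefault(ROWS_KEY, String.valueOf(DEFAULT_ROWS)));
    if (rows != 0) {
      return new FlushCriterion(rows, null); // desired size is not used when a row count is given
    }
    return new FlushCriterion(0, config.getOrDefault(DESIRED_SIZE_KEY, DEFAULT_DESIRED_SIZE));
  }
}
```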

Migrating use cases from HLC to LLC (No downtime)

Use cases can be migrated to use LLC instead of HLC while continuing to consume and serve data as well. Some use cases may need more attention, but the following steps should work in general. Overall the steps are:

  1. Prepare servers to consume both HLC and LLC streams
  2. Turn on LLC consumption in servers
  3. Turn off HLC consumption in servers


  • Set the new configurations as desired (replicasPerPartition, …). Note that the “.llc” versions of the configs are to be used only if you want to do a live migration of an existing table from HLC to LLC and need different thresholds for LLC than for HLC.
  • Set loadMode of segments to MMAP
  • Set configurations to use off-heap memory (either direct or MMAP) for dictionaries and no-dictionary items (realtime.alloc.offheap, …)
  • If your stream is Kafka, add configurations for per-partition consumers
  • Increase the heap size (doubling it may be useful), since we will now be consuming both HLC and LLC on the same machines. Restart the servers.

Consuming streams via HLC as well as LLC

Configure two consumers, but keep routing set to KafkaHighLevel.

  • Change the stream.<your stream here>.consumer.type setting to be highLevel,simple. This starts both LLC and HLC consumers on the nodes.
  • Change stream.<your stream here> to have the value largest (otherwise, LLC consumers will start consuming from the beginning, which may interfere with HLC operations)
  • Check memory consumption and query response times.
  • Set the broker routingTableBuilderName to be KafkaHighLevel so that queries are not routed to LLC until consumption has caught up.
  • Apply the table config
  • Restart brokers and servers
  • Wait for retention period of the table.

Disable HLC

  • Change the stream.<your stream here>.consumer.type setting to be “simple”
  • Remove the routingTableBuilderName setting. This will start routing queries to LLC segments
  • Apply the table configs and restart the brokers and servers
  • The HLC segments will slowly age out on their own.

Migrating use cases from HLC to LLC (with Downtime)

If it is all right to take downtime, then the easiest way is to disable the table, do the last step of the previous migration steps, and re-enable the table once consumption has caught up.

Details of LLC Design

LLC Zookeeper setup

The segment state model is extended to have a CONSUMING state

The segment metadata is extended to contain a state (to indicate whether the segment is still consuming, or has completed). The idealstate has the new CONSUMING state for the most recent segment in each partition
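The segment states can be sketched as an enum that allows only the transitions this document describes: creating a consuming segment, committing it, stopping on a permanent error, and downloading a committed segment. `SegmentState` and `canTransitionTo` are hypothetical names, and this is not the actual Helix state model definition, which also has transitions (e.g. for dropping segments) that are left out here.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the extended segment state model; only transitions mentioned in this document.
enum SegmentState {
  OFFLINE, CONSUMING, ONLINE;

  private static final Map<SegmentState, Set<SegmentState>> ALLOWED = Map.of(
      // New consuming segment; or download of a segment that another replica committed
      OFFLINE, EnumSet.of(CONSUMING, ONLINE),
      // Segment committed; or consumption stopped on a permanent error
      CONSUMING, EnumSet.of(ONLINE, OFFLINE),
      // Transitions out of ONLINE (e.g. dropping a segment) are omitted in this sketch
      ONLINE, EnumSet.noneOf(SegmentState.class));

  boolean canTransitionTo(SegmentState target) {
    return ALLOWED.get(this).contains(target);
  }
}
```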

Server side state machine for a segment

This and other state machine diagrams were drawn using Graphviz

One possible implementation of the server side partition consumer:

Controller side state machine for a completing segment

Segment completion protocol

The ladder diagrams in the document have been drawn using mscgen.

When a pinot server reaches the end-criteria for a segment (either number of rows to consume or a time limit, or any other as indicated in segment metadata/configuration), it begins a segment completion protocol sequence with the lead controller. All the replicas of the segment execute the protocol. Exactly one of them gets to commit the segment with the lead controller.
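A heavily simplified sketch of how a controller could ensure that exactly one replica commits. Assumptions: the controller waits until every replica has reported, picks the replica with the largest consumed offset (ties broken by server name), and answers the others with CATCHUP or HOLD. The class and response names are illustrative; timeouts, leases and the post-commit steps are left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified controller-side bookkeeping for one completing segment.
final class CompletionSketch {
  enum Response { HOLD, CATCHUP, COMMIT }

  private final int numReplicas;
  private final Map<String, Long> reportedOffsets = new TreeMap<>(); // sorted by server name
  private String committer; // chosen once, so at most one replica is ever told to COMMIT

  CompletionSketch(int numReplicas) {
    this.numReplicas = numReplicas;
  }

  // A replica reports (possibly repeatedly) that it reached the end-criteria at the given offset.
  synchronized Response segmentConsumed(String server, long offset) {
    reportedOffsets.put(server, offset);
    if (committer == null) {
      if (reportedOffsets.size() < numReplicas) {
        return Response.HOLD; // wait for the other replicas to report
      }
      long best = Long.MIN_VALUE;
      for (Map.Entry<String, Long> e : reportedOffsets.entrySet()) {
        if (e.getValue() > best) { // strict '>' keeps the first server name on ties
          best = e.getValue();
          committer = e.getKey();
        }
      }
    }
    if (server.equals(committer)) {
      return Response.COMMIT;
    }
    // Replicas behind the committer catch up to its offset; the rest wait for the commit.
    return offset < reportedOffsets.get(committer) ? Response.CATCHUP : Response.HOLD;
  }
}
```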

The following sections demonstrate how this protocol works under happy path, and various failure scenarios

Segment Creation

The segment is created by adding a new segment metadata node in zookeeper and then updating the idealstate to set that segment in CONSUMING state. This should result in a (helix) state transition being sent to the server instance for that segment. The server then starts to consume from the offset as indicated in the segment metadata. The consumption stops when the end-criteria (as set in metadata, or as configured) is reached. Typically this is a row or time limit. Here is how the segment creation happens (at the time the table is created, or when the previous segment of a partition completes):

Segment Commit: Happy Path 1

Segment Commit: Happy Path 2

Server takes too long to build a segment

In this case, we had a bug where some segments took too long to build. Since then, we have introduced a lease request from the server to the controller: the server periodically asks the controller for more time (essentially indicating to the controller that the server has not died, but is still working on committing the segment).
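The lease idea can be sketched as a deadline per committing segment that the server keeps pushing forward. `CommitLeases`, its methods and the millisecond units are hypothetical; times are passed in explicitly so the logic is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical controller-side lease table for segments that a committer is still building.
final class CommitLeases {
  private final Map<String, Long> deadlines = new HashMap<>(); // segment name -> deadline (ms)

  // The server asks for more time: the deadline moves to now + extension (never backwards).
  synchronized void extend(String segment, long nowMs, long extensionMs) {
    deadlines.merge(segment, nowMs + extensionMs, Math::max);
  }

  // True if the committer has not extended its lease in time and may be presumed dead.
  synchronized boolean isExpired(String segment, long nowMs) {
    Long deadline = deadlines.get(segment);
    return deadline == null || nowMs > deadline;
  }
}
```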

Designated committer fails

Controller failure during commit

Multiple failures during commit

Split Commit

In order to support deep storage better, we decided to (with a configured option) split the segment commit into three steps. In the earlier diagrams the segment commit stage was a single step where the server issues a POST operation on /segmentCommit URI to the lead controller. The lead controller then does three steps:

  1. Verify (and mark) the committing server in the SegmentCompletionHandler FSM (by sending a segmentCommitStart event to the FSM)
  2. Upload the file to the correct place in the segment store
  3. Commit the segment metadata (by sending a segmentCommitEnd event to the FSM). This process uses the storage path of the segment and other segment metadata

In the split commit, the pinot server issues three independent operations to the controller:

  1. Invokes the /segmentCommitStart URI on the lead controller to do the first step above
  2. Invokes the /segmentUpload URI on any of the controllers to store the segment.
  3. Invokes the /segmentCommitEnd URI on the lead controller to commit the segment metadata.

If any of the steps fail, the pinot server does not continue with the process. The failure mode is handled the same way as the ladder diagrams for a single commit.
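On the server side, a split commit reduces to three calls that stop at the first failure. In this sketch, `SplitCommitController` is a hypothetical interface standing in for the three URIs; it is not Pinot's client API, and returning the storage location from the upload is an assumed way of passing the storage path to the final step.

```java
// Hypothetical stand-in for the three controller endpoints used in a split commit.
interface SplitCommitController {
  boolean segmentCommitStart(String segment, long endOffset);                // lead controller
  String segmentUpload(String segment, byte[] data);                        // any controller; null on failure
  boolean segmentCommitEnd(String segment, long endOffset, String location); // lead controller
}

final class SplitCommitClient {
  private SplitCommitClient() {}

  // Returns true only if all three steps succeed; the first failure stops the process.
  static boolean commit(SplitCommitController controller, String segment, long endOffset, byte[] data) {
    if (!controller.segmentCommitStart(segment, endOffset)) {
      return false;
    }
    String location = controller.segmentUpload(segment, data);
    if (location == null) {
      return false;
    }
    return controller.segmentCommitEnd(segment, endOffset, location);
  }
}
```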

Here is the ladder diagram for a split commit.

Improvements to Realtime Pinot

Based on deployment experience over time, the following improvements have been made to the Realtime Low Level Consumers in Pinot:

Self-healing consumption

Permanent errors in Server during consumption

It is possible that Pinot servers face intermittent problems consuming a segment. A common one is an intermittent issue with the stream (or network connectivity to the stream source). Pinot servers attempt to differentiate between such temporary exceptions and permanent ones. They retry a few times on temporary exceptions and then treat them as permanent.
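The retry-then-give-up behaviour can be sketched as a wrapper around a fetch from the stream. The two exception types and the fixed attempt limit are assumptions of this sketch, not Pinot's classes; on a `PermanentException` the caller would send the segmentStoppedConsuming message described below.

```java
import java.util.concurrent.Callable;

// Hypothetical wrapper: retry transient failures a few times, then treat them as permanent.
final class ConsumeRetry {
  // Errors assumed to be transient, e.g. a brief loss of connectivity to the stream.
  static final class TransientException extends Exception {
    TransientException(String msg) { super(msg); }
  }

  static final class PermanentException extends Exception {
    PermanentException(String msg, Throwable cause) { super(msg, cause); }
  }

  static <T> T fetchWithRetries(Callable<T> fetch, int maxAttempts) throws PermanentException {
    TransientException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return fetch.call();
      } catch (TransientException e) {
        last = e; // retry; a real consumer would also back off between attempts
      } catch (Exception e) {
        throw new PermanentException("non-retriable error", e);
      }
    }
    throw new PermanentException("gave up after " + maxAttempts + " attempts", last);
  }
}
```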

Upon encountering a permanent exception the server informs the controller via a segmentStoppedConsuming message, indicating the reason due to which the segment stopped consuming. The controller, upon receipt of this message, marks the segment to be in OFFLINE state in the Idealstate. As a result, Helix walks the server through a CONSUMING to OFFLINE state transition, and the server safely releases all resources associated with the consuming segment.

In the picture above, Server-1 encountered a permanent exception and took action. If Server-2 and Server-3 did not encounter any permanent exceptions, or were able to recover with local retries, then this consuming partition will have one less replica until the segment is completed. When one of the other replicas completes the segment, Server-1 will get a Helix state transition from OFFLINE to ONLINE, download the segment, and return to normal redundancy for that segment. Note that the state machine and consumer for the next segment in the same stream partition are independent: Server-1 will get a Helix state transition from OFFLINE to CONSUMING for that segment, and will attempt to start consuming it from that segment's starting offset.

Another scenario could be that Server-2 and Server-3 also face the same issue as Server-1 did (say, because the stream source had an issue). In this case, they will go through the same action as Server-1, and effectively the partition will be marked as OFFLINE for all three replicas. In this state, the table cannot consume that partition (or more partitions, if there were such errors across multiple partitions). Stream events starting from the starting offset of that partition will not be present in the responses to queries.

Controller errors during segment completion

Recall from the earlier ladder diagrams that the controller does the following steps when a segment is completed (or, committed):

  1. Mark the ZK segment metadata for the completing segment as DONE (as opposed to IN_PROGRESS). This is done in an atomic fashion so that this can be marked as DONE only once. Any other server segment completion action attempting to re-do this action will not succeed and commit will fail on the server side.
  2. Create a new segment ZK Metadata for the next segment in sequence. For example, if the previous segment name was mytable__17__427__20190225T1537Z (indicating that the segment belonged to the table mytable, was created while consuming from stream partition 17, had sequence number 427 in that partition, and started consuming roughly around 15:37 UTC on Feb 25, 2019), then the next segment should be named something like mytable__17__428__20190225T2127Z (assuming that the previous one completed around 21:27 UTC). The controller creates metadata for this segment (with the starting offset, etc.) in Zookeeper (PROPERTYSTORE) and marks the status as IN_PROGRESS.
  3. Update the ideal state to change the state of mytable__17__427__20190225T1537Z from CONSUMING to ONLINE, and add a new segment in the idealstate for mytable__17__428__20190225T2127Z in CONSUMING state, on the same instances where segment 427 existed. This is done as one atomic update to the idealstate, considering other updates may also happen to it (e.g. another stream partition could be completing at the same time).
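The three steps can be sketched with in-memory stand-ins: `ConcurrentMap.replace(key, expected, newValue)` plays the role of the atomic (versioned) ZK write in step 1, and `AtomicReference.updateAndGet` the role of an idealstate update that is retried when another update wins the race. `CommitSketch` and its fields are hypothetical, and the idealstate is simplified to one state per segment rather than one per instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for segment ZK metadata plus the table idealstate.
final class CommitSketch {
  enum Status { IN_PROGRESS, DONE }

  final ConcurrentMap<String, Status> metadata = new ConcurrentHashMap<>();
  // Simplified idealstate: segment name -> state (real idealstates hold a state per instance).
  final AtomicReference<Map<String, String>> idealState = new AtomicReference<>(Map.<String, String>of());

  boolean commit(String completing, String next) {
    // Step 1: IN_PROGRESS -> DONE succeeds only once; a second attempt fails here.
    if (!metadata.replace(completing, Status.IN_PROGRESS, Status.DONE)) {
      return false;
    }
    // Step 2: metadata for the next segment in sequence.
    metadata.putIfAbsent(next, Status.IN_PROGRESS);
    // Step 3: one atomic idealstate update; the function is re-run if a concurrent update won.
    idealState.updateAndGet(current -> {
      Map<String, String> updated = new HashMap<>(current);
      updated.put(completing, "ONLINE");
      updated.put(next, "CONSUMING");
      return Map.copyOf(updated);
    });
    return true;
  }
}
```

A second commit attempt for the same segment fails at step 1, which is what makes the commit fail on the other server.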

It is possible that the controller is killed between these steps (e.g. during an upgrade, or due to other errors such as OOM), so a segment could be left in a dangling state, never to recover on its own.

New Stream partitions added

Streams may increase the number of partitions (say, because of organic growth in ingestion rate, or other resource re-balancing by the stream). In this case, the table will be left with no consumer for the new partitions, and data returned in query responses will be partial.

Healing algorithm

The (lead) controller runs a periodic task (RealtimeSegmentValidationManager) that walks over the realtime segments of all tables (at a configurable interval) and looks for tables that may have some partitions in a state where:

  • All replicas of the latest segment are OFFLINE
  • Some segments are in a dangling state, with either step 2 or step 3 of the segment completion steps above not done.

The task then proceeds to fix these segments by:

  • Completing the partially done transactions
  • Creating a new CONSUMING segment if all replicas are in OFFLINE state. So, if segment 427 has all replicas in OFFLINE, segment 428 will be created with the same start offset as 427, attempting to not lose data. It is possible (depending on how late the healer runs) that Kafka has already lost this offset (due to retention limits), in which case it finds the earliest offset available and uses that offset (logging a warning and bumping a metric so the admin can be alerted for lost data).

The task also polls the stream to see if the number of partitions has increased (decrease in number of partitions is not supported), and adds new consumers for the new partitions, drawing from the servers tagged for that table.
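The healer's checks can be sketched as three small functions. The names are hypothetical, replica states are plain strings, and partitions are assumed to be numbered from 0; the printed warning stands in for the log line and metric mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for the decisions the periodic healing task makes.
final class HealerSketch {
  private HealerSketch() {}

  // True if every replica of a partition's latest segment is OFFLINE.
  static boolean allReplicasOffline(Map<String, String> replicaStates) {
    return !replicaStates.isEmpty() && replicaStates.values().stream().allMatch("OFFLINE"::equals);
  }

  // Start offset for the replacement segment: reuse the failed segment's start offset if the
  // stream still retains it; otherwise fall back to the earliest available offset (data loss).
  static long restartOffset(long failedSegmentStartOffset, long earliestAvailableOffset) {
    if (failedSegmentStartOffset >= earliestAvailableOffset) {
      return failedSegmentStartOffset;
    }
    System.err.printf("WARN: offsets %d to %d were lost to stream retention%n",
        failedSegmentStartOffset, earliestAvailableOffset - 1);
    return earliestAvailableOffset;
  }

  // Partitions present in the stream but without any consumer yet.
  static List<Integer> newPartitions(int partitionsWithSegments, int partitionsInStream) {
    List<Integer> added = new ArrayList<>();
    for (int p = partitionsWithSegments; p < partitionsInStream; p++) {
      added.add(p);
    }
    return added;
  }
}
```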

Both segment completion and healing algorithm look for changes in server instances for a table, and adjust the ideal state so that new consuming segments are only allocated on the current set of tagged instances for a table. Therefore in order to expand capacity, the operator only needs to tag additional instances and over time, segments will be balanced across all instances (there is also an option to trigger re-balance right away, but it is not necessary in this case).


We need to be aware that there is a race condition between segment completion and segment repair. It is possible that a segment completes around the same time as the healer runs. A few conditions are possible here:

  1. The segment metadata commit may be partially done, but the healer may try to fix it, adding a new segment (perhaps with a different name, because the time is slightly off). So, we may end up with two consumers for the same partition, consuming from the same starting offset. Once this cycle starts, it is hard to break, and duplicate data will be returned for those stream partitions. Worse, the servers could run out of memory, etc. To avoid this, we check the timestamp of the znode of the completing segment's metadata before fixing that segment. If the znode was modified less than MAX_SEGMENT_COMPLETION_TIME (currently set to 2 mins) ago, we skip fixing the segment. One of two things should then happen: either the committing thread completes all three steps, or the next run of the healer fixes things.
  2. The committing thread could hit a long GC pause or otherwise be very slow. It could happen that step 1 is done at time T, and step 3 at time T+10 mins. In that case, the healer may also run during this time and fix the segment, leading to a similar situation as above, with duplicate data being returned for that stream partition. To minimize the chances of this happening, we check the elapsed time before step 3 (the idealstate update). If the duration is already past MAX_SEGMENT_COMPLETION_TIME, then we abort the segment metadata commit and let the healer handle this.
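Both checks compare an age against MAX_SEGMENT_COMPLETION_TIME (2 minutes). A minimal sketch, with hypothetical method names and millisecond timestamps passed in explicitly:

```java
// Hypothetical sketch of the two timing checks that guard against the race above.
final class CompletionRaceGuard {
  static final long MAX_SEGMENT_COMPLETION_TIME_MS = 2 * 60 * 1000; // currently 2 mins

  private CompletionRaceGuard() {}

  // Healer side: only repair a segment whose metadata znode has not changed recently.
  static boolean healerMayFix(long znodeModifiedMs, long nowMs) {
    return nowMs - znodeModifiedMs >= MAX_SEGMENT_COMPLETION_TIME_MS;
  }

  // Committer side, just before step 3: abort if the commit has already taken too long.
  static boolean committerMayUpdateIdealState(long step1DoneMs, long nowMs) {
    return nowMs - step1DoneMs < MAX_SEGMENT_COMPLETION_TIME_MS;
  }
}
```

The two bounds are complementary, but they are evaluated at different moments on different threads (or machines), so a pause right after the committer's check, or clock skew, can still let both sides act.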

Even with these checks in place, race conditions are still possible. We therefore strongly suggest that Pinot installations monitor long GC cycles (which could lead to other issues as well, so they are well worth monitoring) and take action.

Off-heap memory allocation for consuming segments

Once completed, segments are loaded into off-heap memory (either mmap or direct allocation). However, for the consuming segment, most data structures were maintained on heap. Note that the consuming segments need the following data structures (which grow in some way as more rows are ingested):

  1. Forward index: An array of dictionary IDs (for dictionary columns) or an array of actual values (for no dictionary columns). An array of arrays of dictionary IDs for multi-valued columns. This increases as number of rows increase.
  2. Dictionary: A bi-map that can map from a dictionary ID to an actual value as well as from a value to a dictionary ID. This grows initially but stabilizes after some time, once column values start repeating.
  3. Inverted index: RoaringBitmap index for each unique value of a column. In general, this will increase somewhat as new rows are ingested, but good compression may be achieved minimizing such increase.

While using plain old Java data structures keeps the code simple, maintaining these objects on heap could result in an unpredictable (and unmeasurable) increase in heap size, leading to long GC cycles, etc.

Other than the inverted index, everything else is now off-heap.

Forward indices

All forward indices are allocated when a segment starts consuming, since we know the maximum number of entries in the array. This works for all dictionary-encoded columns and fixed-width no-dictionary columns. (See the section below on "Contributions" for other column types.) For multi-value columns, we assume a certain maximum number of values per row and allocate accordingly. The algorithms for storing the values make efficient use of the memory, so that one row having more than the maximum does not cause us to run out of memory if another row compensates for it.
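For a dictionary-encoded single-value column, up-front allocation amounts to one direct buffer sized from the maximum number of rows. A minimal sketch with a hypothetical class name; multi-value and variable-width columns are not covered:

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

// Hypothetical forward index for a dictionary-encoded single-value column, allocated off-heap once.
final class OffHeapForwardIndex {
  private final IntBuffer dictIds; // one 4-byte dictionary ID per row

  OffHeapForwardIndex(int maxRows) {
    // The row threshold is known when consumption starts, so the whole array is allocated up front.
    this.dictIds = ByteBuffer.allocateDirect(maxRows * Integer.BYTES).asIntBuffer();
  }

  void set(int docId, int dictId) {
    dictIds.put(docId, dictId); // throws IndexOutOfBoundsException past maxRows
  }

  int get(int docId) {
    return dictIds.get(docId);
  }

  int capacity() {
    return dictIds.capacity();
  }
}
```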


Off-heap dictionaries are tricky since they need a bi-map that is off-heap. We need to map a dictionary ID to a value, and vice versa, in near constant time (these lookups happen during query execution). To build a dictionary of (say) N fixed-width values (of width w bytes), we use two data structures:

  • A single block of N * w bytes that holds the values indexed by the dictionary ID (from 0 to N-1)
  • A list of blocks of size 4 * M bytes holding a matrix. A single row of the matrix has all the dictionary IDs that hash to the same row number. Each matrix holds at most M hash collisions, more matrices are created as we see more collisions.

The following picture illustrates the data structures (see BaseOffHeapMutableDictionary):

Getting a value from a dictionary ID is a straightforward lookup in the first block of memory.

To get the dictionary ID from a value:

  • Hash the value to reach a particular row in the matrix
  • Walk along that row of the matrix, retrieving each dictionary ID. Use the dictionary ID to look up the value and compare it to the given value.

To insert a new value

  • Place the value in the first available slot in the first block (the slot number is also the dictionary ID of the value)
  • Hash the value, then walk the row of the matrix until a -1 is seen for the dictionary ID, creating new matrices as necessary.
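The lookup and insert procedures above can be sketched for fixed-width `long` values as below. Assumptions: a single writer, a known maximum number of values, and no on-heap overflow map (a full row simply spills into a newly created matrix). The class is hypothetical and much simpler than BaseOffHeapMutableDictionary.

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical off-heap bi-map for long values, following the description above.
final class OffHeapLongDictionary {
  private static final int EMPTY = -1;
  private final LongBuffer values;                             // N * 8 bytes, indexed by dictionary ID
  private final List<IntBuffer> matrices = new ArrayList<>(); // each: numRows x slotsPerRow IDs
  private final int numRows;
  private final int slotsPerRow;                               // "M" collisions per row per matrix
  private int size = 0;

  OffHeapLongDictionary(int capacity, int numRows, int slotsPerRow) {
    this.values = ByteBuffer.allocateDirect(capacity * Long.BYTES).asLongBuffer();
    this.numRows = numRows;
    this.slotsPerRow = slotsPerRow;
    addMatrix();
  }

  private IntBuffer addMatrix() {
    IntBuffer m = ByteBuffer.allocateDirect(numRows * slotsPerRow * Integer.BYTES).asIntBuffer();
    for (int i = 0; i < m.capacity(); i++) {
      m.put(i, EMPTY);
    }
    matrices.add(m);
    return m;
  }

  private int rowStart(long value) {
    return Math.floorMod(Long.hashCode(value), numRows) * slotsPerRow;
  }

  // Dictionary ID -> value: a direct lookup in the values block.
  long get(int dictId) {
    return values.get(dictId);
  }

  // Value -> dictionary ID (or -1): walk the hashed row across the matrices.
  int indexOf(long value) {
    int base = rowStart(value);
    for (IntBuffer m : matrices) {
      for (int slot = 0; slot < slotsPerRow; slot++) {
        int id = m.get(base + slot);
        if (id == EMPTY) {
          return EMPTY; // slots fill in order, so an empty slot ends the walk
        }
        if (values.get(id) == value) {
          return id;
        }
      }
    }
    return EMPTY;
  }

  // Adds the value if absent and returns its dictionary ID.
  int index(long value) {
    int existing = indexOf(value);
    if (existing != EMPTY) {
      return existing;
    }
    int id = size++;
    values.put(id, value); // write the value first, then publish its ID in the matrix
    int base = rowStart(value);
    for (IntBuffer m : matrices) {
      for (int slot = 0; slot < slotsPerRow; slot++) {
        if (m.get(base + slot) == EMPTY) {
          m.put(base + slot, id);
          return id;
        }
      }
    }
    addMatrix().put(base, id); // this row is full in every matrix: add a new matrix
    return id;
  }

  int size() {
    return size;
  }
}
```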

Lock-free walking of lists is implemented by taking advantage of the fact that:

  • We can only add values to the dictionary, never remove one.
  • We allow querying a document only after it has been completely indexed. Until then the max document ID queried is retained as the older value, so the newer values are never looked up.

A small on-heap overflow hash map (of bounded size) is maintained to avoid the scenario of creating an extra matrix when only some values have more than M collisions. If the overflow fills up, new matrix block is created, moving the IDs from the overflow hash into the newly created matrix block.

For variable length columns we use (in addition to the forward lookup map above) a data structure that keeps the variable sized values as efficiently as possible. We allocate a block of memory, in which:

  • The values are stored contiguously starting from the end of the buffer, in reverse order
  • The starting offsets of the values are stored in the beginning of the buffer (each offset is 4 bytes in length). We allocate a new buffer when we have less than len(value) + 4 bytes left (in the middle of the buffer).

The following diagram illustrates the idea (see MutableOffHeapByteArrayStore)
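A minimal sketch of that layout using direct `ByteBuffer`s. The class name is hypothetical; it assumes single-threaded use and that no value is larger than one buffer, and it finds a value's end from the start offset of the value added just before it in the same buffer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical variable-length store: per buffer, 4-byte offsets grow from the front
// and values grow backwards from the end.
final class VarLengthStoreSketch {
  private final int bufferSize;
  private final List<ByteBuffer> buffers = new ArrayList<>();
  private final List<Integer> firstIdInBuffer = new ArrayList<>();
  private int numValues = 0;
  private int countInCurrent; // values stored in the last buffer
  private int valuesStart;    // lowest byte used by values in the last buffer

  VarLengthStoreSketch(int bufferSize) {
    this.bufferSize = bufferSize;
    newBuffer();
  }

  private void newBuffer() {
    buffers.add(ByteBuffer.allocateDirect(bufferSize));
    firstIdInBuffer.add(numValues);
    countInCurrent = 0;
    valuesStart = bufferSize;
  }

  int add(byte[] value) {
    if (value.length + Integer.BYTES > bufferSize) {
      throw new IllegalArgumentException("value does not fit in one buffer");
    }
    int free = valuesStart - countInCurrent * Integer.BYTES;
    if (free < value.length + Integer.BYTES) {
      newBuffer(); // less than len(value) + 4 bytes left in the middle
    }
    ByteBuffer buf = buffers.get(buffers.size() - 1);
    valuesStart -= value.length;
    ByteBuffer dup = buf.duplicate();
    dup.position(valuesStart);
    dup.put(value);                                          // value at the back
    buf.putInt(countInCurrent * Integer.BYTES, valuesStart); // its start offset at the front
    countInCurrent++;
    return numValues++;
  }

  byte[] get(int id) {
    if (id < 0 || id >= numValues) {
      throw new IndexOutOfBoundsException("no value with id " + id);
    }
    int b = buffers.size() - 1;
    while (firstIdInBuffer.get(b) > id) {
      b--;
    }
    ByteBuffer buf = buffers.get(b);
    int local = id - firstIdInBuffer.get(b);
    int start = buf.getInt(local * Integer.BYTES);
    // A value ends where the previous value in this buffer starts (or at the buffer end).
    int end = local == 0 ? bufferSize : buf.getInt((local - 1) * Integer.BYTES);
    byte[] out = new byte[end - start];
    ByteBuffer dup = buf.duplicate();
    dup.position(start);
    dup.get(out);
    return out;
  }
}
```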

Auto-tuning of segment sizes

The configuration for consumption limit used to be two-fold:

  • Row limit
  • Time limit

The row limit was taken from the table configuration, divided by the number of segments (of a table) hosted on a server instance, and set in the segment metadata. This was done so that machines with more segments got a smaller row limit and vice versa. (Applying these across tables is another matter.)

Soon, configuring the tables became hard, with operators having to guess how many rows could be held in memory. Each table was different, some with 3 columns and some with 120 columns; some with low cardinalities (small dictionaries), and others with high-cardinality strings. So, we turned to setting the number of rows by targeting a segment size.

We provided a utility that the operator can run on sample data consumed over time, which provides the estimated memory used for a given number of server instances and segment size. The operator can then pick the sweet spot and set the corresponding segment size in the table config.

At the time of segment completion, we estimate the number of rows that need to be consumed in the next segment based on the current segment size and the number of rows consumed in the current segment, so that the target segment size is reached. Given that the segment sizes can potentially vary a lot due to some values in a single segment, we take into account the past values of segment sizes as well. Instead of maintaining the segment sizes over time, we maintain the ratio of segment size to number of rows, improving the ratio each time a segment completes, so that we can estimate the number of rows reasonably for the next segment.

Using the realtime provisioning helper

The provisioning helper allows an operator to choose the most reasonable option for a given QPS and installation. For example:

# Provision a few hosts with a single replica and allow consumption for (say) 12 hours. Assuming that one of the
# sample segments is myTable__0__156__20180809T2006Z, which consumed for 12 hours, we can use the following command
# to help in provisioning. The tool prints out the memory used (off-heap) in each host, assuming the number of
# replicas as in the table config. It also estimates the segment size for the given number of hours. So, if the
# operator decides to deploy 6 hosts, and has a memory budget of 7 GB (total) per host, she can decide that 6 hours
# is the optimal time for a segment. The segment size for 6 hours turns out to be about 18.3MB. So, the operator
# can set the segment size in the table config to be 18MB and let Pinot roll.
RealtimeProvisioningHelper -tableConfigFile myTableConfig.json -numPartitions 8 -sampleCompletedSegmentDir myTable__0__156__20180809T2006Z -periodSampleSegmentConsumed 12h -numHosts 2,3,4,5,6 -numHours 2,4,6,8,10,12,14,16,18,20,22,24

Memory used per host (rows: numHours, columns: numHosts)
numHosts --> 2        |4        |6        |8        |10       |12       |14       |16       |
 2 --------> 19.61GB  |9.8GB    |6.68GB   |4.9GB    |4.01GB   |3.34GB   |2.9GB    |2.45GB   |
 4 --------> 20.18GB  |10.09GB  |6.88GB   |5.04GB   |4.13GB   |3.44GB   |2.98GB   |2.52GB   |
 6 --------> 20.67GB  |10.34GB  |7.05GB   |5.17GB   |4.23GB   |3.52GB   |3.05GB   |2.58GB   |
 8 --------> 21.17GB  |10.59GB  |7.22GB   |5.29GB   |4.33GB   |3.61GB   |3.13GB   |2.65GB   |
10 --------> 23.77GB  |11.88GB  |8.1GB    |5.94GB   |4.86GB   |4.05GB   |3.51GB   |2.97GB   |
12 --------> 22.17GB  |11.08GB  |7.56GB   |5.54GB   |4.53GB   |3.78GB   |3.27GB   |2.77GB   |
14 --------> 25.81GB  |12.91GB  |8.8GB    |6.45GB   |5.28GB   |4.4GB    |3.81GB   |3.23GB   |
16 --------> 25.26GB  |12.63GB  |8.61GB   |6.31GB   |5.17GB   |4.31GB   |3.73GB   |3.16GB   |
18 --------> 23.66GB  |11.83GB  |8.07GB   |5.91GB   |4.84GB   |4.03GB   |3.5GB    |2.96GB   |
20 --------> 26.25GB  |13.13GB  |8.95GB   |6.56GB   |5.37GB   |4.48GB   |3.88GB   |3.28GB   |
22 --------> 28.85GB  |14.42GB  |9.83GB   |7.21GB   |5.9GB    |4.92GB   |4.26GB   |3.61GB   |
24 --------> 25.15GB  |12.58GB  |8.57GB   |6.29GB   |5.14GB   |4.29GB   |3.72GB   |3.14GB   |
Segment size (rows: numHours)

 2 --------> 6.1MB    |
 4 --------> 12.2MB   |
 6 --------> 18.3MB   |
 8 --------> 24.4MB   |
10 --------> 30.5MB   |
12 --------> 36.6MB   |
14 --------> 42.7MB   |
16 --------> 48.8MB   |
18 --------> 54.9MB   |
20 --------> 61MB     |
22 --------> 67.1MB   |
24 --------> 73.2MB   |

Algorithm for computing the row limit for next segment

We assume that the ratio of segment size to number of rows is a constant for each table (say, R). (Since there is a fixed overhead for creating a segment even with one row, R is not really a constant, but it is a good first approximation.) Each time a segment completes, we compute the value of R for that segment and refine the learned value, as below:

$$R_{n+1} = \alpha R_n + (1 - \alpha)\, R, \qquad \text{where } \alpha < 1$$

Here, R is the ratio of the segment that is in the process of completing. We choose α to be a number higher than 0.5 so that we weigh the learned value more than the new value.

The number of rows threshold for the next segment is computed as:

$$T_{n+1} = \frac{\text{optimalSegmentSize}}{R_{n+1}}$$
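A worked example with illustrative numbers (not measurements from any real table): take α = 0.9, a learned ratio R_n of 100 bytes per row, a completing segment with R = 120 bytes per row, and an optimal segment size of 204,000,000 bytes.

$$R_{n+1} = 0.9 \times 100 + (1 - 0.9) \times 120 = 90 + 12 = 102 \ \text{bytes/row}$$

$$T_{n+1} = \frac{204{,}000{,}000}{102} = 2{,}000{,}000 \ \text{rows}$$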

We do not want to move too rapidly towards the optimal segment size when we begin, so we ramp up towards it gradually. We start applying the equation above only when the segment size is “near” the optimal value (between half and twice the optimal size, in the algorithm below).

If a segment reaches the time threshold, then it is below the optimal segment size, so we set the number of rows for the next segment slightly higher than the number of rows actually consumed by the current segment. This keeps us from over-allocating memory for Kafka topics with low ingestion rates.

Here is a more detailed version of the algorithm:

final long optimalSegmentSize = getDesiredSegmentSizeFromTableConfig();
Double sizeToRowsRatio = null; // Learned value of 'R'; null until a segment completes
static final double ALPHA = 0.9; // Weight of the learned value (the α in the formula above)

/**
 * Returns the row threshold for the next segment.
 * @param segmentSize is the size of the segment that is completing. 0 if this is the first segment
 * @param numRowsConsumed is the number of rows consumed in the segment that is completing
 * @param stoppedDueToTimeLimit is true if the segment completed because it reached the time threshold
 */
public int computeRowThreshold(long segmentSize, int numRowsConsumed, boolean stoppedDueToTimeLimit) {
  if (segmentSize == 0) { // first segment of table
    return 100_000; // First guess on number of rows
  }
  double currentRatio = (double) segmentSize / numRowsConsumed;
  // Update the learned value of R
  if (sizeToRowsRatio != null) {
    sizeToRowsRatio = sizeToRowsRatio * ALPHA + currentRatio * (1 - ALPHA);
  } else {
    sizeToRowsRatio = currentRatio;
  }
  double newNumRows;
  if (stoppedDueToTimeLimit) {
    // Increase the number of rows a little bit beyond, aim to hit the row threshold next time
    newNumRows = numRowsConsumed * 1.1;
  } else if (segmentSize <= 0.5 * optimalSegmentSize) {
    // Well below the optimal size: ramp up by 50%
    newNumRows = numRowsConsumed * 1.5;
  } else if (segmentSize >= 2 * optimalSegmentSize) {
    // Well above the optimal size: ramp down by half
    newNumRows = numRowsConsumed / 2.0;
  } else {
    // Within range, apply formula (most of the time we will be in this case)
    newNumRows = optimalSegmentSize / sizeToRowsRatio;
  }
  return (int) newNumRows;
}
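To see how the threshold converges, here is a self-contained, runnable transcription of the algorithm above with a small simulation. It assumes, hypothetically, a stream with a constant size-to-rows ratio of 250 bytes per row (so every segment's size is exactly rows × 250), an optimal segment size of 200 MB passed to the constructor instead of being read from table config, and no segment hitting the time limit. The class name RowThresholdEstimator is illustrative only.

```java
/**
 * Illustrative transcription of computeRowThreshold() with instance state.
 * Assumption: optimalSegmentSize is passed in rather than read from table config.
 */
class RowThresholdEstimator {
  private static final double ALPHA = 0.9; // Weight of the learned value of R
  private final long optimalSegmentSize;
  private Double sizeToRowsRatio = null;   // Learned value of R

  RowThresholdEstimator(long optimalSegmentSize) {
    this.optimalSegmentSize = optimalSegmentSize;
  }

  int computeRowThreshold(long segmentSize, int numRowsConsumed, boolean stoppedDueToTimeLimit) {
    if (segmentSize == 0) {
      return 100_000; // First guess for the first segment of the table
    }
    double currentRatio = (double) segmentSize / numRowsConsumed;
    sizeToRowsRatio = (sizeToRowsRatio == null)
        ? currentRatio
        : sizeToRowsRatio * ALPHA + currentRatio * (1 - ALPHA);
    double newNumRows;
    if (stoppedDueToTimeLimit) {
      newNumRows = numRowsConsumed * 1.1;          // Slightly more than consumed
    } else if (segmentSize <= 0.5 * optimalSegmentSize) {
      newNumRows = numRowsConsumed * 1.5;          // Ramp up
    } else if (segmentSize >= 2 * optimalSegmentSize) {
      newNumRows = numRowsConsumed / 2.0;          // Ramp down
    } else {
      newNumRows = optimalSegmentSize / sizeToRowsRatio; // Near optimal: use learned R
    }
    return (int) newNumRows;
  }

  public static void main(String[] args) {
    RowThresholdEstimator estimator = new RowThresholdEstimator(200_000_000L); // 200 MB target
    double bytesPerRow = 250.0; // Hypothetical stream with a constant size-to-rows ratio
    int rows = estimator.computeRowThreshold(0, 0, false);
    for (int segment = 1; segment <= 8; segment++) {
      long size = (long) (rows * bytesPerRow); // Each segment fills up to its row threshold
      rows = estimator.computeRowThreshold(size, rows, false);
      System.out.printf("segment %d: %d bytes -> next threshold %d rows%n", segment, size, rows);
    }
  }
}
```

With these numbers the threshold grows by 1.5x per segment (100,000, 150,000, 225,000, ...) until a segment lands between 100 MB and 400 MB, after which it settles at 200 MB / 250 bytes per row = 800,000 rows.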

Note that no state is maintained in ZooKeeper for this. The algorithm works across a controller leadership transfer (or restart), since we set R from the completing segment if we do not have a previous value, thus naturally carrying the value of R over to the new leader.

Further, R is re-computed only for one partition of a topic. We do not want variations between stream partitions to influence the value of R, which would slow down the learning process (since we attach more weight to previously learned values). We assume that all stream partitions behave more or less the same way during the period in which they complete segments. Indeed, we see in practice that the segments of all partitions complete at roughly the same time, give or take a few minutes.
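A minimal sketch of learning R from a single partition follows. It assumes, hypothetically, that the designated partition is fixed (partition 0 in the usage below) and that, when no learned value exists yet, the first completion from any partition bootstraps R, matching the leadership-transfer behavior described above. The class and method names are illustrative, not Pinot's.

```java
/**
 * Hypothetical sketch: R is refined only by completions of one designated partition.
 * Other partitions can bootstrap R when no learned value exists, but never move it afterwards.
 */
class SinglePartitionRatioLearner {
  private static final double ALPHA = 0.9; // Weight of the learned value
  private final int learningPartition;     // Assumption: a fixed partition id
  private Double sizeToRowsRatio = null;   // Learned value of R

  SinglePartitionRatioLearner(int learningPartition) {
    this.learningPartition = learningPartition;
  }

  void onSegmentCompleted(int partitionId, long segmentSize, int numRows) {
    double currentRatio = (double) segmentSize / numRows;
    if (sizeToRowsRatio == null) {
      // No learned value yet (e.g. after a controller leadership change): bootstrap from any partition
      sizeToRowsRatio = currentRatio;
    } else if (partitionId == learningPartition) {
      sizeToRowsRatio = sizeToRowsRatio * ALPHA + currentRatio * (1 - ALPHA);
    }
    // Completions from other partitions leave R unchanged
  }

  /** Learned R; call only after at least one segment has completed. */
  double ratio() {
    return sizeToRowsRatio;
  }
}
```

Using a fixed partition keeps partition-to-partition noise out of R, at the cost of learning only once per completion round.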

Learning from previous segments

The introduction of off-heap memory provided tremendous benefits, since values stored off-heap are also smaller (on the heap, an Integer takes 16 bytes, and a Long or a Double takes 24 bytes in 64-bit Java). However, it would be nice if we did not have to walk the lists of string dictionary values, or the lists in the reverse hash, while looking up the dictionary ID for a value.

We therefore store the characteristics of a topic in a circular buffer, saved as a local file on the servers (one file per topic). See RealtimeSegmentStatsHistory. The file stores the following items for the last 16 completed segments:

  • Cardinality of each column
  • Average width (in bytes) of each column

An average of the last 16 segments is used to compute the size of the buffers to be allocated (or mapped) when a segment starts up on a new host. Default values are chosen if the file is not present. For example, the size of the reverse hash bucket is taken to be the cardinality, so that we hopefully don't have to handle overflows beyond M.

We take care not to update the statistics with every segment that completes around the same time on a host, since those segments are likely to have the same characteristics. We want to capture the long-term trend of these metrics rather than weighting the most recent segments heavily.
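The following is a hypothetical sketch of such a per-topic history; it is not Pinot's RealtimeSegmentStatsHistory. It keeps per-column cardinality and average width for the last 16 recorded segments in an ArrayDeque used as a circular buffer, averages them to size buffers, and falls back to defaults when there is no history. How near-simultaneous completions are skipped is an assumption here: an update is ignored if it arrives within a minimum interval of the previous one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-topic stats history (not Pinot's RealtimeSegmentStatsHistory). */
class ColumnStatsHistory {
  static final class ColumnStats {
    final int cardinality;
    final double avgWidthBytes;

    ColumnStats(int cardinality, double avgWidthBytes) {
      this.cardinality = cardinality;
      this.avgWidthBytes = avgWidthBytes;
    }
  }

  private static final int CAPACITY = 16; // Last 16 recorded segments
  private final Deque<Map<String, ColumnStats>> history = new ArrayDeque<>();
  private final long minUpdateIntervalMillis; // Assumption: completions closer than this are skipped
  private Long lastUpdateMillis = null;

  ColumnStatsHistory(long minUpdateIntervalMillis) {
    this.minUpdateIntervalMillis = minUpdateIntervalMillis;
  }

  /** Records one completed segment's stats; returns false if skipped as too close to the last update. */
  boolean addSegmentStats(Map<String, ColumnStats> statsByColumn, long nowMillis) {
    if (lastUpdateMillis != null && nowMillis - lastUpdateMillis < minUpdateIntervalMillis) {
      return false;
    }
    if (history.size() == CAPACITY) {
      history.removeFirst(); // Drop the oldest entry, as a circular buffer would
    }
    history.addLast(new HashMap<>(statsByColumn));
    lastUpdateMillis = nowMillis;
    return true;
  }

  /** Average cardinality of a column over the recorded segments, or the default if none. */
  int estimatedCardinality(String column, int defaultValue) {
    return (int) Math.round(history.stream()
        .filter(m -> m.containsKey(column))
        .mapToInt(m -> m.get(column).cardinality)
        .average()
        .orElse(defaultValue));
  }

  /** Average width in bytes of a column over the recorded segments, or the default if none. */
  double estimatedAvgWidth(String column, double defaultValue) {
    return history.stream()
        .filter(m -> m.containsKey(column))
        .mapToDouble(m -> m.get(column).avgWidthBytes)
        .average()
        .orElse(defaultValue);
  }
}
```

A consuming segment starting on a new host could, for instance, size a column's reverse hash from estimatedCardinality(column, someDefault).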

Segments taking too long to build

A server may fail (or be taken down for an upgrade) during segment completion. To accommodate this, the state machine in the controller picks a new committer if it has not heard from the previous committer. However, it could well be that the segment just takes too long to build (the segment is huge, needs multiple indexes to be built, GC cycles, etc.). Further, it is possible that every replica takes a long time to build the segment (just because of the characteristics of the data in the stream). This can lead to a situation where a server gains committer status and builds the segment, but its commit fails because the controller has selected another committer in the meantime, so the server goes back to the HOLD state. Eventually, the same server gets committer status again (because the other replicas went through the same phenomenon), and the cycle goes on forever.

To alleviate this, two mechanisms have been added:

  • A server saves the segment it has built, so that if it ever gets committer status for the same segment (with the same end offset, of course) then it can re-use the previously built segment. The commit would pass in this case.
  • The controller exposes an API called extendBuildTime() that the server can invoke, to let the controller know that the server is still alive, building the segment. The server periodically calls this API so that the controller does not give up on it (until a pre-set maximum).
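The two mechanisms can be sketched as follows. Only the name extendBuildTime() comes from the text; its parameters, the deadline bookkeeping, the cap counted from committer selection, and all class names here are assumptions for illustration. Offsets are modeled as a single long, matching the current assumption on stream offsets.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Controller side (hypothetical): tracks the build deadline of the current committer per segment. */
class CommitDeadlineTracker {
  private final long initialBuildTimeMillis;
  private final long maxBuildTimeMillis; // Pre-set maximum, counted from committer selection
  private final Map<String, Long> startMillis = new HashMap<>();
  private final Map<String, Long> deadlineMillis = new HashMap<>();

  CommitDeadlineTracker(long initialBuildTimeMillis, long maxBuildTimeMillis) {
    this.initialBuildTimeMillis = initialBuildTimeMillis;
    this.maxBuildTimeMillis = maxBuildTimeMillis;
  }

  void committerSelected(String segmentName, long nowMillis) {
    startMillis.put(segmentName, nowMillis);
    deadlineMillis.put(segmentName, nowMillis + initialBuildTimeMillis);
  }

  /** Called periodically by the building server; returns the new deadline (never past the cap). */
  long extendBuildTime(String segmentName, long extraMillis, long nowMillis) {
    long cap = startMillis.get(segmentName) + maxBuildTimeMillis;
    long deadline = Math.max(deadlineMillis.get(segmentName), Math.min(nowMillis + extraMillis, cap));
    deadlineMillis.put(segmentName, deadline);
    return deadline;
  }

  /** True once the controller should give up on this committer and pick another one. */
  boolean shouldPickNewCommitter(String segmentName, long nowMillis) {
    return nowMillis > deadlineMillis.get(segmentName);
  }
}

/** Server side (hypothetical): remembers built segments keyed by segment name and end offset. */
class BuiltSegmentCache {
  private final Map<String, String> builtFiles = new HashMap<>();

  private static String key(String segmentName, long endOffset) {
    return segmentName + "@" + endOffset;
  }

  void saveBuilt(String segmentName, long endOffset, String segmentFile) {
    builtFiles.put(key(segmentName, endOffset), segmentFile);
  }

  /** A previously built file is reusable only for the same segment and the same end offset. */
  Optional<String> findReusable(String segmentName, long endOffset) {
    return Optional.ofNullable(builtFiles.get(key(segmentName, endOffset)));
  }
}
```

Keying the cache on the end offset as well as the segment name matters: a segment built up to a different offset would contain different rows, so it cannot be committed in place of the one the controller asked for.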

Separation of consuming and completed segments

The memory usage characteristics of consuming segments are very different from those of completed segments. Even with off-heap allocation, where we may memory-map areas and use only parts of them, the resident memory grows as new rows are consumed. Also, the server needs CPU to handle queries. We found that moving the completed segments off to another set of hosts helps us host multiple tables on the same set of machines.

When a segment completes, it stays on the host that was consuming the stream partition. Periodically, the lead controller executes a task that moves segments from the hosts they were consumed on to hosts with a different tag, keeping at most one replica down at any time. Over time, all replicas of the completed segments are moved away from the consuming hosts. This allows us to characterize the memory usage of consuming hosts independently, and scale them differently, as needed. See the "Contributions" section below.
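A hypothetical sketch of one run of such a relocation task is shown below. It reads "at most one replica down at any time" as "move at most one replica of each segment per run", and it picks the least-loaded completed host that does not already hold the segment; both choices, and the class name, are assumptions rather than a description of Pinot's task.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of one run of the periodic completed-segment relocation task. */
class CompletedSegmentRelocator {
  static Map<String, List<String>> relocateOnce(Map<String, List<String>> assignment,
      Set<String> consumingHosts, List<String> completedHosts) {
    // Current number of segments on each completed host
    Map<String, Integer> load = new HashMap<>();
    for (String host : completedHosts) {
      load.put(host, 0);
    }
    for (List<String> hosts : assignment.values()) {
      for (String host : hosts) {
        load.computeIfPresent(host, (h, n) -> n + 1);
      }
    }
    Map<String, List<String>> result = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : new TreeMap<>(assignment).entrySet()) {
      List<String> hosts = new ArrayList<>(entry.getValue());
      for (int i = 0; i < hosts.size(); i++) {
        if (!consumingHosts.contains(hosts.get(i))) {
          continue; // This replica has already moved
        }
        // Least-loaded completed host that does not already hold this segment
        Optional<String> target = completedHosts.stream()
            .filter(h -> !hosts.contains(h))
            .min(Comparator.comparingInt(load::get));
        if (target.isPresent()) {
          hosts.set(i, target.get());
          load.merge(target.get(), 1, Integer::sum);
        }
        break; // At most one replica of this segment is moved per run
      }
      result.put(entry.getKey(), hosts);
    }
    return result;
  }
}
```

With a replication factor of two, a segment's replicas leave the consuming hosts over two runs, and the other replica keeps serving queries while one is being moved.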

Pluggable Streams

At the time of the initial open-source release, Pinot depended on Kafka for realtime support. After significant reorganization and refactoring of the code, Pinot is now independent of the underlying stream, and the interfaces for adding support for new streams are well-defined. There are still a few areas left to tackle to make this more generic and open to other streams, especially to support partition-level consumption with streams other than Kafka or EventHubs.

See the section below on contributions.

Contributions Welcome

The following would be good to have; contributions are invited:

  1. A tool to display the status of a realtime table, per table or per server. This tool, or some derivative of it, can later be used for automatic diagnosis, for more self-healing techniques, or simply for identifying the probable root cause of issues:
    • What does the ZooKeeper metadata say about consuming segments?
    • What are the current consuming offsets of the segments in each of the replicas?
    • What is the ingestion rate of the stream partitions?
    • What is the average segment size and how long does it normally take to make segments for this table?
    • How many permanent exceptions have occurred, and when?
    • How much memory are the servers using? A breakdown by index would be useful.
  2. Support for variable-byte no-dictionary columns in consuming segments. The implementation should be able to reuse the class OffHeapMutableByteArrayStore.
  3. Support for off-heap inverted index for consuming segments
  4. Reducing heap usage while building completed segments. Currently, the segment builder is designed to read incoming data row by row, and to build dictionaries in a hash table before translating them to the on-disk format of a dictionary. We can bypass these steps since we already have the segment in columnar format (realtime consumers ingest rows but store them in a columnar format for serving queries). An initial prototype has shown a significant reduction in heap usage during segment builds. If we reduce heap usage (better yet, move completely to off-heap segment completion), more segments can be packed into a single host, saving hardware cost. If a higher latency can be tolerated, these hosts could use SSDs and map off-heap memory from files (Pinot already provides primitives for doing this).
  5. Servers should attempt to complete a segment if they run out of memory while consuming it. Ideally, we should be able to allocate a memory quota for each partition of a table, and consuming segments should stop when the quota is reached. This will be useful in multi-tenant environments, where we do not want one table to consume so much memory that other tables lose out.
  6. We should use minions to merge realtime segments. This is very useful for tables that end up with a large number of partitions, each ingesting data very slowly. Typically these are administrative mistakes (or perhaps constraints), but the result in Pinot is too many small segments and too much allocated memory that goes unused.
  7. CPU isolation algorithms that allocate some CPU for consuming data and leave some for serving queries.
  8. Smart allocation of a new consuming segment (created either when new stream partitions are detected or a previous segment is completed) depending on the amount of CPU and memory the instances have, for consumption as well as serving queries.
  9. Pluggable streams currently assume that, for LLC support, the stream has an offset that increases monotonically and is a single 64-bit (or smaller) value. Further, they assume that offsets with higher values correspond to more recent events. It would be great to remove some of these restrictions; for example, an offset could be a string, perhaps a Comparable, but not necessarily so. Offsets may be URLs. Essentially, if a stream supports the idea of partitions, and, given an offset, can return a set of events and the next offset in the partition (the one after the events delivered), that should be enough to keep the consumer going.
  10. The LLC consumer (as well as the HLC consumer) pauses consumption while building segments. In theory this time is unbounded, but in practice we do not want huge segments that take minutes to build. A tuned installation probably loses seconds or tens of seconds while building and committing segments. We have seen requests for consumption delays on the order of tens of milliseconds, essentially a pause-free consumer.
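Item 9 above suggests that partitions plus a single "fetch from an offset, return events and the next offset" operation should be enough to keep a consumer going. The following is a hypothetical sketch of such a minimal contract, not an existing Pinot interface: offsets are an opaque type parameter, the toy stream uses string cursors, and the consumer only stores the last offset it was given and never compares offsets. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal stream contract from item 9; not an existing Pinot interface. */
interface PartitionedStream<O, E> {
  int numPartitions();

  /** Returns up to maxEvents events starting at startOffset, plus the offset to resume from. */
  Batch<O, E> fetch(int partition, O startOffset, int maxEvents);
}

final class Batch<O, E> {
  final List<E> events;
  final O nextOffset; // Opaque to the consumer: could be a string, a URL, etc.

  Batch(List<E> events, O nextOffset) {
    this.events = events;
    this.nextOffset = nextOffset;
  }
}

/** Toy stream whose offsets are opaque string cursors such as "cursor-3". */
final class InMemoryCursorStream implements PartitionedStream<String, String> {
  private final List<List<String>> partitions;

  InMemoryCursorStream(List<List<String>> partitions) {
    this.partitions = partitions;
  }

  @Override
  public int numPartitions() {
    return partitions.size();
  }

  @Override
  public Batch<String, String> fetch(int partition, String startOffset, int maxEvents) {
    List<String> events = partitions.get(partition);
    // Only the stream itself knows how to interpret its cursor
    int start = Integer.parseInt(startOffset.substring("cursor-".length()));
    int end = Math.min(events.size(), start + maxEvents);
    return new Batch<>(new ArrayList<>(events.subList(start, end)), "cursor-" + end);
  }
}

final class CursorConsumer {
  /** Consumes a partition until the stream returns no events; returns the offset to resume from. */
  static <O, E> O consumeToEnd(PartitionedStream<O, E> stream, int partition, O startOffset,
      int batchSize, List<E> sink) {
    O offset = startOffset;
    while (true) {
      Batch<O, E> batch = stream.fetch(partition, offset, batchSize);
      if (batch.events.isEmpty()) {
        return offset;
      }
      sink.addAll(batch.events);
      offset = batch.nextOffset; // Stored, never compared
    }
  }
}
```

Because the consumer treats offsets as opaque tokens handed back by the stream, neither monotonic numeric offsets nor a Comparable offset type is required for it to make progress.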
