Design of Low Level (Partition Level) consumption in Pinot
In this example, the replication factor for the table is set to two, so each of the three partitions is consumed by exactly two real-time servers. There are six PartitionConsumer instances in all, distributed across three servers, so each server hosts two PartitionConsumers. Note that we could instead have had two servers, in which case each server would host three PartitionConsumers.
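The arithmetic above can be sketched as follows. This is a minimal, illustrative assignment routine, not Pinot's actual assignment code; the class and method names are made up, and it simply hands out one PartitionConsumer per (partition, replica) pair round-robin across the tagged servers.

```java
import java.util.*;

// Illustrative sketch (hypothetical names, not Pinot's real assignment logic):
// create one PartitionConsumer per (partition, replica) pair and spread them
// round-robin across servers. Assumes replication <= number of servers, so
// two replicas of the same partition do not land on the same server.
public class ConsumerAssignmentSketch {
    // Returns a map from server name to the partitions it consumes.
    static Map<String, List<Integer>> assign(int numPartitions, int replication,
                                             List<String> servers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String server : servers) {
            assignment.put(server, new ArrayList<>());
        }
        int next = 0;
        for (int partition = 0; partition < numPartitions; partition++) {
            for (int replica = 0; replica < replication; replica++) {
                String server = servers.get(next++ % servers.size());
                assignment.get(server).add(partition);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, replication 2, 3 servers -> 6 PartitionConsumers,
        // 2 per server. With 2 servers, each would get 3 instead.
        System.out.println(assign(3, 2, Arrays.asList("s1", "s2", "s3")));
    }
}
```

With two servers instead of three, the same routine yields three partitions per server, matching the alternative noted above.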
Pinot assumes that messages within a partition of the underlying stream are ordered by their arrival, and that each message is addressable by a specific offset in the stream.
The metadata for a MutableSegment (in Zookeeper) has the offset in the stream from which consumption should start for that segment. This starting offset value applies to all replicas of the MutableSegment. The Pinot controller sets the value of the starting offset in the segment metadata in Zookeeper at the time the segment is created (which is either when the table is first created, or when the previous segment in that partition is committed).
The consumed rows are stored as uncompressed in-memory data structures in the MutableSegment. These rows are lost if/when the server restarts (e.g. during a software roll-out). In such cases, the PartitionConsumers restart their consumption from the starting offset of that MutableSegment.
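The restart behavior above can be sketched as follows. The class names here are illustrative stand-ins (not Pinot's actual classes): the point is that on every (re)start the consumer reads the segment's starting offset from the Zookeeper metadata and resumes from there, re-consuming any rows that were only held in memory.

```java
// Illustrative sketch (hypothetical names): a PartitionConsumer always begins
// consumption at the starting offset recorded in the segment metadata, because
// rows held in the in-memory MutableSegment are lost on server restart.
public class RestartSketch {
    // Stand-in for the segment metadata the controller writes to Zookeeper.
    static class SegmentMetadata {
        final long startOffset;
        SegmentMetadata(long startOffset) { this.startOffset = startOffset; }
    }

    static class PartitionConsumer {
        long currentOffset;

        // On (re)start, consumption begins at the metadata's starting offset.
        void start(SegmentMetadata metadata) {
            currentOffset = metadata.startOffset;
        }

        // Index one row into the MutableSegment and advance the offset.
        void consumeOne() { currentOffset++; }
    }

    public static void main(String[] args) {
        SegmentMetadata meta = new SegmentMetadata(1000L);
        PartitionConsumer consumer = new PartitionConsumer();
        consumer.start(meta);
        consumer.consumeOne();
        consumer.consumeOne(); // currentOffset is now 1002
        // Simulated server restart: in-memory rows are gone; a fresh consumer
        // resumes from the same starting offset, 1000.
        consumer = new PartitionConsumer();
        consumer.start(meta);
        System.out.println(consumer.currentOffset); // prints 1000
    }
}
```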
Every so often, the PartitionConsumer turns a MutableSegment into an ImmutableSegment by going through the following steps (referred to as committing the segment):
- Pause consumption
- Execute steps of the segment completion protocol (described below) to decide which replica commits the segment.
- Build an ImmutableSegment (same format as segments of Offline table)
- Commit the segment to the controller (In this step, the controller creates the next segment in the partition)
- Await signal for the next segment, and resume consumption, indexing rows into a new MutableSegment.
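The steps above can be modeled as a simple state machine, sketched below. This is a deliberately simplified single-replica view with made-up names; the real segment completion protocol (described below) coordinates the controller and all replicas to elect the committer.

```java
import java.util.*;

// Illustrative sketch (hypothetical names): the committing replica's path
// through the commit steps, modeled as an ordered state trace.
public class CommitFlowSketch {
    enum State { PAUSED, COMMITTER_ELECTED, SEGMENT_BUILT, COMMITTED, CONSUMING_NEXT }

    // Runs the commit steps in order and records each state transition.
    static List<State> commitMutableSegment() {
        List<State> trace = new ArrayList<>();
        trace.add(State.PAUSED);            // 1. pause consumption
        trace.add(State.COMMITTER_ELECTED); // 2. completion protocol picks this replica
        trace.add(State.SEGMENT_BUILT);     // 3. build ImmutableSegment (offline format)
        trace.add(State.COMMITTED);         // 4. commit to controller, which creates
                                            //    the next segment in the partition
        trace.add(State.CONSUMING_NEXT);    // 5. resume into a new MutableSegment
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(commitMutableSegment());
    }
}
```

Replicas that are not elected as the committer skip the build-and-commit steps and instead download or catch up to the committed segment, which is part of the protocol described below.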
When a table is created, the controller determines the number of partitions for the table, and “creates” one segment per partition, spraying these segments evenly across all the tagged servers. The following steps are done as a part of creating each segment: