Motivation
Pinot is a columnar storage system that powers many important use cases at Uber. It is a realtime distributed OLAP engine that provides second-level data freshness by ingesting Kafka events, and it can manage months of historical data loaded from various data sources such as HDFS, Schemaless, etc. However, Pinot currently functions mostly as an append-only storage system: it does not allow modifying or deleting existing records, with the exception of overriding all data within a time range in an offline table. This limits the applicability of Pinot, because many use cases require updates to their data, either due to the nature of their events or due to needs for data correction/backfill. In order to extend the capability of Pinot and serve more use cases, we are going to implement an upsert feature in Pinot that allows users to update existing records in Pinot tables through the table's Kafka input stream.
How does upsert work in Pinot
This project will only target full-row updates for realtime tables in Pinot. The ingestion model remains the same - specify a table and its input source, and Pinot will ingest data from that topic. The difference in the upsert model is that users need to specify their table with additional information: primary key column, update strategy, timestamp column, etc. The Pinot server will ingest data in the same fashion as before, by pulling data from Kafka. However, when users query data in Pinot, only one record (usually the most recent one) for each unique primary key will be returned/used in aggregation.
Below is an example of how Pinot upsert works:
- Original table:
UserId (PK column) | AccountBalance |
abc-12 | 100 |
abc-13 | 102 |
Select AccountBalance from <table> where UserId=”abc-12” => 100
Select avg(AccountBalance) from <table> => 101
- New data arrived:
UserId (PK column) | AccountBalance |
abc-12 | 100 |
abc-13 | 102 |
abc-12 | 200 |
With upsert enabled:
Select AccountBalance from <table> where UserId=”abc-12” => 200
Select avg(AccountBalance) from <table> => 151
Without upsert enabled:
Select AccountBalance from <table> where UserId=”abc-12” => [100, 200]
Select avg(AccountBalance) from <table> => 134
- New data arrived:
UserId (PK column) | AccountBalance |
abc-12 | 100 |
abc-13 | 102 |
abc-12 | 200 |
abc-13 | 300 |
With upsert enabled:
Select AccountBalance from <table> where UserId=”abc-12” => 200
Select avg(AccountBalance) from <table> => 250
Without upsert enabled:
Select AccountBalance from <table> where UserId=”abc-12” => [100, 200]
Select avg(AccountBalance) from <table> => 175.5
Requirements
The following will be the basic requirements of Pinot upsert features:
- Only support full-row updates to Pinot records
If Pinot users want to update a record, they will need to regenerate all the fields of the original record with the updated values.
- Only support the Kafka-compatible queue ingestion model
Although Pinot is moving to an ingestion-independent model, we will still base our implementation on Kafka consumption due to limited resources. The current design requires that the input data have a unique offset/identifier for each ingested message, so it should be compatible with any queueing system that provides such a feature.
- A single Pinot server/table can handle an ingestion rate of 10k messages/sec
- Each Pinot server can handle 1 billion records or 1 TB of storage
- Ingestion latency overhead compared to non-upsert model < 1min
- Query latency overhead compared to non-upsert model < 50%
- Data retention < 2 weeks
Table definition/schema changes for upsert tables
In the upsert Pinot table, the new schema will have the following changes:
- updateSemantic field
This is a string field that defines whether a table is an upsert table or not. Its value can be append or upsert. If it is defined as upsert, the schema verification will check all of the following fields to ensure the schema follows the appropriate upsert definition.
- primaryKey field:
This is a string field to indicate which field is the primary key dimension of this upsert table.
- $ValidFrom field definition
This is a virtual column field to indicate which field is used for the validFrom definition and the class associated with it.
- $ValidUntil field definition
This is a virtual column field to indicate which field is used for the validUntil definition and the class associated with it.
- KafkaOffset field definition
This is a column field that stores the kafka offset associated with the current record.
In the table definition, we need to add the following field:
- updateSemantic field
This is a string field that defines whether a table is an upsert table or not. Its value can be append or upsert. If it is defined as upsert, the table data manager will create data segments with upsert-related logic.
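To make the intended validation concrete, below is a minimal sketch of how the checks above could be wired up, assuming the schema/table config is exposed as a plain key-value map. The field names follow this document and UpsertSchemaValidator is a hypothetical class, not an existing Pinot API.

```java
import java.util.Map;

// Hypothetical validation of the upsert-related fields described above.
public final class UpsertSchemaValidator {

  public static void validate(Map<String, String> schema) {
    String updateSemantic = schema.getOrDefault("updateSemantic", "append");
    if (!updateSemantic.equals("append") && !updateSemantic.equals("upsert")) {
      throw new IllegalArgumentException("updateSemantic must be append or upsert");
    }
    if (updateSemantic.equals("upsert")) {
      // For upsert tables, the remaining fields become mandatory.
      require(schema, "primaryKey");
      require(schema, "$validFrom");
      require(schema, "$validUntil");
      require(schema, "kafkaOffset");
    }
  }

  private static void require(Map<String, String> schema, String field) {
    if (!schema.containsKey(field)) {
      throw new IllegalArgumentException("Upsert table is missing required field: " + field);
    }
  }
}
```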
Table ingestion
When a realtime segment is assigned to a Pinot server for ingestion, the target Pinot server will receive the Helix message and start the ingestion for the given segment. The following are the changes to the ingestion process on each Pinot server (in each segment replica):
- When segment ingestion starts, Pinot server will pick the output kafka topic T (for sending upsert events) based on table or Pinot server config.
- During ingestion, the Pinot server will extract the primary key of the current row based on the schema definition, along with the Kafka offset of the current record. It will also extract the necessary metadata for the current record (segment name, offset, primary key, etc.) and produce it to the coordinator service input topic T.
- Messages should be produced to topic T asynchronously with Kafka at-least-once delivery. The messages in T should be partitioned by the primary key of the record.
- When the segment enters the commit stage, the server will wait until all Kafka producer requests for the rows of the current segment have finished (to ensure they have landed on Kafka). A hedged producer sketch is shown after Fig 1 below.
Fig 1. Overall design of ingestion of upsert.
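The ingestion-side changes can be sketched as follows. This is a hedged illustration only: UpsertEventEmitter and the event payload format are hypothetical, and only the Kafka producer calls (send, flush) are real APIs. Partitioning by primary key is achieved by using the primary key as the Kafka message key.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the per-segment upsert event producer on a Pinot server.
public class UpsertEventEmitter {
  private final KafkaProducer<String, String> producer;
  private final String upsertEventTopic; // topic T, picked from table or server config

  public UpsertEventEmitter(String bootstrapServers, String upsertEventTopic) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.ACKS_CONFIG, "all");                  // at-least-once delivery
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
    this.upsertEventTopic = upsertEventTopic;
  }

  // Called for every ingested row: partition by primary key, send asynchronously.
  public void emit(String primaryKey, String segmentName, long offset, long timestamp) {
    String event = segmentName + "," + offset + "," + timestamp; // simplified payload
    producer.send(new ProducerRecord<>(upsertEventTopic, primaryKey, event));
  }

  // Called at segment commit: block until every pending send has landed on Kafka.
  public void awaitAllDelivered() {
    producer.flush();
  }
}
```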
Ingestion upsert events (IUE)
Events generated by Pinot ingestor will have the following information:
- Primary Key
- Segment Name (contains table name and input topic partition)
- Offset (input topic offset)
- Timestamp (used in record resolution)
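For illustration, an IUE could be represented by a simple class like the following (a hypothetical sketch whose field names mirror the list above; it is not an existing Pinot class):

```java
// Hypothetical representation of an ingestion upsert event (IUE).
public class IngestionUpsertEvent {
  public final String primaryKey;
  public final String segmentName; // encodes table name and input topic partition
  public final long offset;        // offset in the table's input topic
  public final long timestamp;     // used for record resolution

  public IngestionUpsertEvent(String primaryKey, String segmentName, long offset, long timestamp) {
    this.primaryKey = primaryKey;
    this.segmentName = segmentName;
    this.offset = offset;
    this.timestamp = timestamp;
  }
}
```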
Coordinator Service
The coordinator service is a new distributed service in Pinot that manages the latest primary key identification and dispatches appropriate commands to Pinot servers to update their virtual columns. In the following sections, we will discuss the two major parts of the coordinator service.
Key coordinator
The key coordinator is a component that handles the storage of primary keys and their associated context. It updates this internal storage when it receives a “more recent” message (according to a user-chosen resolution algorithm) for a given primary key, and sends an update event to the next component.
Single key coordinator
- Each individual key coordinator contains a set of key-value stores (RocksDB). These key-value stores hold the mapping between a primary key and its metadata (Kafka offset, timestamp, segment name), with a time-to-live equal to the table retention.
- Each individual key coordinator also has a set of assigned Kafka input topics and partitions. It will ingest key coordinator input messages from those topic partitions and extract the record's metadata (table name, segment name, offset, primary key, timestamp, etc.).
- The key coordinator then performs a lookup for the given primary key in the appropriate key-value store and extracts the metadata associated with that primary key (if found).
- The key coordinator will produce a segment update event with type “insert” for the newly ingested record. It will also try to locate the most recent record in the key-value store for the primary key in the event. If a record with the same primary key is found, the key coordinator performs record resolution and produces a segment update event with type “delete” for the record that should be deleted. The key coordinator ensures that all update events in the current batch are delivered and persisted to the output queue, and that the ordering is correct.
- Each insert/delete event (see the update event message below) contains the metadata about that record, including the table name, segment name, offset, etc.
- If the record associated with a primary key is updated/inserted, the key coordinator updates its internal key-value store with the primary key and its associated context.
Segment update event message (SUE)
The event generated by key coordinator will have the following information:
- Segment Name (contains table name and input topic partition)
- Origin Offset (the offset of kafka message in the table’s input Kafka stream)
- Input Partition (IUE partition)
- Version Number
- Event type (insert/delete)
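Similarly, a SUE could be modeled as the following hypothetical class, which later sketches in this document reuse:

```java
// Hypothetical representation of a segment update event (SUE) emitted by the key coordinator.
public class SegmentUpdateEvent {
  public enum Type { INSERT, DELETE }

  public final String segmentName;  // encodes table name and input topic partition
  public final long originOffset;   // offset of the row in the table's input Kafka stream
  public final int inputPartition;  // IUE partition this event was derived from
  public final long versionNumber;  // monotonically increasing per IUE partition
  public final Type type;

  public SegmentUpdateEvent(String segmentName, long originOffset, int inputPartition,
                            long versionNumber, Type type) {
    this.segmentName = segmentName;
    this.originOffset = originOffset;
    this.inputPartition = inputPartition;
    this.versionNumber = versionNumber;
    this.type = type;
  }
}
```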
Key coordinators as a cluster
- The key coordinator controller will assign each individual key coordinator the list of input Kafka topic partitions it should handle.
- Each key coordinator input topic partition will have at least two key coordinator consumers. One acts as the leader and the rest act as followers.
- Each key coordinator will be in either the follower or the leader state. Both followers and the leader ingest from the same input Kafka topic partitions and update their internal key-value stores, but only the leader generates output to the delivery service. Followers ingest data up to the last offset the leader committed.
- When the leader of a particular assignment fails, a follower will take over the leader role and start sending output to the key coordinator output topic.
Delivery service
The delivery service sends the event output from the key coordinator to the appropriate Pinot servers. In the following sections, we will discuss two possible solutions for handling message delivery from the coordinator service to the Pinot servers.
Key coordinator based delivery service (kafka-based)
In this design, the key coordinator performs the delivery of update events to individual Pinot servers and provides historical update logs for older segments.
Deliver events from key coordinator to server with multiple topics
- The key coordinator will persist a map of Pinot servers and the segments they own. When a Pinot server initializes a new segment, it will notify the key coordinators of the new segment it added.
- When a Pinot server is added to the upsert tenant, we will auto-create a Kafka topic for this Pinot server. (We could also re-use the same Kafka topic and partition based on the Pinot server name.)
- When a key coordinator generates an update event, it will send the message to the Kafka topics of the Pinot servers that host the segment referenced by the event.
Deliver events from key coordinator to server with single topic per Pinot table
- Each Pinot table has one update topic for all update events generated by the key coordinators.
- When a key coordinator generates an update event, it sends the message to the Kafka topic corresponding to the table.
- Pinot servers ingest events corresponding to all tables hosted on the current Pinot server and store the update logs for the segments on the current server.
Deliver existing update logs to server
When a Pinot server is assigned a new segment or goes through a host rebuild, it needs to re-download all existing update logs from the upstream service. In this model, the Pinot server will contact the corresponding key coordinator and fetch all historical updates for the segments it needs to download.
Update flow
Note: newEvent is an IUE event
oldValue = kvStore.get(newEvent.primaryKey);
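Expanding the single pseudocode line above, the full flow described in the key coordinator section could look roughly like the sketch below. This assumes timestamp-based record resolution (the actual strategy is user-configurable), models the RocksDB store as a plain Map, and reuses the illustrative IngestionUpsertEvent/SegmentUpdateEvent classes from the earlier sketches.

```java
import java.util.List;
import java.util.Map;

// A minimal sketch of the single-key-coordinator update flow (not actual Pinot code).
public class KeyCoordinatorUpdateFlow {
  // Per-primary-key context kept in the key-value store.
  public static class RecordContext {
    final String segmentName;
    final long offset;
    final long timestamp;
    RecordContext(String segmentName, long offset, long timestamp) {
      this.segmentName = segmentName;
      this.offset = offset;
      this.timestamp = timestamp;
    }
  }

  private long version = 0; // version number, monotonically increasing per IUE partition

  public void process(IngestionUpsertEvent newEvent, int iuePartition,
                      Map<String, RecordContext> kvStore, List<SegmentUpdateEvent> out) {
    RecordContext oldValue = kvStore.get(newEvent.primaryKey);

    // 1. Emit an "insert" event for the newly ingested record.
    out.add(new SegmentUpdateEvent(newEvent.segmentName, newEvent.offset,
        iuePartition, ++version, SegmentUpdateEvent.Type.INSERT));

    if (oldValue == null || newEvent.timestamp >= oldValue.timestamp) {
      // 2. The new record wins record resolution: mark the old record deleted (if any)
      //    and remember the new record as the latest for this primary key.
      if (oldValue != null) {
        out.add(new SegmentUpdateEvent(oldValue.segmentName, oldValue.offset,
            iuePartition, ++version, SegmentUpdateEvent.Type.DELETE));
      }
      kvStore.put(newEvent.primaryKey,
          new RecordContext(newEvent.segmentName, newEvent.offset, newEvent.timestamp));
    } else {
      // 3. The old record wins: the newly ingested record is immediately marked deleted.
      out.add(new SegmentUpdateEvent(newEvent.segmentName, newEvent.offset,
          iuePartition, ++version, SegmentUpdateEvent.Type.DELETE));
    }
    // The caller must deliver `out` to the output queue in order and only then
    // checkpoint, per the at-least-once + ordering requirement described above.
  }
}
```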
Segment -> server metadata management
Each key coordinator maintains a copy of the server -> segment mapping (backed by ZK). One of the important components in this design is how we maintain the internal segment -> server metadata mapping. This mapping is stored in memory and flushed to ZK whenever a change is made to it. There are two major ways to update this metadata:
- Add segment -> server mapping
This happens when the Pinot controller assigns a new segment to a Pinot server (a new consuming segment or a rebalance operation). In this case, the server that received the new segment assignment will send a request event to the key coordinators for this segment and request a copy of the update logs. When a key coordinator receives such a request, it will add this server to its internal segment -> server mapping.
- Remove segment -> server mapping
The key coordinator will periodically scan the ideal state for the segment -> server mapping from the Pinot Helix model and remove any segment -> server mapping that no longer exists in the ideal state.
Log coordinator based delivery service (pull-based)
In this design, we will have another service called the log coordinator. Each log coordinator will serve the update logs for a set of segments, and Pinot servers will fetch recent update logs from the log coordinators.
Log coordinator overall architecture
The log coordinator will be a distributed service, similar to the key coordinator. Each log coordinator will handle the storage of update logs for its assigned set of segments. Key coordinators will send events to log coordinators through Kafka, and servers will fetch all update logs through log coordinators.
Deliver events from key coordinator to log coordinator
To deliver the update events from the key coordinator to the log coordinator, the key coordinator will send all update events to the key coordinator output topic, partitioned by segment name. Each log coordinator will have a list of assigned Kafka partitions for this topic and will accumulate the update logs locally (either in memory or on disk).
Deliver events from log coordinator to server
To deliver the update events from the log coordinator to the server, the log coordinator will expose an HTTP API for servers to fetch a list of update logs, given a list of segment names and the position of the last fetched event.
The log coordinator will also expose the list of segments it hosts through Helix or an HTTP API, so servers can discover the correct instance to fetch updates from. A hedged sketch of the fetch API shape follows below.
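If we pursue this option, the fetch API could have roughly the following shape. This is a hypothetical interface reusing the illustrative SegmentUpdateEvent class; the real endpoint and payload may differ.

```java
import java.util.List;
import java.util.Map;

// Hypothetical client-side view of the log coordinator fetch API.
public interface LogCoordinatorClient {

  // segmentName -> position of the last fetched event (e.g. version number),
  // returning the update logs accumulated past that position for each segment.
  Map<String, List<SegmentUpdateEvent>> fetchUpdates(Map<String, Long> lastFetchedPositions);

  // Segments hosted by this log coordinator instance, for discovery via Helix/HTTP.
  List<String> hostedSegments();
}
```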
Comparison of Kafka-based and pull-based design
Complexity
The Kafka-based approach will be simpler to operate for the following reasons:
- It doesn't require an extra service and extra code to handle this logic
- Replication, retention management, fetching, and offset APIs are already provided by the Kafka library
- The low-water-mark is hard to manage in the log coordinator.
Operation
Both options have some operation overheads:
- The log coordinator based method has two services to manage, including their health status/storage/etc.
- The Kafka-based method requires creating a new Kafka topic for each Pinot server we add to the upsert cluster. (We could use one single Kafka topic and partition based on server name.)
However, the Kafka-based approach's operation cost is mostly a one-time cost and can be reduced through further optimization. The log coordinator approach might have a bigger operation overhead due to more services to run/maintain/monitor.
Pluggability/design
The log coordinator is more pluggable, as the Pinot server is decoupled from the coordinator service; this would allow us to extend the upsert component to systems other than Pinot.
Performance
The Kafka-based approach is less efficient at segment bootstrap, as it requires downloading data from multiple key coordinators.
The log coordinator based approach is less efficient at the fetching stage, as it incurs significant overhead passing the server's segment information to the log coordinator.
Overall
Both solutions have similar trade-offs. We believe the Kafka-based approach is better due to the simplicity of its architecture, lower operation overhead from fewer moving parts, and the fact that the system involved (Kafka) has been proven to be a scalable solution. We will try to ensure that the API layers of both solutions are compatible, so we can learn more as we work on bringing the solution to production.
Pinot segment updater
The Pinot segment updater is a new component that runs inside the Pinot server and handles fetching updates for the current Pinot server. It consumes the segment update event messages (SUE) produced by the key coordinator service, then applies the updates through the following steps (a hedged sketch follows this list):
- It stores the updates for each segment in a write-ahead log and ensures the log is synced to disk after every batch.
- It then applies the update events to the corresponding records of the affected segments' virtual columns, based on the SUE records' <segment name, source offset> values.
- Once the changes are applied, it updates the offset for the current Pinot server's fetcher component.
- The segment updater also records the consumed version numbers for each SUE partition consumed for the Pinot upsert table.
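A hedged sketch of the segment updater's apply loop is shown below. The nested interfaces are hypothetical stand-ins for the write-ahead log, the in-memory virtual columns, and the offset/version tracking described above; none of them are existing Pinot classes.

```java
import java.util.List;

// Illustrative apply loop for the Pinot segment updater.
public class SegmentUpdater {
  public interface WriteAheadLog {
    void append(List<SegmentUpdateEvent> batch);
    void sync(); // fsync the log to disk
  }
  public interface VirtualColumnWriter {
    void setValidFrom(String segmentName, long sourceOffset, long version);
    void setValidUntil(String segmentName, long sourceOffset, long version);
  }
  public interface ProgressTracker {
    void recordConsumedVersion(int suePartition, long version);
  }

  private final WriteAheadLog wal;
  private final VirtualColumnWriter virtualColumns;
  private final ProgressTracker progress;

  public SegmentUpdater(WriteAheadLog wal, VirtualColumnWriter virtualColumns, ProgressTracker progress) {
    this.wal = wal;
    this.virtualColumns = virtualColumns;
    this.progress = progress;
  }

  public void applyBatch(List<SegmentUpdateEvent> batch) {
    // 1. Persist the batch to the write-ahead log and sync it to disk.
    wal.append(batch);
    wal.sync();

    // 2. Apply each event to the affected segment's virtual columns, locating the
    //    record by <segment name, source offset>.
    for (SegmentUpdateEvent event : batch) {
      if (event.type == SegmentUpdateEvent.Type.INSERT) {
        virtualColumns.setValidFrom(event.segmentName, event.originOffset, event.versionNumber);
      } else {
        virtualColumns.setValidUntil(event.segmentName, event.originOffset, event.versionNumber);
      }
      // 3/4. Track consumed versions per SUE partition so the server can report its
      //      ingestion progress (used for the low-water-mark) and resume after restart.
      progress.recordConsumedVersion(event.inputPartition, event.versionNumber);
    }
  }
}
```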
Handling server restart
When a Pinot server restarts, it will (1) re-consume the table input stream starting from the stored offset for any previously in-memory segment and (2) apply the segment update log for each segment.
Pinot virtual columns
A Pinot virtual column is a new type of column that stores data not backed by Pinot's physical storage. It can be either metadata related to the data (segment name, docId, etc.) or other data that is not persisted as Pinot data. In the upsert case, we will need two virtual columns: $ValidFrom and $ValidUntil. These columns store when a record was inserted into/deleted from Pinot. They are backed by an in-memory data structure (long values) and are updated by the Pinot segment updater.
$ValidFrom/$ValidUntil
$ValidFrom/$ValidUntil are new virtual columns that indicate when a record was marked as “inserted” into / “deleted” from this table. They are used to determine the logical timestamp of insertion/deletion. The value in these columns indicates the ordering of the event: it is the Kafka offset of the corresponding message in the key coordinator input topic (see the key coordinator output message).
Pinot broker
The Pinot broker needs several changes to support the upsert use case. When a Pinot broker starts and recognizes that it needs to support upsert tables, it will periodically fetch the ingestion progress from each Pinot server in the current tenant and calculate the low-water-mark across all Pinot servers. When a request comes in for an upsert table, the broker will use this low-water-mark information to rewrite the query, ensuring that all servers return consistent results for the query.
Deciding low-water-mark (LWM)
The following diagram shows the streaming ingestion for Pinot servers. During the ingestion process, a server ingests updates from two processes: LLC ingestion from user inputs and the segment updater from key coordinator (KC) upsert events. We rely on the following event ordering to mark the progress of server ingestion and to ensure there is no event gap:
- LLC ingestion finishes one segment before starting a new one.
- When LLC ingestion for a segment ends, all upsert events have been sent to the KC and ACKed (which means the KC has received the events).
Low-water-mark for a given kafka partition on a server
LWM(table t, server i, partition j) = Min(LLC ingestion level on Pj, Segment Updater ingestion level on Pj)
Low-water-mark for a given kafka partition on a Pinot server is the smaller of latest LLC ingestion progress and segment updater ingestion progress.
Low-water-mark for a given Pinot table on a server
LWM(table t, server i) = Min(LWM(table t, server i, partition j)) for all j in 0..N-1, where N is the number of partitions
The low-water-mark for a given Pinot table on a particular server is the minimum of the per-partition low-water-marks for this table on this server.
Low-water-mark for a given Pinot table
LWM(Broker, table t) = Min(LWM(table t, server i)) for all servers i serving this table
The low-water-mark for a given Pinot table on the Pinot broker is the minimum across all servers serving this table.
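The three formulas translate directly into code. In the sketch below, each server is assumed to report a map of partition -> min(LLC ingestion level, segment updater ingestion level); the class and method names are illustrative only.

```java
import java.util.Map;

// Direct translation of the LWM formulas above.
public final class LowWaterMark {

  // LWM(table t, server i) = Min over partitions j of LWM(table t, server i, partition j)
  public static long serverLwm(Map<Integer, Long> partitionLwms) {
    return partitionLwms.values().stream()
        .mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
  }

  // LWM(Broker, table t) = Min over servers i of LWM(table t, server i)
  public static long tableLwm(Map<String, Map<Integer, Long>> lwmsByServer) {
    return lwmsByServer.values().stream()
        .mapToLong(LowWaterMark::serverLwm).min().orElse(Long.MAX_VALUE);
  }
}
```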
Query rewrite
When a query comes to the Pinot broker, we need to rewrite the query before sending it to the Pinot servers in order to get consistent results. Below are examples of how such a query rewrite is executed when multiple segments contain records with the same primary key (a broker-side rewrite sketch follows the two examples):
Example 1
Pinot server 1 (LWM = 1200):
Segment 1:
PK | value | validUntil | validFrom |
a-1 | 1 | 1000 | 500 |
Pinot server 2 (LWM = 1100):
Segment 2-replica1:
PK | value | validUntil | validFrom |
a-1 | 2 | nil | 1000 |
Pinot server 3 (LWM = 900):
Segment 2-replica2 (PK a-1 not ingested yet):
PK | value | validUntil | validFrom |
... |
Incoming query:
Select * from upsert_table where PK = “a-1”;
Rewritten query:
Select * from upsert_table where PK = “a-1” and (validFrom < $low_water_mark and (validUntil = nil or validUntil > $low_water_mark))
Expected result:
Return record PK=”a-1” and value = 1
Explanation:
If $low_water_mark = 800, then the query above will select the record from Pinot server 1 and skip the records from Pinot servers 2 and 3. This is because only a subset of the Pinot servers have seen the update for those records.
What happens if we don't use the LWM:
If we don't use the LWM and only use the deletion marker, then the queries dispatched to server 1 and server 3 will each fetch 0 records for the given query, because the record is deleted on server 1 and not yet ingested on server 3. This would produce results that are inconsistent with the correct view of the data.
Example 2
Pinot server 1 (LWM=1200):
Segment 1:
PK | value | validUntil | validFrom |
a-1 | 1 | 1000 | 500 |
Pinot server 2 (LWM = 1300):
Segment 2-replica1:
PK | value | validUntil | validFrom |
a-1 | 2 | nil | 1000 |
Incoming query:
Select * from upsert_table where PK = “a-1”;
Rewritten query:
Select * from upsert_table where PK = “a-1” and (validFrom < $low_water_mark and (validUntil = nil or validUntil > $low_water_mark))
Expected result:
Return record PK=”a-1” and value = 2
Explanation:
If $low_water_mark = 1200, then the query above will select the record from Pinot server 2 and skip the record from Pinot server 1. This is because all servers containing records related to a-1 have already seen updates up to 1200. By selecting the record from Pinot server 2 and skipping the record from Pinot server 1, we get a more up-to-date result from the Pinot system.
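Below is a hedged sketch of the broker-side rewrite used in both examples. The string-level rewrite, the column names, and the use of -1 as a sentinel for “nil” in the long-backed $validUntil column are all illustrative assumptions; a real implementation would operate on the parsed query (filter tree), not on the raw query string.

```java
// Illustrative broker-side rewrite: restrict results to records that were valid
// as of the table's low-water-mark.
public final class UpsertQueryRewriter {
  // Hypothetical sentinel for "nil" in the long-backed $validUntil virtual column.
  private static final long NOT_DELETED = -1L;

  public static String rewrite(String originalQuery, long lowWaterMark) {
    String upsertPredicate = "($validFrom < " + lowWaterMark
        + " AND ($validUntil = " + NOT_DELETED + " OR $validUntil > " + lowWaterMark + "))";
    // Naive string-level append, sufficient for the examples in this document.
    return originalQuery.toLowerCase().contains(" where ")
        ? originalQuery + " AND " + upsertPredicate
        : originalQuery + " WHERE " + upsertPredicate;
  }
}
```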
Implication on message ordering from Low-water-mark
In the design of the low-water-mark, we assume the messages received by Pinot servers arrive in the same order as the updates happened in the key coordinator. If this ordering is not preserved, our low-water-mark is not accurate, because it could be higher than the offsets of messages that have yet to arrive. Below is an example of what would happen if messages arrive out of order at a Pinot server.
As you can see from the example, if the key coordinator output messages are unordered, update 2 could be either present or missing depending on which Pinot server we send the query to. This would cause missing or duplicated data when the Pinot broker issues a query with a low-water-mark equal to 3. In order to prevent this, we need to ensure that message delivery between the coordinator service and the Pinot servers preserves message ordering.
In the Kafka-based approach, there are two ways to achieve this:
- Rely on Kafka producer ordering
According to the Kafka documentation [1] [2], the Kafka producer can preserve message ordering even when using asynchronous sending. However, there are a few constraints we need to satisfy (see the producer config sketch after this list):
- The key coordinator Kafka producer needs to be on 0.11.4 or later, with retries > 0 and max.in.flight=1
- The key coordinator Kafka producer needs to partition based on the input partition number (ordering can only be preserved if all such messages are in the same output partition)
The above two constraints could impose limitations on Kafka producer throughput, so we need to perform load testing to ensure it works well. There is also a rare scenario where the Kafka producer exhausts all retries and fails while a subsequent message send succeeds; this might also cause out-of-order messages.
- Re-order messages in the segment updater
Another way to achieve ordering on the Pinot server is to let the Pinot segment updater buffer a list of messages and perform re-ordering. This requires more memory and increases latency, due to the need to buffer a large amount of data and re-order it when necessary. It would also require the Pinot server to set a large enough buffer according to the traffic volume of the key coordinator output topic, and this approach cannot guarantee 100% correct ordering of the messages.
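For the first option above, the producer configuration could look like the sketch below; the exact values (retries count, serializers) are illustrative and would need the load testing mentioned above.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Producer settings that preserve ordering with asynchronous sends.
public final class OrderedUpsertProducerFactory {
  public static KafkaProducer<String, String> create(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.ACKS_CONFIG, "all");                        // at-least-once
    props.put(ProducerConfig.RETRIES_CONFIG, 5);                         // retries > 0
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);  // preserve ordering
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    return new KafkaProducer<>(props);
  }
}
```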
Risk
- Messages produced to Kafka need to be persistent (at-least-once) and ordered to ensure the low-water-mark calculation is correct, while still achieving high throughput and low latency
- Use asynchronous sending and set acks=all on the producer, with proper configs on the Kafka brokers
- Ordering might be tricky to implement due to the asynchronous nature of Kafka producing
- We need to evaluate Kafka performance, as we are aiming for high throughput with at-least-once sending to Kafka
- Kafka topic throughput might be limited and hard to expand
- As we cannot simply expand the number of key coordinators, we need to allocate enough capacity for the key coordinators from the beginning
- The key coordinator requires multiple steps per update transaction, which might limit the throughput of this component
- We might need a way to roll back ingestion progress in order to fix data issues (checkpoint on the key coordinator)
- Memory usage of the virtual columns & extra components
- Compatibility with a partial-update solution
Feedback from LinkedIn team
The data is now mutable. Is it possible to avoid that?
This is a design tradeoff we have to make between query complexity and ingestion complexity. If we keep the data immutable, the query logic will be more complex and the query performance will be worse. The current design favors the query side, at the cost of ingestion complexity and performance.
Why not using timestamps in the data input messages as version numbers?
The problem with using timestamps as version numbers is that timestamps are not necessarily monotonically increasing. There could be issues on the client side or in message delivery that put the timestamps in the data input messages out of order. This essentially relies on our clients to provide correct timestamps, and we would like to avoid that in the design.
Having multiple filters for validFrom and validUntil in the queries will affect query performance
We are aware of this and have provided a solution for this. See https://docs.google.com/document/d/1iS2gmJnV7s3PtLhAguUetz7kOtBmwwed4aKHbXLih8Q
How do we handle Kafka partition expansion?
We will set the Kafka partition count as part of the config and use a customized partitioner to partition the data before we send it to Kafka topics. This allows us to control the partition assignment even if the Kafka partition count changes for any reason. A hedged partitioner sketch is shown below.
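A possible shape for such a customized partitioner is sketched below, using a fixed partition count taken from a hypothetical config key ("upsert.num.partitions") so that key-to-partition assignment stays stable even if the physical topic's partition count changes.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Illustrative customized partitioner with a fixed, configured partition count.
public class FixedCountPartitioner implements Partitioner {
  private int numPartitions;

  @Override
  public void configure(Map<String, ?> configs) {
    numPartitions = Integer.parseInt(String.valueOf(configs.get("upsert.num.partitions")));
  }

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
      return 0; // simplistic fallback for records without a key
    }
    // Ignore the live partition count from `cluster`; always hash into the fixed count.
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  @Override
  public void close() {}
}
```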
Comment (Yupeng Fu): A revisit on the upsert design: https://docs.google.com/document/d/1qljEMndPMxbbKtjlVn9mn2toz7Qrk0TGQsHLfI--7h8/