Status

Current state: Released

Discussion thread: 

JIRA or Github Issue: 

Released: Doris 1.2.0

Google Doc: DSIP-018: Support Merge-On-Write implementation for UNIQUE KEY data model

Design Doc in PDF: For the convenience of Chinese-speaking developers, a Chinese version of the design doc is also uploaded as a PDF. (Chinese-speaking developers can also refer to the Chinese PDF version of the design doc.)

Attachment: Unique-Key的主键模型支持.pdf

Motivation

Doris' Unique-Key data model is widely used in scenarios with update requirements such as Flink-CDC synchronization, user portrait, and e-commerce orders, but Unique-Key currently uses a Merge-On-Read implementation. Due to the following reasons, query performance is poor:

  1. In the merge-on-read implementation, the query must read all the filtered data and perform a merge sort to determine which row is the latest. This merge sort brings additional CPU cost

  2. Since it is impossible to confirm whether a row in a segment is the latest data, Unique-Key does not support push-down of aggregate function predicates, which increases the amount of data read

The following figure is a simplified representation of the execution process of the Unique-Key model:

Image Added

Related Research

We investigated how other mainstream data warehouses implement the data update model. In addition to the Merge-On-Read used by Unique-Key, there are basically the following categories:

Merge-On-Write(Mark Delete and Insert)

  • Solution overview:

    • Check whether a write is an update request via a primary key index, and locate the file and row number of the overwritten key

    • Mark the key as deleted by adding its rowid to a bitmap, and filter data based on the rowids recorded in the bitmap when reading

  • Pros: key comparison is avoided when reading, and various predicate push-downs and secondary indexes are supported
  • Cons: additional cost of querying the index and updating the bitmap on write
  • Representative systems: Hologres, Aliyun ADB, ByteHouse

Copy-On-Write

  • Solution overview:

    • When writing/updating data, the data in the original file is read and merged with the update synchronously, generating a new version of the file. Even if only one byte of new data is committed, the entire column data file needs to be rewritten.

  • Pros: Highest query efficiency

  • Cons: The cost of rewriting data is very high; suitable for low-frequency bulk loading and high-frequency reading scenarios

  • Representative systems: Delta Lake, Hudi

Delta-Store

  • Solution overview:

    • Divide data into base data and delta data. Each key is unique in base data

    • When writing/updating data, first query the base data. If the key is found, it is an update request: the updated row is written to the delta store, and the delta store is merged with the base data at read time to obtain the latest data. If the key is not found, the row is written to the memtable, which is later flushed into the base data

    • When reading data, the query needs to merge the base data and delta data.

  • Pros: Good write performance, efficient support for partial updates

  • Cons: Poor support for secondary indexes

  • Representative system: Kudu

Solution Selection

Considering Doris' Unique-Key usage scenarios (high real-time freshness requirements for data loads, high load frequency, and low query latency), Merge-On-Write is the more suitable solution for optimization.

...

To implement the Mark Delete and Insert solution, we should consider the following key points:

  1. Add a primary key index that can quickly look up whether a key exists, along with its file location and row number, during data loading.

  2. Add a delete bitmap to mark the old key as deleted when an update occurs

  3. The update of delete bitmap needs to consider transaction support and multi-version support

  4. Query adaptation on delete bitmap

  5. Conflict handling between compaction and data load: during compaction, other data load jobs may have updated the source rowsets and added new delete bitmaps on them; compaction must process these concurrent updates properly

Primary key index

Several key points of primary key index design and selection:

  1. High QPS (a single BE targets supporting 100,000 QPS)

    1. Doris uses columnar storage. When the unique key contains multiple columns, a point lookup has to read the value of each key column one by one, incurring multiple disk IOs. To improve primary key index performance, the multi-column unique key is encoded into a single binary primary key and stored in the primary key index.

  2. Do not consume internal memory excessively

    1. The primary key index is designed in a way similar to RocksDB's Partitioned Index (see https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters ) and is created in the segment file when the MemTable flushes. The main considerations for choosing this solution are as follows:

    2. RocksDB has proved that this index solution can support very high QPS. With a reasonable BlockCache configuration, query performance can be guaranteed given a 90%+ cache hit rate on index pages

    3. Because it is a file-based index, it can be loaded on demand, and the consumption of memory is controllable

    4. With prefix compression on the primary key data, we can further save space, reducing both memory cache occupation and the amount of data read from disk

  3. Lookup performance optimization under high-frequency data load scenarios

    1. In the default configuration, a tablet may have at most 500 rowsets, and the performance of seeking a key in so many rowsets one by one may be poor

    2. To optimize this case, we can consider maintaining an additional CuckooFilter per tablet to accelerate lookups across many small files. Compared with BloomFilter, CuckooFilter offers a lower false-positive rate at the same size, supports deletion, and can store values (such as row numbers). (This solution is inspired by SlimDB: https://www.vldb.org/pvldb/vol10/p2037-ren.pdf ; using a scheme similar to SlimDB's Multi Cuckoo Filter, we can quickly determine whether a key exists with lower memory and IO consumption)

We should also add a corresponding BloomFilter for each primary key index. The overall design is shown in the figure below:

Image Added
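To illustrate the key-encoding point above, here is a minimal, hypothetical sketch (not Doris' actual codec) of a memcomparable encoding that packs a multi-column unique key into one binary key, so that byte-wise comparison of encoded keys matches column-by-column comparison:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical memcomparable encoding sketch. Embedded 0x00 bytes are
// escaped as {0x00, 0xFF}; each column is terminated by {0x00, 0x01}, which
// sorts before any escaped byte, so shorter column values order correctly
// before their extensions.
std::string encode_key(const std::vector<std::string>& columns) {
    std::string out;
    for (const std::string& col : columns) {
        for (unsigned char c : col) {
            out.push_back(static_cast<char>(c));
            if (c == 0x00) out.push_back('\xFF');  // escape embedded zero byte
        }
        out.push_back('\x00');  // column terminator, first byte
        out.push_back('\x01');  // column terminator, second byte
    }
    return out;
}
```

Note that `std::string` comparison goes through `std::char_traits<char>::compare`, which compares bytes as unsigned values, so encoded keys can be ordered with `operator<` directly.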

Delete Bitmap

Delete Bitmap is recorded using Roaring Bitmap and stored in RocksDB together with the tablet meta; each segment corresponds to one bitmap. To maintain the original version semantics of Unique Key, the Delete Bitmap also needs to support multiple versions. Since each data load job generates one version of the bitmap, keeping a full bitmap per version could consume a lot of memory, so it is more appropriate to store an incremental modification for each version.
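The incremental-modification idea can be sketched as follows; `std::set<uint32_t>` stands in for a Roaring Bitmap, and the structure is illustrative rather than Doris' actual DeleteBitmap class:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

// Per-segment, per-version incremental delete bitmaps (illustrative).
// Each load publish records only the rows newly deleted at that version;
// a reader merges all deltas <= its query version.
struct SegmentDeleteBitmap {
    std::map<int64_t, std::set<uint32_t>> deltas;  // version -> new deletions

    void mark_deleted(int64_t version, uint32_t rowid) {
        deltas[version].insert(rowid);
    }

    // Full delete view visible to a query reading at `version`.
    std::set<uint32_t> view_at(int64_t version) const {
        std::set<uint32_t> merged;
        for (const auto& [v, rows] : deltas) {
            if (v > version) break;  // std::map iterates versions in order
            merged.insert(rows.begin(), rows.end());
        }
        return merged;
    }
};
```

A query at version 7 therefore never sees rows deleted by a version-8 publish, which is exactly the multi-version semantics described above.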

...

  1. DeltaWriter will first flush the data to disk

  2. In the publish phase, look up all the keys in batches and update the bitmaps corresponding to the overwritten keys. In the figure below, the newly written rowset has version 8 and modifies data in 3 rowsets, so it generates 3 bitmap modification records

  3. Updating the bitmap during the publish phase ensures that no new visible rowset appears during the batch key lookup and bitmap update, guaranteeing the correctness of bitmap updates

  4. If a segment is not modified, there will be no bitmap record created for the corresponding version. For example, segment1 of rowset1 has no bitmap corresponding to version 8

Image Added

Read process

The reading process of Bitmap is shown in the following figure:

  1. A query that requests version 7 will only see the data corresponding to version 7

  2. When reading the data of rowset5, the bitmaps generated by the modifications of v6 and v7 are merged together to obtain the complete DeleteBitmap for version 7, which is used to filter the data

  3. In the example below, the version 8 load job overwrites one row in segment2 of rowset1, but a query requesting version 7 can still read that row

  4. In high-frequency data load scenarios, there may be many versions of bitmaps, and merging them can consume considerable CPU, so we introduce an LRU cache so that each version of the bitmap only needs to be merged once.

Image Added
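The LRU cache mentioned in step 4 could look like the sketch below. Names and types are simplified stand-ins, not Doris code; the cache is keyed by (segment id, version) so each version's merge is computed at most once, and `put` assumes it is only called after a `get` miss.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <set>
#include <utility>

// Illustrative LRU cache for fully merged delete-bitmap views.
class MergedViewCache {
public:
    using Key = std::pair<int64_t, int64_t>;  // (segment_id, version)

    explicit MergedViewCache(size_t capacity) : capacity_(capacity) {}

    // Returns nullptr on miss; refreshes recency on hit.
    const std::set<uint32_t>* get(const Key& k) {
        auto it = index_.find(k);
        if (it == index_.end()) return nullptr;
        lru_.splice(lru_.begin(), lru_, it->second);  // move entry to front
        return &it->second->second;
    }

    void put(const Key& k, std::set<uint32_t> view) {
        lru_.emplace_front(k, std::move(view));
        index_[k] = lru_.begin();
        if (lru_.size() > capacity_) {           // evict least recently used
            index_.erase(lru_.back().first);
            lru_.pop_back();
        }
    }

private:
    size_t capacity_;
    std::list<std::pair<Key, std::set<uint32_t>>> lru_;
    std::map<Key, std::list<std::pair<Key, std::set<uint32_t>>>::iterator> index_;
};
```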

Transaction support

DeltaWriter does not look up or update the bitmap during the write stage; it does so in the publish stage

...

  1. TabletReader obtains the specified version of DeleteBitmap in TabletMeta according to the version specified by Query

  2. Pass the DeleteBitmap of the specified rowset to the RowsetReaderContext via RowsetReader

  3. Then pass the DeleteBitmap corresponding to the Segment to the SegmentIterator through StorageReadOptions

  4. SegmentIterator maintains a _row_bitmap for fast conditional filtering; we can subtract the incoming DeleteBitmap from _row_bitmap directly
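The subtraction in step 4 amounts to a set difference. A minimal stand-in sketch, with `std::set<uint32_t>` in place of a Roaring Bitmap (where the real code would be a single in-place `row_bitmap -= delete_bitmap`):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <set>

// Final filtering step (illustrative): rows selected by predicates
// (_row_bitmap) minus the rows marked deleted for this segment.
std::set<uint32_t> apply_delete_bitmap(const std::set<uint32_t>& row_bitmap,
                                       const std::set<uint32_t>& delete_bitmap) {
    std::set<uint32_t> result;
    std::set_difference(row_bitmap.begin(), row_bitmap.end(),
                        delete_bitmap.begin(), delete_bitmap.end(),
                        std::inserter(result, result.begin()));
    return result;
}
```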

Adaptation for Point Lookup

  1. At present, Doris' data reading is designed for scans and has no point lookup capability, so the following interfaces need to be added:

  2. Add minkey/maxkey records to the meta of each Segment, and organize all segments in memory in an interval tree

  3. Through the interval tree, we can quickly locate which segments may contain a given encoded_key

  4. Add a lookup_row_key interface to SegmentIterator for point lookup
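As a sketch of the minkey/maxkey pruning idea, the stand-in below does a linear scan over per-segment key ranges; the design above replaces this scan with an interval tree so overlapping ranges are found in roughly O(log n + k). Types are simplified, hypothetical stand-ins, not Doris classes.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct SegmentRange {
    int id;
    std::string minkey;  // encoded (memcomparable) keys
    std::string maxkey;
};

// Return ids of segments whose [minkey, maxkey] range may contain the key.
std::vector<int> candidate_segments(const std::vector<SegmentRange>& segs,
                                    const std::string& encoded_key) {
    std::vector<int> out;
    for (const SegmentRange& s : segs) {
        if (s.minkey <= encoded_key && encoded_key <= s.maxkey) {
            out.push_back(s.id);  // segment may contain the key
        }
    }
    return out;
}
```

Only the surviving candidates then need an actual primary-key-index probe via the lookup_row_key path.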

Handling Compaction and Write Conflicts

  1. The normal compaction process

    1. When compaction reads data, it obtains the version Vx of the currently processed rowset, and will automatically filter out the rows marked for deletion through the delete bitmap (see the previous query layer adaptation section)

    2. When all data is merged, we can clean up all DeleteBitmaps on the source rowsets with versions less than or equal to Vx

    3. If some bitmaps have versions larger than Vx, they are handled as described below
  2. Handling compaction and write conflicts

    1. During compaction execution, new data load tasks may be committed, say with version Vy. If the write corresponding to Vy modified a compaction source rowset, it adds a delete bitmap with version Vy for that rowset

    2. After compaction ends, check all DeleteBitmaps on the source rowsets with versions larger than Vx, and convert the rowids in them to the corresponding rowids in the newly generated rowset

As shown in the figure below, compaction selects three rowsets [0-5], [6-6], [7-7]. During the compaction, the load job of version 8 finishes successfully. In the compaction commit stage, it is necessary to process the bitmap generated by the version 8 load job.

Image Added
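The commit-stage rowid conversion described in step 2 can be sketched as below (illustrative, not Doris' actual code): delete-bitmap deltas with version > Vx still reference rows in the source rowsets, so their rowids are remapped through the old-to-new mapping recorded while compaction merged the data; rows that were merged away simply have no mapping and are dropped.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

// Remap deleted rowids from a source rowset into the compacted rowset.
std::set<uint32_t> convert_rowids(const std::set<uint32_t>& old_delete_rows,
                                  const std::map<uint32_t, uint32_t>& old_to_new) {
    std::set<uint32_t> converted;
    for (uint32_t old_rowid : old_delete_rows) {
        auto it = old_to_new.find(old_rowid);
        if (it != old_to_new.end()) converted.insert(it->second);
    }
    return converted;
}
```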

Scheduling

...

  1. Index

    1. Add an index for primary key (leverage the `IndexedColumnWriter/Reader`)

    2. Add Bloom filter support

    3. Add a lookup interface to integrate the bloom filter and primary key index
  2. Delete Bitmap
    1. Add a versioned delete bitmap, as well as its serialization code
    2. Add an LRU cache
    3. Use it in the read process
    4. Update the bitmap at the publish stage of a load job
  3. Lookup interface
    1. Add an Interval-Tree structure
    2. Add tablet lookup interface, as well as a structure `RowLocation`
    3. Add min/max key for segment 
    4. Add a structure for rowset tree
    5. Implement the lookup interface, leveraging the rowset tree and segment readers
    6. Optimizations on lookup interface
  4. Compaction
    1. Support rowid conversion
    2. Compaction update bitmap
    3. Some compaction policy optimization
    4. Process range deletion
  5. Other Supports
    1. Add the table option at BE side first
    2. UniqueKey index options on create table
    3. Handle cases where a duplicate key is contained in multiple segments within one load job
  6. Performance optimization
    1. Add cuckoo filter support