Status

Current state[One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Google Doc:  DSIP-018: Support Merge-On-Write implementation for UNIQUE KEY data model

Design Doc in PDF: For the convenience of Chinese-speaking developers, a Chinese version of the design doc is also available as a PDF.

 

Motivation

Doris' Unique-Key data model is widely used in scenarios with update requirements, such as Flink-CDC synchronization, user profiling, and e-commerce orders. However, Unique-Key currently uses a Merge-On-Read implementation, and its query performance is poor for the following reasons:

  1. In the merge-on-read implementation, a query must read all the filtered data and perform a merge sort to determine which row holds the latest data for each key. This merge sort adds CPU cost.

  2. Because it is impossible to tell whether a row in a segment holds the latest data, Unique-Key does not support push-down of aggregate function predicates, which increases the amount of data read.

The following figure is a simplified representation of the execution process of the Unique-Key model:

Related Research

We investigated how other mainstream data warehouses implement the data update model. In addition to the Merge-On-Read approach used by Unique-Key, the implementations fall into the following categories:

Merge-On-Write(Mark Delete and Insert)

  • Solution overview:

    • Check whether a write is an update via a primary key index, and locate the file and line that hold the overwritten key

    • Mark the old key as deleted by adding its rowid to a bitmap, and filter data by the rowids recorded in the bitmap when reading

  • Pros: key comparison is avoided when reading, and predicate push-down and secondary indexes are supported
  • Cons: additional cost of querying the index and updating the bitmap on write
  • Representative systems: Hologres, Aliyun ADB, ByteHouse
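
The mark-delete-and-insert idea can be illustrated with a toy in-memory table. This is only a sketch under simplifying assumptions: the class name `MowTable`, a Python dict standing in for the primary key index, and a Python set standing in for the bitmap are all hypothetical and do not reflect any of the systems listed above.

```python
class MowTable:
    """Toy merge-on-write table: an append-only row store plus a delete set."""

    def __init__(self):
        self.rows = []              # rowid -> (key, value); rowid is the list position
        self.pk_index = {}          # key -> rowid of the latest version of that key
        self.delete_bitmap = set()  # rowids that have been overwritten

    def upsert(self, key, value):
        old_rowid = self.pk_index.get(key)
        if old_rowid is not None:
            # Update request: mark the old row as deleted instead of rewriting it.
            self.delete_bitmap.add(old_rowid)
        self.rows.append((key, value))
        self.pk_index[key] = len(self.rows) - 1

    def scan(self):
        # No key comparison at read time: rows in the delete bitmap are skipped.
        return [row for rowid, row in enumerate(self.rows)
                if rowid not in self.delete_bitmap]


table = MowTable()
table.upsert("k1", "a")
table.upsert("k2", "b")
table.upsert("k1", "c")   # overwrites rowid 0
print(table.scan())       # [('k2', 'b'), ('k1', 'c')]
```

The write path pays for one index lookup and one bitmap update per key, which is the cost named in the cons above, while the read path only filters by rowid.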

Copy-On-Write

  • Solution overview:

    • When writing or updating data, first read the data in the original file, then apply the update and generate a new version of the file. Even if only one byte of new data is committed, the entire column data file must be rewritten.

  • Pros: highest query efficiency

  • Cons: the cost of rewriting data is very high; suitable for scenarios with low-frequency loading and high-frequency reading

  • Representative systems: Delta Lake, Hudi

Delta-Store

  • Solution overview:

    • Divide data into base data and delta data. Each key is unique in the base data

    • When writing or updating data, first query the base data. If the key is found, the write is an update and the updated data is written to the delta store. If the key is not found, the row is written into the memtable, which is later flushed to the base data

    • When reading data, the query needs to merge the base data and the delta data

  • Pros: good write performance, efficient support for partial updates

  • Cons: poor support for secondary indexes

  • Representative system: Kudu

Solution Selection

Considering Doris' Unique Key usage scenarios (high freshness requirements for data load, high load frequency, and low query latency), Merge-On-Write is a more suitable solution for optimization.

Detailed Design

Key Design Points

To implement the Mark Delete and Insert solution, we should consider the following key points:

  1. Add a primary key index that can quickly look up whether a key exists, along with its file location and line number, during data load.

  2. Add a delete bitmap to mark the old key as deleted when an update occurs

  3. The update of delete bitmap needs to consider transaction support and multi-version support

  4. Query adaptation on delete bitmap

  5. Conflict handling between compaction and data load. During compaction, other data load jobs may have updated the source rowsets and added new delete bitmaps on them. Compaction must process these concurrent updates properly

Primary key index

Several key points of primary key index design and selection:

  1. High QPS (the target is for a single BE to support 100,000 QPS)

    1. Doris uses columnar storage. When the unique key contains multiple columns, a single lookup needs to read the value of each column one by one, which involves multiple disk I/Os. To improve the performance of the primary key index, we store the encoded primary key in the index: a multi-column unique key is encoded into a single primary key.

  2. Do not consume memory excessively

    1. The primary key index is designed similarly to the RocksDB Partitioned Index [refer to https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters ], and is created in the segment file when the MemTable flushes. The main considerations for choosing this solution are as follows:

    2. RocksDB has proved that its index solution can support very high QPS. With a reasonable BlockCache configuration, query performance can be guaranteed when the cache hit rate of index pages is above 90%

    3. Because it is a file-based index, it can be loaded on demand, and the consumption of memory is controllable

    4. With prefix compression on the primary key data, we can further save space and reduce both memory cache usage and the amount of disk I/O

  3. Lookup performance optimization under high-frequency data load scenarios

    1. In the default configuration, a tablet may have at most 500 rowsets, and seeking a key in so many rowsets one by one may perform poorly

    2. To optimize this case, we can consider maintaining an additional CuckooFilter per tablet to accelerate lookups in small files. Compared with BloomFilter, CuckooFilter has the advantages of a lower FP rate (at the same size), support for deletion, and value storage (such as storing line numbers). (This solution is inspired by SlimDB: https://www.vldb.org/pvldb/vol10/p2037-ren.pdf . By using a solution similar to the Multi Cuckoo Filter in SlimDB, we can quickly determine whether a key exists with lower memory and I/O consumption)
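
Encoding a multi-column unique key into a single primary key can be sketched as follows. This is a minimal illustration and not Doris' actual encoding: the function names `encode_int64`, `encode_str`, and `encode_key` and the byte layout are assumptions, and only integer and string columns are handled. The goal it demonstrates is that comparing the encoded bytes gives the same order as comparing the key columns one by one.

```python
import struct


def encode_int64(v: int) -> bytes:
    # Shift the signed range into the unsigned range so that big-endian
    # byte order matches numeric order.
    return struct.pack(">Q", (v + (1 << 63)) & 0xFFFFFFFFFFFFFFFF)


def encode_str(s: str) -> bytes:
    # Escape embedded 0x00 bytes and append a 0x00 0x00 terminator, so a
    # string always sorts before any longer string that it is a prefix of.
    raw = s.encode("utf-8")
    return raw.replace(b"\x00", b"\x00\x01") + b"\x00\x00"


def encode_key(columns) -> bytes:
    """Concatenate the per-column encodings into one comparable byte string."""
    parts = []
    for col in columns:
        if isinstance(col, int):
            parts.append(encode_int64(col))
        elif isinstance(col, str):
            parts.append(encode_str(col))
        else:
            raise TypeError(f"unsupported key column type: {type(col).__name__}")
    return b"".join(parts)


keys = [(1, "b"), (-5, "z"), (1, "a"), (1, "ab"), (300, "")]
# Sorting by the encoded bytes yields the same order as sorting by columns.
same_order = sorted(keys) == sorted(keys, key=encode_key)
```

With such an encoding, a lookup touches one index entry per key instead of one read per key column.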

We should also add a corresponding BloomFilter for each primary index. The overall design is shown in the figure below:
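
How a Bloom filter in front of the primary key index avoids unnecessary index reads can be sketched as below. Everything here is hypothetical: the classes `BloomFilter` and `SegmentIndex`, the SHA-256 based hashing, and the sorted in-memory list standing in for the on-disk index are illustration choices, not the structures used in Doris.

```python
import bisect
import hashlib


class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: bytes):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: bytes):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def may_contain(self, key: bytes) -> bool:
        # False means "definitely absent"; True may be a false positive.
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class SegmentIndex:
    """Sorted encoded keys of one segment; the row ordinal is the position."""

    def __init__(self, encoded_keys):
        self.keys = sorted(encoded_keys)
        self.bloom = BloomFilter()
        for key in self.keys:
            self.bloom.add(key)

    def lookup(self, key: bytes):
        if not self.bloom.may_contain(key):
            return None  # skip the (more expensive) index probe entirely
        i = bisect.bisect_left(self.keys, key)
        if i < len(self.keys) and self.keys[i] == key:
            return i
        return None


seg = SegmentIndex([b"k1", b"k3", b"k5"])
found = seg.lookup(b"k3")    # row ordinal of k3 within the segment
missing = seg.lookup(b"k4")  # None, whether or not the filter had a false positive
```

Because most keys in a load are new rather than updates, most lookups are expected to end at the filter check without touching the index.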

Delete Bitmap

Delete Bitmap is recorded using Roaring Bitmap and is stored in RocksDB together with the tablet meta; each segment corresponds to a bitmap. To maintain the original version semantics of Unique Key, Delete Bitmap also needs to support multiple versions. Because having each data load job generate a full version of the bitmap may consume a relatively large amount of memory, it is more appropriate to store only an incremental modification for each version.

Write process

The writing process of Delete Bitmap is shown in the following figure:

  1. DeltaWriter will first flush the data to disk

  2. In the publish phase, look up all the keys in batches and update the bitmaps corresponding to the overwritten keys. In the figure, the newly written rowset version is 8, which modifies data in 3 rowsets, so it generates 3 bitmap modification records

  3. Updating the bitmap during the publish phase ensures that no new visible rowset is added during batch key lookup and bitmap update, which guarantees the correctness of bitmap updates

  4. If a segment is not modified, no bitmap is created for the corresponding version. For example, segment1 of rowset1 has no bitmap corresponding to version 8
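
The publish-phase steps above can be sketched as a function that turns the keys of a newly loaded rowset into incremental bitmap records. The function name `publish`, the dict-based per-segment index, and the record layout `(rowset_id, segment_id, version) -> rowids` are assumptions made for this illustration only.

```python
from collections import defaultdict


def publish(new_version, new_keys, visible_segments):
    """Return the incremental delete-bitmap records produced by one load.

    visible_segments maps (rowset_id, segment_id) -> {key: rowid} for every
    segment that is already visible. The result maps
    (rowset_id, segment_id, new_version) -> set of rowids overwritten by
    this load. Segments that are not touched get no record at all.
    """
    records = defaultdict(set)
    for key in new_keys:
        for (rowset_id, segment_id), pk_index in visible_segments.items():
            rowid = pk_index.get(key)
            if rowid is not None:
                # Marking a row that an older version already deleted is
                # harmless, because set insertion is idempotent.
                records[(rowset_id, segment_id, new_version)].add(rowid)
    return dict(records)


visible = {
    ("rowset1", 1): {"a": 0, "b": 1},
    ("rowset1", 2): {"c": 0, "d": 1},
    ("rowset5", 1): {"e": 0},
}
records = publish(8, ["c", "e", "x"], visible)
print(records)
# {('rowset1', 2, 8): {0}, ('rowset5', 1, 8): {0}}
```

Segment 1 of rowset1 receives no record for version 8 because none of its keys were overwritten, and the brand-new key "x" produces no record either.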

Read process

The reading process of Bitmap is shown in the following figure:

  1. A query that requests version 7 will only see the data corresponding to version 7

  2. When reading the data of rowset5, the bitmaps generated by the modifications of v6 and v7 are merged to obtain the complete DeleteBitmap for version 7, which is used to filter the data

  3. In the example, the version 8 load job overwrites one row in segment2 of rowset1, but a query requesting version 7 can still read that row

  4. In high-frequency data load scenarios, there may be a large number of bitmap versions, and merging them may consume significant CPU, so we introduce an LRU cache so that each version of the bitmap only needs to be merged once.
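
Merging the incremental bitmaps for a requested version, with an LRU cache in front, can be sketched as follows. The `DELTAS` table, the function `delete_bitmap_at`, and the use of `frozenset` in place of Roaring Bitmap are hypothetical stand-ins; the sketch also assumes that the records of an already published version never change, so cached results stay valid.

```python
from functools import lru_cache

# Incremental records: (rowset_id, segment_id, version) -> deleted rowids.
DELTAS = {
    ("rowset5", 0, 6): frozenset({1}),
    ("rowset5", 0, 7): frozenset({4}),
    ("rowset5", 0, 8): frozenset({2}),
}


@lru_cache(maxsize=1024)
def delete_bitmap_at(rowset_id, segment_id, version):
    """Union of all incremental records with a version <= the requested one."""
    merged = set()
    for (rs, seg, ver), rowids in DELTAS.items():
        if rs == rowset_id and seg == segment_id and ver <= version:
            merged |= rowids
    return frozenset(merged)


v7 = delete_bitmap_at("rowset5", 0, 7)        # merges the v6 and v7 records
v7_again = delete_bitmap_at("rowset5", 0, 7)  # served from the cache
v8 = delete_bitmap_at("rowset5", 0, 8)
print(sorted(v7), sorted(v8))                 # [1, 4] [1, 2, 4]
```

A query at version 7 does not see the row deleted by version 8, which matches the versioned read semantics described above.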

Transaction support

DeltaWriter does not look up and update the bitmap in the writing stage; it does so in the publish stage.

Query layer adaptation

Scan adaptation

  1. TabletReader obtains the specified version of DeleteBitmap in TabletMeta according to the version specified by Query

  2. Pass the DeleteBitmap of the specified rowset to the RowsetReaderContext via RowsetReader

  3. Then pass the DeleteBitmap corresponding to the Segment to the SegmentIterator through StorageReadOptions

  4. SegmentIterator itself maintains a _row_bitmap for fast conditional filtering, so we can subtract the incoming DeleteBitmap from _row_bitmap directly
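
The last step, subtracting the delete bitmap from the row bitmap inside the segment iterator, amounts to a set difference. The function `segment_rows_to_read` and its parameters are hypothetical, and Python sets stand in for Roaring Bitmaps.

```python
def segment_rows_to_read(num_rows, predicate_matches, delete_bitmap):
    """Return the rowids a segment scan should actually read.

    predicate_matches: rowids that survive the pushed-down conditions.
    delete_bitmap: rowids marked as deleted for the requested version.
    """
    row_bitmap = set(range(num_rows))  # start with every row in the segment
    row_bitmap &= predicate_matches    # conditional filtering
    row_bitmap -= delete_bitmap        # drop rows overwritten by later loads
    return sorted(row_bitmap)


rows = segment_rows_to_read(6, {0, 2, 3, 5}, {2, 5})
print(rows)  # [0, 3]
```

Since deleted rows are removed before any column data is read, no merge sort across rowsets is needed at query time.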

Adaptation for point lookup

  1. At present, Doris' data reading is designed for scans and has no point lookup capability, so the following interfaces need to be added:

  2. Add minkey/maxkey records to the meta of each Segment, and organize all segments in memory through an interval-tree structure

  3. Through the interval-tree, we can quickly locate which segments may contain a given encoded_key

  4. Add a lookup_row_key interface to SegmentIterator for point lookup
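
Locating candidate segments through min/max keys can be sketched with a small centered interval tree. The class `IntervalNode`, the method `stab`, and the tuple layout `(min_key, max_key, segment_id)` are assumptions for illustration; the actual in-memory structure may differ.

```python
class IntervalNode:
    """Static centered interval tree over (min_key, max_key, segment_id)."""

    def __init__(self, intervals):
        endpoints = sorted(k for lo, hi, _ in intervals for k in (lo, hi))
        self.center = endpoints[len(endpoints) // 2]
        left, right, here = [], [], []
        for iv in intervals:
            lo, hi, _ = iv
            if hi < self.center:
                left.append(iv)
            elif lo > self.center:
                right.append(iv)
            else:
                here.append(iv)  # this interval contains the center
        self.by_lo = sorted(here, key=lambda iv: iv[0])
        self.by_hi = sorted(here, key=lambda iv: iv[1], reverse=True)
        self.left = IntervalNode(left) if left else None
        self.right = IntervalNode(right) if right else None

    def stab(self, key):
        """Return ids of all segments whose [min_key, max_key] contains key."""
        hits = []
        if key < self.center:
            for lo, _, seg in self.by_lo:
                if lo > key:
                    break
                hits.append(seg)
            if self.left:
                hits += self.left.stab(key)
        elif key > self.center:
            for _, hi, seg in self.by_hi:
                if hi < key:
                    break
                hits.append(seg)
            if self.right:
                hits += self.right.stab(key)
        else:
            hits += [seg for _, _, seg in self.by_lo]
        return hits


segments = [(b"a", b"f", "seg0"), (b"d", b"k", "seg1"), (b"m", b"z", "seg2")]
tree = IntervalNode(segments)
print(sorted(tree.stab(b"e")))  # ['seg0', 'seg1']
print(tree.stab(b"l"))          # []
```

The tree only narrows the candidates: each returned segment still has to be probed, since a key inside a segment's min/max range is not guaranteed to exist in it.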

Handling Compaction and Write Conflicts

  1. The normal compaction process

    1. When compaction reads data, it obtains the version Vx of the rowsets being processed, and automatically filters out the rows marked for deletion through the delete bitmap (see the query layer adaptation section above)

    2. Once all data has been merged, all DeleteBitmaps on the source rowsets with a version less than or equal to Vx can be cleaned up

    3. If some bitmaps have a version larger than Vx, they are processed as described below
  2. Handling compaction and write conflicts

    1. During compaction, new data load tasks may be committed; assume the corresponding version is Vy. If the write corresponding to Vy has modified a compaction source rowset, it adds a delete bitmap with version Vy for that rowset

    2. After the compaction ends, check all DeleteBitmaps on the source rowsets with a version larger than Vx, and convert the rowids in them to the segment rowids in the newly generated rowset

As shown in the figure below, compaction selects three rowsets: [0-5], [6-6], and [7-7]. During the compaction, the load job of version 8 finishes successfully. In the compaction commit stage, the bitmap generated by the version 8 load job must be processed.
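
The rowid conversion at compaction commit can be sketched as follows. The function `convert_delete_bitmaps`, the `rowid_map` built while compaction copies rows, and the record layouts are hypothetical; the sketch only shows how bitmaps newer than Vx are carried over to the output rowset.

```python
def convert_delete_bitmaps(pending, rowid_map, vx):
    """Carry delete bitmaps newer than vx over to the compacted rowset.

    pending:   (src_rowset, src_segment, version) -> set of source rowids
    rowid_map: (src_rowset, src_segment, src_rowid) -> (dst_segment, dst_rowid),
               recorded for every row that compaction copied
    Returns    (dst_segment, version) -> set of rowids in the new rowset.
    """
    converted = {}
    for (rowset, segment, version), rowids in pending.items():
        if version <= vx:
            continue  # already applied when compaction read the data
        for rowid in rowids:
            dst = rowid_map.get((rowset, segment, rowid))
            if dst is None:
                continue  # row was already deleted as of vx, so never copied
            dst_segment, dst_rowid = dst
            converted.setdefault((dst_segment, version), set()).add(dst_rowid)
    return converted


# Compaction of [0-5], [6-6], [7-7] read at Vx = 7; row 1 of [0-5] was
# already deleted by version 6 and therefore not copied.
rowid_map = {
    ("[0-5]", 0, 0): (0, 0),
    ("[0-5]", 0, 2): (0, 1),
    ("[6-6]", 0, 0): (0, 2),
    ("[6-6]", 0, 1): (0, 3),
    ("[7-7]", 0, 0): (0, 4),
}
pending = {
    ("[0-5]", 0, 6): {1},  # version <= Vx, dropped after compaction
    ("[6-6]", 0, 8): {1},  # written by the concurrent version 8 load
}
new_bitmaps = convert_delete_bitmaps(pending, rowid_map, vx=7)
print(new_bitmaps)  # {(0, 8): {3}}
```

The version 8 deletion of row 1 in rowset [6-6] becomes a deletion of row 3 in segment 0 of the compacted rowset, so a query at version 8 still filters that row after the source rowsets are gone.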

Scheduling

  1. Index

    1. Add an index for primary key (leverage the `IndexedColumnWriter/Reader`)

    2. Add Bloom filter support

    3. Add a lookup interface, to integrate bloom filter and primary key index
  2. Delete Bitmap
    1. Add a versioned delete bitmap, as well as its serialization code
    2. Add LRU cache
    3. Used in read process
    4. Update bitmap at the publish stage of a load job
  3. Lookup interface
    1. Add an interval-tree structure
    2. Add tablet lookup interface, as well as a structure `RowLocation`
    3. Add min/max key for segment 
    4. Add a structure for rowset tree
    5. Lookup interface implementation, leverage the rowset-tree and segment readers
    6. Optimizations on lookup interface
  4. Compaction
    1. Support rowid conversion
    2. Compaction update bitmap
    3. Some compaction policy optimization
    4. Process range deletion
  5. Other Supports
    1. Add the table option at BE side first
    2. UniqueKey index options on create table
    3. Handle the case where a duplicate key is contained in multiple segments of one load job
  6. Performance optimization
    1. Add cuckoo filter support