Discussion thread | https://lists.apache.org/thread/7qjzbcfzdshqb3h7ft31v9o3x43t8k6r
Vote thread | TBD
Issue | https://github.com/apache/incubator-paimon/issues/2898
Release | 0.8
Motivation
Position delete is a way to implement the Merge-On-Read (MOR) structure, and it has been adopted by other formats such as Iceberg[1] and Delta[2]. By combining it with Paimon's LSM tree, we can create a new position-delete mode, called `deletion vectors mode`, that is unique to Paimon.
Under this mode, extra overhead (lookup and writing the delete file) is introduced during writing, but during reading, data can be retrieved directly using "data + filter with deletion vector", avoiding additional merge costs between different files. Furthermore, this mode can be easily integrated into native engine solutions like Spark + Gluten[3] in the future, thereby significantly enhancing read performance.
Goals
Must
- Data read and write operations are accurate.
- Reader can directly obtain the final data through "data + filter with position delete" without additional merging.
Should
- The number of delete files written each time is controllable.
- The additional overhead caused by writing is controllable.
- Read performance is superior to the original LSM merge.
- Unused delete files can be automatically cleaned up.
Implementation
1. Delete File
A delete file is used to mark deletions in the original data files. The following figure illustrates how data is updated and deleted under the delete file mode:
Currently, there are two ways to represent the deletion of records:
- Position delete: marking the specific row in a file as deleted.
- Equality delete: writing a filter directly to represent the deletion.
Taking into account:
- Paimon can obtain the old records during lookup compaction.
- Inserts in paimon may also result in updates (deletes), which are difficult to represent using equality delete.
- Position delete is sufficiently efficient for reader.
...
Therefore, we do not consider equality delete and will implement the delete file using only position deletes. There are three design approaches, as follows:
1.1. Approach 1
Store deletes as a `list<file_name, pos>`, sorted first by file_name and then by pos.
...
Here is an example; suppose we have deleted some lines of the files:
When writing new deletions, write them directly into a new delete file.
Suppose we need to delete the 4th line of data_file1 and the 6th line of data_file5:
Advantages:
- Simple storage, can be directly written into formats like Parquet, Orc, Avro, etc.
...
- High redundancy, with the file_name being repeated extensively.
- When reading, it is necessary to read all the delete files first, and then construct the bitmap for the corresponding data file.
Approach 1 is inefficient, so we don't choose it. Approach 2 and Approach 3 both store bitmaps directly in the delete file, but their implementations differ.
1.2. Approach 2 (picked)
One delete file per bucket, with a structure of `map<file_name, bitmap>`. When reading a specific data file, read the delete file, construct the `map<file_name, bitmap>`, and then get the corresponding bitmap by file_name.
Here is an example (bin {3} means a bitmap, in binary, to which line 3 has been added):
When writing new deletions, a new delete file is created and the old one is marked as removed at the same time:
Advantages:
- Simple implementation, similar to Paimon's existing index (one index file per bucket), just adding a delete file manifest.
- The number of delete files is stable and corresponds to the number of buckets.
- The logic for cleaning up delete files is simple, just clean the older one.
...
- Reading and writing of delete file is on bucket-level.
- In extreme cases, if the deletion is distributed across all buckets, the delete files for all buckets will need to be rewritten.
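To make the bucket-level rewrite of Approach 2 concrete, here is a minimal in-memory sketch. It uses `java.util.BitSet` as a stand-in for the bitmap, and the class and method names are illustrative, not Paimon's API:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Sketch of Approach 2: one delete file per bucket, holding a
// map<file_name, bitmap>. Writing new deletions rewrites the whole
// bucket-level map; the previous delete file would be marked as removed.
public class BucketDeleteFile {

    // In-memory stand-in for the bucket's current delete file.
    private Map<String, BitSet> current = new HashMap<>();

    // Merge new deletions for one data file into a fresh copy of the map,
    // then swap it in (i.e., "write a new delete file").
    public void writeNewDeletions(String dataFile, long... positions) {
        Map<String, BitSet> next = new HashMap<>();
        current.forEach((file, bitmap) -> next.put(file, (BitSet) bitmap.clone()));
        BitSet bitmap = next.computeIfAbsent(dataFile, f -> new BitSet());
        for (long pos : positions) {
            bitmap.set((int) pos);
        }
        current = next; // the new delete file replaces the old one
    }

    public boolean isDeleted(String dataFile, long position) {
        BitSet bitmap = current.get(dataFile);
        return bitmap != null && bitmap.get((int) position);
    }
}
```

Note the cost this sketch makes visible: every new batch of deletions copies the whole bucket-level map, which is exactly the rewrite overhead listed above.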
1.3. Approach 3
One delete file per write, with a structure of `list<bitmap>`, plus additional metadata `<delete file name, offset, size>` pointing to each bitmap (this structure is also called a deletion vector).
When reading a specific data file, obtain the delete_file's file name based on the metadata, and then according to the offset + size, retrieve the corresponding bitmap.
...
- More changes to the Paimon protocol are needed: a file becomes a tuple <data_file, delete_meta>, and the logic for cleaning up delete files is more complex.
- When writing, it is necessary to merge the bitmaps generated by each bucket into a single delete file.
- In extreme cases, if there are deletions with every write, a new delete file will be generated with each write operation (however, the number is bounded, because each full compaction invalidates all delete files).
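The offset/size lookup at the heart of Approach 3 can be sketched as follows, again with `java.util.BitSet` as a stand-in for the bitmap and illustrative names (the file names and deleted positions are hard-coded purely for the demonstration):

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Sketch of Approach 3: one delete file per write containing a list of
// serialized bitmaps, plus per-data-file metadata <offset, size> used to
// read a single bitmap back without parsing the whole file.
public class DeleteFileWithMeta {

    // Builds a delete file with two hard-coded bitmaps (f1: line 4,
    // f5: line 6), reads the requested file's bitmap back through its
    // <offset, size> range, and checks whether `position` is deleted.
    public static boolean isDeletedAfterRoundTrip(String fileName, int position) {
        Map<String, BitSet> bitmaps = new HashMap<>();
        BitSet b1 = new BitSet(); b1.set(4);
        BitSet b5 = new BitSet(); b5.set(6);
        bitmaps.put("f1", b1);
        bitmaps.put("f5", b5);

        // Write: concatenate serialized bitmaps, remembering each range.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Map<String, int[]> ranges = new HashMap<>(); // file -> {offset, size}
        for (Map.Entry<String, BitSet> e : bitmaps.entrySet()) {
            byte[] bytes = e.getValue().toByteArray();
            ranges.put(e.getKey(), new int[] {out.size(), bytes.length});
            out.writeBytes(bytes);
        }
        byte[] deleteFile = out.toByteArray();

        // Read: slice out just this file's bitmap using offset + size.
        int[] range = ranges.get(fileName);
        if (range == null) {
            return false; // no deletions recorded for this data file
        }
        BitSet bitmap =
                BitSet.valueOf(Arrays.copyOfRange(deleteFile, range[0], range[0] + range[1]));
        return bitmap.get(position);
    }
}
```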
1.4. Test
Before deciding which approach to go with, let's first conduct a performance test on bitmaps, based on org.roaringbitmap.RoaringBitmap[4]. The reasons for choosing it are as follows:
...
"data rate / max num" = 20% / 2,000,000, means randomly call "RoaringBitmap.add(x)", which "x" is randomly in the range of 0 to 2,000,000 for a total of 20% * 2,000,000 = 400,000 times to build the bitmap, then serialize it to file, next deserialize from file, finally call "RoaringBitmap.addcontains(x)" for 400,000 times to simulate filter.
measure: the total time of calling "RoaringBitmap.add(x)", time of serialize to file, time of deserialize from file, the serialized file size, and the total time of calling "RoaringBitmap.contains(x)".
data rate / max num | add(ms) | serialization(ms) | deserialization(ms) | file size(MB) | contains(ms) |
20% /2,000,000 | 43 | 5 | 26 | 0.24 | 7 |
50% /2,000,000 | 47 | 3 | 52 | 0.24 | 5 |
80% /2,000,000 | 57 | 1 | 24 | 0.24 | 8 |
20% /20,000,000 | 450 | 13 | 247 | 2.4 | 49 |
50% /20,000,000 | 629 | 6 | 222 | 2.4 | 76 |
80% /20,000,000 | 1040 | 5 | 222 | 2.4 | 121 |
20% /200,000,000 | 5079 | 44 | 2262 | 24 | 442 |
50% /200,000,000 | 9469 | 43 | 2773 | 24 | 1107 |
80% /200,000,000 | 13625 | 38 | 2233 | 24 | 1799 |
20% /2,000,000,000 | 93753 | 568 | 22290 | 239 | 5747 |
50% /2,000,000,000 | 166070 | 679 | 22339 | 239 | 14735 |
80% /2,000,000,000 | 218233 | 553 | 22684 | 239 | 26504 |
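For reference, the shape of the measurement loop behind the table can be sketched as follows. This uses `java.util.BitSet` from the JDK as a stdlib stand-in; the actual test used `org.roaringbitmap.RoaringBitmap`, whose `add`/`contains` calls take the same places, with its `serialize`/`deserialize` steps elided here:

```java
import java.util.BitSet;
import java.util.Random;

// Sketch of the bitmap benchmark: build with random add(x), then probe
// with contains(x) over the same random sequence. Serialize/deserialize
// timing is elided; with RoaringBitmap it would sit between the two loops.
public class BitmapBench {

    // Returns {addNanos, containsNanos, hits, cardinality}.
    public static long[] run(int maxNum, double dataRate, long seed) {
        int ops = (int) (maxNum * dataRate);
        Random rnd = new Random(seed);
        BitSet bitmap = new BitSet(maxNum);

        long t0 = System.nanoTime();
        for (int i = 0; i < ops; i++) {
            bitmap.set(rnd.nextInt(maxNum)); // RoaringBitmap.add(x)
        }
        long addNanos = System.nanoTime() - t0;

        rnd = new Random(seed); // replay the same sequence for the probe phase
        long hits = 0;
        t0 = System.nanoTime();
        for (int i = 0; i < ops; i++) {
            if (bitmap.get(rnd.nextInt(maxNum))) { // RoaringBitmap.contains(x)
                hits++;
            }
        }
        long containsNanos = System.nanoTime() - t0;
        return new long[] {addNanos, containsNanos, hits, bitmap.cardinality()};
    }
}
```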
The results can be summarized as follows:
- The serialization and deserialization of the bitmap and its file size and add&contains cost are basically proportional to the amount of data.
- When the data volume reaches 2 billion, it is essentially unusable.
Now let's make some choices:
1. `RoaringBitmap` or `Roaring64NavigableMap`?
...
Therefore, considering both implementation and performance aspects, Approach 2 is ultimately chosen.
2. Protocol design
2.1. Layout
.
├── deleteFile
│ ├── delete-32f16270-5a81-4e5e-9f93-0e096b8b58d3-0
│ └── delete-32f16270-5a81-4e5e-9f93-0e096b8b58d3-1
├── manifest
│ ├── delete-manifest-40d555d3-dc04-4363-a2b3-3860829c427b-0
│ ├── delete-manifest-4c3b5202-986f-4ea3-aa8e-941531e451ac-0
│ ├── manifest-5aeccf75-6dfa-4619-9df0-520488aabe76-0
│ ├── manifest-e93ed16f-91b7-47fe-919d-e8b518992ed3-0
│ ├── manifest-list-8a724129-758d-478c-989c-1b2538e76d44-0
│ └── manifest-list-8a724129-758d-478c-989c-1b2538e76d44-1
├── pt=p
│ ├── bucket-0
│ │ └── data-78c9ef18-7c31-4cf1-8cd0-4772cc0a1678-0.orc
│ ├── bucket-1
│ │ └── data-1ce952bf-8514-49ce-9a55-1b70ed509d78-0.orc
├── schema
│ └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
├── snapshot-1
└── snapshot-2
snapshot-x
{
...
"deleteFileManifest" : "delete-manifest-40d555d3-dc04-4363-a2b3-3860829c427b-0",
}
delete-manifest-xxx (avro)
{
"org.apache.paimon.avro.generated.record": {
"_VERSION": 1,
"_KIND": 0,
"_PARTITION": "\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p\u0000\u0000\u0000\u0000\u0000\u0000",
"_BUCKET": 0,
"_TYPE": "RoaringBitmap",
"_FILE_NAME": "delete-32f16270-5a81-4e5e-9f93-0e096b8b58d3-0",
"_FILE_SIZE": x,
"_ROW_COUNT": x
}
}
delete-xxx
The tentative format is Avro; JSON is used below as an example to show the content:
{
"fileName1": the serialized bitmap1,
"fileName2": the serialized bitmap2,
...
}
2.2. Implementation
It's essentially the same logic as the paimon index.
3. Write
3.1. Overview
Referring to the existing lookup mechanism, we design a deleteFile generation mechanism based on compaction + lookup:
1. New data is written to the level-0 layer.
2. Perform a compaction with each write and force merging of level-0 layer data, see `ForceUpLevel0Compaction`.
3. Implement a merge function like `LookupDeleteFileMergeFunctionWrapper`, which has the following characteristics:
   - a. When records do not belong to the level-0 layer, no deletion is generated.
   - b. When records belong to both the level-0 and level-x layers, no deletion is generated.
   - c. When records belong only to the level-0 layer, look up the other layers and update the `map<fileName, bitmap>`.
4. After the compaction finishes, the bitmaps of the "before" files in this merge are no longer useful and are deleted from the `map<fileName, bitmap>`.
5. Finally, write the new deleteFile and mark the old deleteFile as removed.
6. For full compaction, an optimization can be made: directly clear the `map<fileName, bitmap>`.
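The bookkeeping in step c above can be sketched as follows. The class and method names are illustrative (not the actual `LookupDeleteFileMergeFunctionWrapper`), the lookup index is simulated with a plain map, and `java.util.BitSet` stands in for the bitmap:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Sketch of deletion generation during lookup compaction: a key that
// appears only in level 0 is looked up in the older levels; if an old
// record exists, its <file, position> is added to the map<fileName, bitmap>
// of new deletions.
public class DeletionVectorGenerator {

    // Simulated lookup index over levels > 0: key -> "file:position".
    private final Map<String, String> oldLocations = new HashMap<>();
    // Deletions produced by this compaction: fileName -> bitmap.
    private final Map<String, BitSet> deletions = new HashMap<>();

    public void recordOldLocation(String key, String file, int position) {
        oldLocations.put(key, file + ":" + position);
    }

    // Called for each record that belongs only to level 0.
    public void processLevel0Key(String key) {
        String location = oldLocations.get(key);
        if (location == null) {
            return; // genuinely new key: nothing to delete
        }
        int sep = location.lastIndexOf(':');
        String file = location.substring(0, sep);
        int position = Integer.parseInt(location.substring(sep + 1));
        deletions.computeIfAbsent(file, f -> new BitSet()).set(position);
    }

    public boolean isDeleted(String file, int position) {
        BitSet bitmap = deletions.get(file);
        return bitmap != null && bitmap.get(position);
    }
}
```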
Example:
Assume the LSM has a total of four layers; the initial stage is as follows (left to right: file content, LSM tree, delete file):
Then a new file, `f7`, is added to level-0. Suppose compaction picks the level-0 layer and the level-2 layer; the following changes will occur:
- 1 belongs only to the level-0 layer, so we look up the old data and find that `f1` also contains 1; `f1`'s bitmap is therefore modified to add 1.
- 2 and 9 belong to both the level-0 and level-2 layers, so there is no need to modify the bitmap.
- The bitmap of `f6` can be removed because it has been compacted.
- `f5`, `f6`, and `f7` are marked as `REMOVE`, and the old delete file is marked as `REMOVE`.
Finally, assuming that compaction has generated `f8` and `f9`, the final result is as follows:
- `f8` and `f9` are marked as `ADD`, and the new delete file is marked as `ADD`.
3.2. Implementation
Considerations for implementation:
- Currently, when `'changelog-producer' = 'lookup'` is set, the data write behavior is not atomic but divided into two steps: first, data is written, creating snapshot1; then lookup compaction generates snapshot2. We need to consider the atomicity of this.
- In most cases, data will be written to level-0 first and then rewritten. The write overhead is a bit high, and perhaps some optimization can be done in this regard.
- If a changelog needs to be generated, in theory the changelog and the delete file can be produced simultaneously (without reading twice).
- The merge engine is still available.
4. Read
4.1. Overview
- For each read task, load the corresponding deleteFile.
- Construct a `map<fileName, bitmap>` from it.
- Get the bitmap based on the file name, then pass it to the reader.
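The final filtering step reduces to a position check per row; a minimal sketch, with `java.util.BitSet` standing in for the bitmap and illustrative names:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Sketch of the read path: rows are consumed together with their position
// in the data file, and positions present in the bitmap are skipped.
public class DeletionFilter {

    public static String[] filter(String[] rows, int[] deletedPositions) {
        BitSet deletionVector = new BitSet();
        for (int pos : deletedPositions) {
            deletionVector.set(pos);
        }
        List<String> kept = new ArrayList<>();
        for (int pos = 0; pos < rows.length; pos++) {
            if (!deletionVector.get(pos)) { // position not deleted -> row survives
                kept.add(rows[pos]);
            }
        }
        return kept.toArray(new String[0]);
    }
}
```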
4.2. Implementation
Considerations for implementation:
- During the POC, bitmaps can be used to perform the final filtering of data, but in the final version, it is necessary to pass the bitmap down to the reader layers of the various formats as much as possible.
- The bitmap should be applied at as high a level of the reader stack as possible.
5. Maintenance
5.1. compaction
We can incorporate bitmap evaluation into the compaction pick; for example, when the proportion of deleted rows in a file reaches a threshold such as 50%, we can pick it for compaction.
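The pick condition itself is simple; a sketch (the deleted-row count would come from the bitmap's cardinality, e.g. `RoaringBitmap.getCardinality()`, and the row count from the file's metadata):

```java
// Sketch of the compaction pick condition: a file becomes a compaction
// candidate once its deleted-row ratio reaches the configured threshold.
public class DeletionRatioPicker {

    public static boolean shouldPick(long deletedRows, long rowCount, double threshold) {
        return rowCount > 0 && (double) deletedRows / rowCount >= threshold;
    }
}
```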
5.2. expire
Determine whether to delete based on the `delete` and `add` records in the deleteFileManifest.
Reuse the current index layout and just treat the deletion vectors as a new index file type.
2.2. Deletion vectors index file encoding
Like the hash index, there is one deletionVector index per bucket. Therefore, a deletionVector index file needs to contain the bitmaps of multiple files in the same bucket; its structure is effectively a `map<fileName, bitmap>`. To support high-performance reads, we have designed the following file encoding to store it:
In IndexFileMeta:
{
"org.apache.paimon.avro.generated.record": {
"_VERSION": 1,
"_KIND": 0,
"_PARTITION": "\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p\u0000\u0000\u0000\u0000\u0000\u0000",
"_BUCKET": 0,
"_TYPE": "DELETION_VECTORS",
"_FILE_NAME": "index-32f16270-5a81-4e5e-9f93-0e096b8b58d3-0",
"_FILE_SIZE": x,
"_ROW_COUNT": count of the map,
"_DELETION_VECTORS_RANGES": "binary Map<String, Pair<Integer, Integer>>", key is the fileName, value is <start offset of the serialized bitmap in Index file, size of the serialized bitmap>
}
}
In IndexFile:
- First, record the version in one byte.
- Then, record <serialized bitmap's size, serialized bitmap, serialized bitmap's checksum> in sequence.
For each serialized bitmap:
- First, record a constant magic number as an int.
- Then, record the serialized bitmap itself.
e.g:
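The encoding described above can be sketched as follows. The magic value and the use of CRC32 as the checksum are illustrative choices (not Paimon's actual constants), and `java.util.BitSet` stands in for the RoaringBitmap serialization:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.BitSet;
import java.util.zip.CRC32;

// Sketch of the deletion vectors index file encoding: one version byte,
// then for each bitmap <size, serialized bitmap, checksum>, where the
// serialized bitmap itself starts with a constant magic int.
public class DeletionVectorsIndexEncoder {

    static final byte VERSION = 1;
    static final int MAGIC = 1581511376; // illustrative constant

    public static byte[] encode(int[][] deletedPositionsPerFile) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(VERSION); // version comes first
            for (int[] positions : deletedPositionsPerFile) {
                BitSet bitmap = new BitSet();
                for (int pos : positions) {
                    bitmap.set(pos);
                }
                // serialized bitmap = magic number + bitmap bytes
                ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
                DataOutputStream payloadOut = new DataOutputStream(payloadBytes);
                payloadOut.writeInt(MAGIC);
                payloadOut.write(bitmap.toByteArray());
                byte[] payload = payloadBytes.toByteArray();

                CRC32 crc = new CRC32();
                crc.update(payload);

                out.writeInt(payload.length);       // size
                out.write(payload);                 // serialized bitmap
                out.writeInt((int) crc.getValue()); // checksum
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
    }
}
```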
6. Other considerations
- Impact on file meta: Currently, the stats (min, max, null count) in file meta are already unreliable, so no special handling will be performed for this aspect.
- ...
Public Interfaces
How to use
A new conf:
`deletion-vectors.enabled`: controls whether to enable deletion vectors mode, i.e., write the deletion vectors index and read using it without merging.
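For example, assuming a Flink SQL session, the option could be set when creating the table (the table name and schema here are purely illustrative):

```sql
CREATE TABLE orders (
    order_id BIGINT,
    state STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'deletion-vectors.enabled' = 'true'
);
```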
limitations:
- Only supported for tables with primary keys
- Only supports `changelog-producer` = `none` or `lookup`
- `changelog-producer.lookup-wait` can't be `false`
- `merge-engine` can't be `first-row`, because first-row reads already involve no merging, so deletion vectors are not needed
- This mode filters the data in level-0, so when using time travel to read an `APPEND` snapshot, there will be a data delay
other:
- Since there is no need to merge when reading, in this mode we can support filter pushdown on non-PK fields, and data reading concurrency is no longer limited!
Classes
Add `RecordWithPositionIterator` to get the row position:

```java
public interface RecordWithPositionIterator<T> extends RecordReader.RecordIterator<T> {

    /**
     * Get the row position of the row returned by {@link RecordReader.RecordIterator#next}.
     *
     * @return the row position from 0 to the number of rows in the file
     */
    long returnedPosition();
}
```
Abstract an interface `DeletionVector` to represent the deletion vector, and provide a `BitmapDeletionVector` based on RoaringBitmap to implement it:

```java
public interface DeletionVector {

    void delete(long position);

    boolean checkedDelete(long position);

    boolean isDeleted(long position);

    boolean isEmpty();

    byte[] serializeToBytes();

    DeletionVector deserializeFromBytes(byte[] bytes);
}
```
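For illustration, here is a minimal implementation of such an interface backed by `java.util.BitSet` (the proposal's `BitmapDeletionVector` would use RoaringBitmap instead, and the serialization here is just `BitSet.toByteArray`, not the index file encoding above):

```java
import java.util.BitSet;

// Sketch of a DeletionVector implementation backed by java.util.BitSet,
// a stdlib stand-in for the RoaringBitmap-based BitmapDeletionVector.
public class BitSetDeletionVector {

    private final BitSet bitmap;

    public BitSetDeletionVector() {
        this(new BitSet());
    }

    private BitSetDeletionVector(BitSet bitmap) {
        this.bitmap = bitmap;
    }

    public void delete(long position) {
        bitmap.set(checkedCast(position));
    }

    // Returns true only if the position was not already deleted.
    public boolean checkedDelete(long position) {
        int pos = checkedCast(position);
        if (bitmap.get(pos)) {
            return false;
        }
        bitmap.set(pos);
        return true;
    }

    public boolean isDeleted(long position) {
        return bitmap.get(checkedCast(position));
    }

    public boolean isEmpty() {
        return bitmap.isEmpty();
    }

    public byte[] serializeToBytes() {
        return bitmap.toByteArray();
    }

    public static BitSetDeletionVector deserializeFromBytes(byte[] bytes) {
        return new BitSetDeletionVector(BitSet.valueOf(bytes));
    }

    // 32-bit positions only, matching the RoaringBitmap (not Roaring64) choice.
    private static int checkedCast(long position) {
        if (position < 0 || position > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("position out of 32-bit range: " + position);
        }
        return (int) position;
    }
}
```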
Add a `DeletionVectorsIndexFile` to read, write, and delete deletion vectors:

```java
public class DeletionVectorsIndexFile {

    public long fileSize(String fileName);

    public Map<String, DeletionVector> readAllDeletionVectors(
            String fileName, Map<String, Pair<Integer, Integer>> deletionVectorRanges);

    public DeletionVector readDeletionVector(
            String fileName, Pair<Integer, Integer> deletionVectorRange);

    public Pair<String, Map<String, Pair<Integer, Integer>>> write(Map<String, DeletionVector> input);

    public void delete(String fileName);
}
```
Add `DeletionVectorsMaintainer` to maintain deletion vectors:

```java
public interface IndexMaintainer<T, U> {

    void notifyNewDeletion(String fileName, long position);

    void removeDeletionVectorOf(String fileName);

    List<IndexFileMeta> prepareCommit();

    Optional<DeletionVector> deletionVectorOf(String fileName);
}
```
Add an `ApplyDeletionVectorReader` implementing `RecordReader<KeyValue>` to read with a `DeletionVector`:

```java
public class ApplyDeletionVectorReader implements RecordReader<KeyValue> {

    private final RecordReader<KeyValue> reader;
    private final DeletionVector deletionVector;

    public ApplyDeletionVectorReader(RecordReader<KeyValue> reader, DeletionVector deletionVector) {
        this.reader = reader;
        this.deletionVector = deletionVector;
    }

    @Nullable
    @Override
    public RecordIterator<KeyValue> readBatch() throws IOException {
        RecordIterator<KeyValue> batch = reader.readBatch();
        if (batch == null) {
            return null;
        }
        FileRecordIterator<KeyValue> batchWithPosition = (FileRecordIterator<KeyValue>) batch;
        return batchWithPosition.filter(
                a -> !deletionVector.isDeleted(batchWithPosition.returnedPosition()));
    }

    ...
}
```
...
Compatibility, Deprecation, and Migration Plan
Conversion between deletion vectors mode and the original LSM mode:
- Original LSM mode -> deletion vectors mode: perform a full compaction, then set `deletion-vectors.enabled` = `true`; time travel to snapshots from before the switch will be prohibited.
- Deletion vectors mode -> original LSM mode: perform a full compaction, clean up the old snapshots, and then set `deletion-vectors.enabled` = `false`; time travel to snapshots from before the switch will be prohibited.
Future work
- Integrate deletion vectors with append table
- ...
[1]: https://github.com/apache/iceberg
...