...

Discussion thread: https://lists.apache.org/thread/7qjzbcfzdshqb3h7ft31v9o3x43t8k6r
Vote thread:
ISSUE:
Release: TBD

Motivation

Position delete is a way to implement the Merge-On-Read (MOR) structure and has been adopted by other formats such as Iceberg[1] and Delta[2]. By combining it with Paimon's LSM tree, we can create a position deletion mode unique to Paimon.

Under this mode, writing incurs extra overhead (lookup and writing the delete file), but reading can retrieve data directly via "data + filter with position delete", avoiding merge costs between different files. Furthermore, this mode can be easily integrated into native engine solutions like Spark + Gluten[3] in the future, thereby significantly enhancing read performance.

Goals

Must

  1. Data read and write operations are accurate.
  2. Reader can directly obtain the final data through "data + filter with position delete" without additional merging.

Should

  1. The number of delete files written each time is controllable.
  2. The additional overhead caused by writing is controllable.
  3. Read performance is superior to the original LSM merge.
  4. Unused delete files can be automatically cleaned up.

Implementation

1. Delete File

A delete file is used to mark deletions in the original data files. The following figure illustrates how data is updated and deleted under the delete file mode:

...

Therefore, we do not consider equality deletes and will implement the delete file using only position deletes. There are three design approaches, as follows:

1.1. Approach 1


Store deletes as a list<file_name, pos>, sorted first by file_name and then by pos.

...

Approach 1 is inefficient, so we do not choose it. Approach 2 and Approach 3 both store the bitmap directly in the delete file, but their implementations differ.

1.2. Approach 2 (chosen)


One delete file per bucket, with a structure of map<file_name, bitmap>. When reading a specific data file, read the bucket's delete file to construct the map<file_name, bitmap>, then get the corresponding bitmap by file_name.

...

  • Reading and writing of the delete file is at the bucket level.
  • In extreme cases, if deletions are distributed across all buckets, the delete files of all buckets need to be rewritten.

1.3. Approach 3


One delete file per write, with a structure of list<bitmap>, plus additional metadata <delete file name, offset, size> pointing to each bitmap (this structure is also called a delete vector).

...

  • More changes to the Paimon protocol are needed: each file becomes a tuple <data_file, delete_meta>, and the logic for cleaning up delete files is more complex.
  • When writing, the bitmaps generated for each bucket need to be merged into a single delete file.
  • In extreme cases, if every write contains deletions, each write generates a new delete file (the number is still bounded, because every full compaction invalidates all delete files).

1.4. Test

Before deciding which approach to adopt, let's first conduct a performance test on bitmaps, based on org.roaringbitmap.RoaringBitmap[4]. The reasons for choosing it are as follows:

...
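
For reference, below is a minimal sketch of the kind of micro-benchmark used for this evaluation; the data volume, key range, and timing harness are illustrative assumptions, not the actual test code.

Code Block
languagejava
titleRoaringBitmapBench.java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;

import org.roaringbitmap.RoaringBitmap;

public class RoaringBitmapBench {

    public static void main(String[] args) throws IOException {
        Random random = new Random(42);
        RoaringBitmap bitmap = new RoaringBitmap();

        // Mark 1 million random row positions (out of 100 million) as deleted.
        long start = System.nanoTime();
        for (int i = 0; i < 1_000_000; i++) {
            bitmap.add(random.nextInt(100_000_000));
        }
        System.out.printf("add:      %d ms%n", (System.nanoTime() - start) / 1_000_000);

        // Point lookups, as performed per record on the read path.
        start = System.nanoTime();
        long hits = 0;
        for (int i = 0; i < 1_000_000; i++) {
            if (bitmap.contains(random.nextInt(100_000_000))) {
                hits++;
            }
        }
        System.out.printf("contains: %d ms (%d hits)%n",
                (System.nanoTime() - start) / 1_000_000, hits);

        // Serialized size, which determines the delete file footprint.
        bitmap.runOptimize();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        bitmap.serialize(new DataOutputStream(bytes));
        System.out.printf("size:     %d bytes%n", bytes.size());
    }
}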

Therefore, considering both implementation and performance aspects, Approach 2 is ultimately chosen.

2. Protocol Design

2.1. Layout

Reuse the current index layout and simply treat the delete map as a new index file type:

...

{
  "org.apache.paimon.avro.generated.record": {
    "_VERSION": 1,
    "_KIND": 0,
    "_PARTITION": "\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p\u0000\u0000\u0000\u0000\u0000\u0000",
    "_BUCKET": 0,
    "_TYPE": "DELETE_MAP",
    "_FILE_NAME": "index-32f16270-5a81-4e5e-9f93-0e096b8b58d3-0",
    "_FILE_SIZE": x,
    "_ROW_COUNT": x
  }
}

2.2. DeleteMap index file encoding

Like the hash index, there is one deleteMap index per bucket. A deleteMap index file therefore contains the bitmaps of multiple files in the same bucket; its structure is effectively a map<fileName, bitmap>. To support high-performance reads, we designed the following file encoding to store this map:

  • First, record the size of this map by an int.
  • Then, record the max fileName length by an int (because file name lengths may differ).
  • Next, record <fileName (padded to the max length), the starting position of the serialized bitmap, the size of the bitmap> for each entry in sequence.
  • Finally, record each <serialized bitmap> in sequence.

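For example, a minimal writer for this encoding might look like the following sketch (class and method names are illustrative; the real implementation may choose a different byte layout for offsets):

Code Block
languagejava
titleDeleteMapEncodingExample.java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

import org.roaringbitmap.RoaringBitmap;

public class DeleteMapEncodingExample {

    /** Serializes a map<fileName, bitmap> with the encoding described above. */
    public static byte[] encode(Map<String, RoaringBitmap> deleteMap) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        // 1. The size of the map.
        out.writeInt(deleteMap.size());

        // 2. The max fileName length (file name lengths may differ).
        int maxNameLength = 0;
        for (String name : deleteMap.keySet()) {
            maxNameLength =
                    Math.max(maxNameLength, name.getBytes(StandardCharsets.UTF_8).length);
        }
        out.writeInt(maxNameLength);

        // 3. Fixed-width index entries:
        //    <fileName (padded to max length), starting pos of bitmap, size of bitmap>.
        //    Positions here are taken relative to the start of the bitmap section.
        int pos = 0;
        for (Map.Entry<String, RoaringBitmap> entry : deleteMap.entrySet()) {
            out.write(Arrays.copyOf(
                    entry.getKey().getBytes(StandardCharsets.UTF_8), maxNameLength));
            int size = entry.getValue().serializedSizeInBytes();
            out.writeInt(pos);
            out.writeInt(size);
            pos += size;
        }

        // 4. All serialized bitmaps, in the same order as the index entries.
        for (RoaringBitmap bitmap : deleteMap.values()) {
            bitmap.serialize(out);
        }
        return buffer.toByteArray();
    }
}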

2.3. Classes

BitmapDeleteIndex: implements DeleteIndex based on RoaringBitmap.

...

Code Block
languagejava
titleDeleteMapIndexMaintainer.java
import java.util.List;
import java.util.Optional;

import javax.annotation.Nullable;

import org.apache.paimon.data.BinaryRow;

public interface IndexMaintainer<T, U> {

    void notifyNewRecord(T record);

    List<IndexFileMeta> prepareCommit();

    /** (New) Delete the index of the given file. */
    default void delete(String fileName) {
        throw new UnsupportedOperationException();
    }

    /** (New) Get the index of the given file. */
    default Optional<U> indexOf(String fileName) {
        throw new UnsupportedOperationException();
    }

    /** Factory to restore {@link IndexMaintainer}. */
    interface Factory<T, U> {
        IndexMaintainer<T, U> createOrRestore(
                @Nullable Long snapshotId, BinaryRow partition, int bucket);
    }
}
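
For orientation, here is a hedged sketch of how a DeleteMapIndexMaintainer could implement this interface. The record accessors (fileName(), position()), the DeleteIndex delete method, and the file-writing step are assumptions of this sketch, not settled API.

Code Block
languagejava
titleDeleteMapIndexMaintainerSketch.java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch only: KeyValue accessors (fileName(), position()), DeleteIndex#delete,
// and the file-writing step are assumptions, not settled API.
public class DeleteMapIndexMaintainer implements IndexMaintainer<KeyValue, DeleteIndex> {

    // One bitmap-backed index per data file of this bucket.
    private final Map<String, DeleteIndex> deleteMap = new HashMap<>();
    private boolean modified = false;

    @Override
    public void notifyNewRecord(KeyValue record) {
        // Assumption: the record carries the data file and row position it deletes.
        deleteMap
                .computeIfAbsent(record.fileName(), k -> new BitmapDeleteIndex())
                .delete(record.position());
        modified = true;
    }

    @Override
    public void delete(String fileName) {
        // The data file is gone (e.g. rewritten by compaction); drop its bitmap.
        modified |= deleteMap.remove(fileName) != null;
    }

    @Override
    public Optional<DeleteIndex> indexOf(String fileName) {
        return Optional.ofNullable(deleteMap.get(fileName));
    }

    @Override
    public List<IndexFileMeta> prepareCommit() {
        if (!modified) {
            return Collections.emptyList();
        }
        // Rewrite the whole bucket-level index file with the encoding of section 2.2.
        return Collections.singletonList(writeDeleteMapIndexFile(deleteMap));
    }

    private IndexFileMeta writeDeleteMapIndexFile(Map<String, DeleteIndex> map) {
        throw new UnsupportedOperationException("sketch only");
    }
}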

3. Write

3.1. Overview

Referring to the existing lookup mechanism, we design a delete file generation mechanism based on compaction + lookup:

...

  • f8 and f9 are marked as ADD, and the new delete file is marked as ADD.

3.2. Implementation

Considerations for implementation:

  • Currently, when 'changelog-producer' = 'lookup' is set, the data write behavior is not atomic but split into two steps: first, data is written to create snapshot1; then lookup compaction generates snapshot2. We need to consider the atomicity of this.
  • In most cases, data first lands in level-0 and is then rewritten. This write overhead is somewhat high, and perhaps some optimization can be done here.
  • If a changelog needs to be generated, in theory the changelog and the delete file can be produced simultaneously (without reading twice).
  • The merge engine is still available.

4. Read

4.1. Overview

  1. For each read task, load the corresponding delete file.
  2. Construct the map<fileName, bitmap> from the delete file.
  3. Get the bitmap by file name, then pass it to the reader.

4.2. Classes

Code Block
languagejava
titleApplyDeleteIndexReader.java
public class ApplyDeleteIndexReader implements RecordReader<KeyValue> {

    private final RecordReader<KeyValue> reader;
    private final DeleteIndex deleteIndex;

    public ApplyDeleteIndexReader(RecordReader<KeyValue> reader, DeleteIndex deleteIndex) {
        this.reader = reader;
        this.deleteIndex = deleteIndex;
    }

    @Nullable
    @Override
    public RecordIterator<KeyValue> readBatch() throws IOException {
        RecordIterator<KeyValue> batch = reader.readBatch();
        if (batch == null) {
            return null;
        }
        return new RecordIterator<KeyValue>() {
            @Override
            public KeyValue next() throws IOException {
                // Skip rows whose positions are marked as deleted in the bitmap.
                while (true) {
                    KeyValue kv = batch.next();
                    if (kv == null) {
                        return null;
                    }
                    if (!deleteIndex.isDeleted(kv.position())) {
                        return kv;
                    }
                }
            }
        };
    }
  ...
}
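
Wiring this into the read path could look like the following usage sketch (indexMaintainer and fileReader are assumed to come from the existing scan/read setup):

Code Block
languagejava
titleApplyDeleteIndexUsage.java
// Usage sketch: wrap the plain file reader only when a bitmap exists for this file.
Optional<DeleteIndex> deleteIndex = indexMaintainer.indexOf(dataFileName);
RecordReader<KeyValue> reader =
        deleteIndex.isPresent()
                ? new ApplyDeleteIndexReader(fileReader, deleteIndex.get())
                : fileReader;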


5. Maintenance

5.1. Compaction

We can incorporate bitmap evaluation into the compaction pick: for example, when the proportion of deleted rows in a file reaches a threshold such as 50%, pick it for compaction (see the sketch below).
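
A minimal sketch of such a pick heuristic, assuming DeleteIndex exposes a count of deleted positions (a bitmap-backed index could implement it with RoaringBitmap#getCardinality):

Code Block
languagejava
titleDeleteRatioPickExample.java
// Sketch: pick a data file for compaction once enough of its rows are deleted.
private static final double DELETE_RATIO_THRESHOLD = 0.5;

boolean shouldPickForCompaction(DataFileMeta file, Optional<DeleteIndex> index) {
    if (!index.isPresent()) {
        return false;
    }
    // deletedCount() is an assumed accessor, not settled API.
    double deletedRatio = (double) index.get().deletedCount() / file.rowCount();
    return deletedRatio >= DELETE_RATIO_THRESHOLD;
}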

5.2. Expire

Determine whether a delete file can be physically removed based on the delete and add records in the deleteFileManifest, as sketched below.
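
A hedged sketch of that check; the entry and accessor names are assumptions, and the point is only the ADD/DELETE reference check across retained snapshots:

Code Block
languagejava
titleDeleteFileExpireSketch.java
// Sketch: an index file may be physically removed only when no retained
// snapshot still references it with an ADD record.
Set<String> stillReferenced = new HashSet<>();
for (IndexManifestEntry entry : entriesOfRetainedSnapshots) {
    if (entry.kind() == FileKind.ADD) {
        stillReferenced.add(entry.fileName());
    }
}
for (IndexManifestEntry entry : candidateDeleteRecords) {
    if (!stillReferenced.contains(entry.fileName())) {
        fileIO.deleteQuietly(pathOfIndexFile(entry.fileName()));
    }
}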

6. Other considerations

  1. Impact on file meta: Currently, the stats (min, max, null count) in file meta are already unreliable, so no special handling will be performed for this aspect.
  2. ...

Compatibility, Deprecation, and Migration Plan

New conf

delete-map.enabled: controls whether to enable the delete map mode. Limitations:

  • The first version only supports file.format = parquet; more formats will be supported in the future.
  • Only changelog-producer = none or lookup is supported (see the example below).
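
For illustration, enabling the mode on a Flink SQL table might look like this; the DDL shape and schema are assumptions, only the option names come from this proposal:

Code Block
languagejava
titleEnableDeleteMapExample.java
// Illustration only: the table schema is made up; option names are from this proposal.
tableEnv.executeSql(
        "CREATE TABLE t ("
                + "  pk INT PRIMARY KEY NOT ENFORCED,"
                + "  v STRING"
                + ") WITH ("
                + "  'delete-map.enabled' = 'true',"
                + "  'file.format' = 'parquet',"
                + "  'changelog-producer' = 'lookup'"
                + ")");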

Migration

  1. Conversion between the delete file mode (a provisional name) and the original LSM mode

...