Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

View file
nameApache Doris冷热数据详细设计.docx
height250

Motivation

Cloud object storage is cheaper than multi replication local storage, thus we can put cold data to s3 to store much more data at lower price.  To be more general, doris should not lose any feature due to putting cold data to s3.

...

A storage policy can be applied to multi tables. And user can simple modify one policy to apply to many tables.

2.

...

We still use FE to trigger migration, that is, FE perceives which partitions or tablets have expired through metadata, and then informs BE of the information.

But instead of using the migration task, we just notify the BE of this information, and each BE completes the data migration (uploading to s3) operation independently.

On the BE side, data uploading is performed at the rowset level.

The advantage of this is that for data loading, new data is still written locally in the previous way. load are not affected.

But here we need to modify the tablet's metadata and read-write logic, and abstract it at the IO layer to shield the impact of different storage locations on the read-write process.

Choose which replica to upload

If multiple replicas are uploaded at the same time, resources and storage space will be wasted. Here we can simply adopt some avoidance strategies to coordinate the upload tasks between the replicas.

For example, a replica first asks whether other replicas have started uploading before uploading, and if so, wait 10 minutes and then re-inquire.

TODO: Further refinement is required here.

3. Action

Please refer to: DSIP-006: Refactor IO stack

4. Result

Schema Change

We need to implement a new schema change logic:

  1. For add/drop column, no need to modify the real data, so that we don't need to implement "hard-link" logic for S3 storage.
  2. For other schema change task that must modify the data, see "compaction".

Compaction

Because the read and write logic has been shielded at the IO layer, the logic of compaction (or schema change) is theoretically the same as the existing one(that is, read-then-write), but we need to solve the following problems:

  1. Choose which replica to do the compaction: Same as Upload.
  2. Write-Amplify problem: the compaction may introduce heavy write amplify problem, we need to try best to reduce the data that need to be downloaded or uploaded to the S3.
  3. Garbage collection: For a distributed system, these operation must have garbage data that need to be cleaned. We can use Rocksdb to save the file info. For example, write a begin kv before upload and write a end kv after upload finished. Just like a single node transaction logic.

TODO: need more detail design.

4. Single remote replica mode

Doc in chinese

View file
name冷热数据单一副本模式详细设计.docx
height250

Description

The current cold and hot data is divided into two parts. The hot data is stored on the local disk and the cold data is stored in the remote cluster. Due to the independence of segments between BEs, cold data has multiple copies on the remote cluster, resulting in data redundancy. To solve this problem, the single remote replica is proposed.

A single remote replica, as the name implies, means that the number of data copies saved on the remote cluster is one, that is, no additional layer of copy management is added to the original file system copy management of the cluster, so as to greatly reduce the storage space occupied.

In order to meet the cooperative management between different replicas, the FE selects one of several replicas as the upload node, while the replica that is not selected will only check whether the current segment data has been uploaded on the cluster, and synchronously delete the local data. The copy of the selected upload file will upload the data file normally, and upload the data of the current tablet to the cluster for other fragment checks.

...

Scheduling rules for cooldown

DORIS local storage is used as the carrier of hot data, while external clusters (HDFS, S3, etc.) are used as carriers of cold data. During the import process, the data first exists as hot data and is stored on the local disk of the BE node. When the data needs to be turned cold, create a cold data copy shard for the hot data shard, and then dump the data to the external cluster specified by the cold data. After the cold data copy is generated, the hot data is sharded delete.
As shown in the figure below, when the data becomes cold data, the BE will retain the metadata information of a cold data locally. When a query hits cold data, BE will use this metadata information to cache the cold data locally.

For cold data, the frequency of use is very low, so that limited BE nodes can be used to manage more data, and the cost will be much lower than that of pure local storage.

Image Added

cooldown_tasks_producer_thread

As shown in the figure below, cooldown_tasks_producer_thread is a daemon process of BE, which traverses all surviving TABLETs of this BE, and checks the configuration information of each TABLET. When it is found that the TABLET is configured with storage_policy, it means that it needs to perform hot and cold data conversion.
According to the configuration in storage_policy, BE will obtain the corresponding rule information from the StoragePolicy list in the cache information, and then according to this rule, determine whether the current tablet needs to perform hot and cold data conversion, and store the data on a remote storage cluster (such as S3) .

Image Added

3. Cold data read and write

Storage structure under the hot and cold data model

When BE stores TABLET data, there will be ROWSET and SEGMENT divisions under TABLET. Among them, ROWSET represents the data import batch, and the same ROWSET generally represents a batch of import tasks, such as a stream load, a begin/commit transaction, etc., all correspond to a ROWSET, and this characteristic of ROWSET means that it has The characteristics of a transaction, that is, the same rowset can exist as an independent data unit, and the data in it is either all valid or all invalid.
Because of this, using rowset as the basic unit to perform hot and cold conversion of data can more easily solve the problem of new data being written during the migration of hot and cold data.
As shown in the figure below, for the TABLET entering the hot and cold data conversion state, its ROWSET is divided into two parts.
Part of it is locally, and this part of data is often newly written data, and the upload operation has not yet been triggered.
The other part is in the remote storage cluster (S3/HDFS). This part of data is relatively early and has been triggered to be uploaded to the storage cluster before.
The combination of the two parts is a complete TABLET.

Image Added

Cold data reading

When cold data needs to be read, since the data has been split into two parts, the data needs to be read from the local and remote storage clusters (S3/HDFS) respectively.
As shown in the figure below, the reading of local files and remote files is encapsulated into a virtual base class of FileReader, which implements two derived classes LocalFileReader and S3FileReader, corresponding to local file reading and S3 file reading respectively. When a read request arrives at TABLET, TABLET will find the corresponding ROWSET according to the conditions. Some of these ROWSETs are local storage and some are remote storage (S3). Through the mapping relationship, ROWSET finds their respective FileReaders, completes the data reading, and the complete TABLET data is obtained after merging.
Here, in order to ensure the reading efficiency of remote data files, there can be various optimization directions, such as adding a layer of local cache, such as using local indexes, etc. These are detailed in subsequent articles.

Image Added

Cold data writing

Similar to cold data reading, cold data writing also encapsulates a FileWriter virtual base class, as shown in the following figure:

Image Added

Newly written data will add a ROWSET to the local storage part of the TABLET, which is the same as a normal TABLET, and also ensures that cold data can also be written. And this part of the data written locally will be merged with the remote cold data at a certain point in time, and uploaded to the remote storage cluster. This step is done by the daemon process cooldown_tasks_producer_thread mentioned above.

4. Cache Manager

Image Added

Design points

1. The cache layer is compatible with some interfaces of the IO layer, and a cache layer is added between FileReader and Segment. Read() parameter is filepath, ***
2. The generation of cache layer files is triggered by Reader, and the unified CacheManager manages its life cycle.
3. Take a Segment as a unit and split it into INDEX files and DATA files.
4. When Reader triggers CACHE generation, it pulls all footer and INDEX information, decides whether to hit the current segment according to footer and INDEX, and downloads the data file after hitting.
5. Each CACHE marks its effective time when it is generated, and registers it in CacheManager, which is used to manage CACHE files and delete them after timeout

FileCache

Added the FileCache interface, which is used to handle the local cache of remote files. A layer of Cache is added between Segment and FileReader to generate cache files on the local disk. As shown below:

Image Added

Image Added

1. Status read_at(size_t offset, Slice result, size_t* bytes_read)
It is used to read files, and the function body includes pulling remote files to the local and generating cache files. The cache file is specified by cache_file_path
2. const Path& cache_file_path()
The directory where the cache files are located.
3. size_t cache_file_size()
The size of the cache file.
4. FileReader* remote_file_reader()
FileReader for remote files
5. Status clean_timeout_cache()
Clean up timed out cache files.
6. Status clean_all_cache()
Empty all cache files, execute when the file needs to be emptied. Such as cache file refresh, compaction, or drop tablet.

WholeFileCache

Image Added

WholeFileCache pulls complete files when caching, and the files correspond exactly to the remote RemoteFile.
_alive_time_sec specifies the retention time of the cache file, and _last_match_time is used to mark the last time the local cache file was queried. When the retention time is exceeded, the cache file will be cleared. The management of cache files is carried out in FileCacheManager.
_cache_file_reader is used to specify the handle of the local cache file, which is generated when the local cache is created.

SubFileCache

Image Added

SubFileCache will split a file into multiple when caching, and put them in _cache_file_readers. Generate a key according to the offset, and generate split cache files in the cache directory.
_cache_file_readers is used to specify the handle of the local cache file. Unlike _cache_file_reader in WholeFileCache, _cache_file_readers corresponds to multiple cache files, which are split from the remote files corresponding to Segment.

FileCacheManager

Image Added

FileCacheManager is used to manage FileCache, when there is a new FileCache, register it to _file_cache_map. The Cache files in _file_cache_map are periodically traversed, and the clean_timeout_cache() of FileCache is called to perform cache file cleaning operations in FileCache.

TFileCacheType

Image Added

Add file_cache_type in StoragePolicy, the default is NO_CACHE:
1. NO_CACHE: Do not use local cache
2. WHOLE_FILE: The cache file corresponds to a complete segment.
3. SUB_FILE: The cache file is split into multiple subfiles by Segment.

5. Single remote replica mode

Description

Single copy, as the name suggests, means that the number of data copies stored on the remote cluster is one, that is, on top of the original file system copy management of the cluster, no additional layer of copy management is added to greatly reduce the occupied storage space.
In order to meet the collaborative management between different copies, a single copy selects one of several copies as the upload node uniformly by FE, and the copy that is not selected will only check whether the data of the current segment has been uploaded on the cluster, and delete the local one synchronously. data. The copy of the selected uploaded file uploads the data file normally, and uploads the current tablet data to the cluster for inspection by other shards.

OrganizationChart

Image Added

Main parameters

FE:

a) Tablet state
  cooldown_replica_id=-1 (persistent): The replica that can currently upload data, FE will send it to BE.
cooldown_term=-1 (persistent): The term of cooldown_replica_id, which is monotonically increasing, is used to prevent BE from receiving cooldown_replica_id out of order or receiving an outdated cooldown_replica_id from FE with split-brain. FE sends cooldown_replica_id to BE at the same time.
b) Replica state
cooldowned_version=-1 (non-persistent): Bring it when BE reports tablet info to FE. The cooldown_replica_id can be used for reference when re-selection is required (e.g. select the replica with the largest cooldowned_version).
cooldown_meta_id="" (non-persistent): bring it when BE reports tablet info to FE. It is used to judge that the upload progress of each replica has reached a consensus, so as to trigger the deletion task. (see Synchronization meta for the principle)

BE:

cooldown_replica_id=-1 (non-persistent)
cooldown_term=-1 (non-persistent)
cooldowned_version=-1 (non-persistent): The maximum version of the continuous rowset uploaded by the tablet, which can be calculated if the BE is restarted.
cooldown_meta_id="" (persistent): The uuid bound to the cold data meta, every upload of data/ColdDataCompaction will generate a new meta and generate a uuid, the meta data uploaded to s3 is {uuid, rowset_metas}. For replicas with the same cooldown_meta_id, the meta of the cold data part is exactly the same.

CooldownConfHandler

CooldownConfHandler uniformly manages the configuration of data cooling, which only takes effect on the MASTER node, and the relevant information is stored in the metadata of the tablet. The processing here is only for ordinary Tablet, not for ShadowTablet. The copy information is consistent with that of show tablet.
When the CooldownReplicaId of a tablet is inconsistent with the state reported by the BE, the CooldownReplicaId needs to be synchronized first. When the replica of a tablet is abnormal (CooldownReplicaId does not match the ReplicaId in the current FE), the CooldownReplicaId is reassigned.
PushCooldownConf sends configuration information to BE: cooldown_replica_id, cooldown_term.

Cooldown on the BE side processes

Since different copies of the same batch of imported data under the same tablet have different rowset_ids, when uploading a segment, the corresponding TabletMeta must be uploaded at the same time for reference by other copies of the tablet. The information only needs to include the uploaded segment, not the local segment.
When the CooldownReplicaId is the same as the current ReplicaId, it means that the current Replica needs to perform an upload operation. According to the data range on the cluster, upload the subsequent Segment, and then update the remote TabletMeta.
When the CooldownReplicaId is different from the current ReplicaId, during cooldown, first read the TabletMeta file in the remote cluster directory, check the existing data range on the remote cluster (which Segments have been uploaded), and compare with the local TabletMeta Compare, if the progress is consistent with the local Meta, merge the remote TabletMeta.

Delete invalid files

Since remote files are shared by multiple BEs in single-copy mode, deleting files must ensure that they are no longer used in all copies.
1. The BE side collects the file list and generates delete_id
The replicas corresponding to the CooldownReplicaId on the BE end regularly list, and query which files in the remote Tablet directory are not in the TabletMeta. This is an invalid file, and the corresponding delete_id is generated and stored in the BE cache. At the same time, delete_id, cooldown_meta_id, and cooldowned_version are reported to FE through tabletReport.
2. The FE sends a file deletion request
When the FE side checks that the tabletReport contains delete_id, check whether the Replica is CooldownReplicaId, and check whether the metadata of other Replicas are synchronized with the current Replica according to cooldown_meta_id and cooldowned_version. Only when the progress of all replicas is synchronized can it be confirmed that other replicas are no longer using these invalid files.
Check whether the current FE is the Master. After checking and confirming, send a CooldownDelete request and send the delete_id back to the BE that just reported the status.
3. Delete invalid files on the BE side
After receiving the CooldownDelete request from FE, the BE side checks whether the delete_id is consistent with the cache. If it is consistent, it needs to delete the invalid file corresponding to the delete_id.

Compaction

Compared with the local copy, when processing the compaction of hot and cold data, the local rowset and the remote rowset need to be compacted separately. Compaction logic uses the original compaction logic.
When the remote file performs compaction, the upload node generates a new rowset according to the version, rebuilds the RemoteTabletMeta, and uploads it to the remote.
Then, the download node downloads the remote meta and synchronizes with the upload node. Junk files are deleted through the previous logic

...

Image Removed

Flow Chart

Image Removed

CooldownHandler in FE

CooldownHandler uniformly manages the configuration of data transfer to cold, which only takes effect in the MASTER node. There is a list that records all the tablets configured with cold and hot data rules and their replicas, and checks the status of each replica. The processing here is only for ordinary tablets and does not operate on ShadowTablets. The replica information is consistent with the show tablet.

When the CooldownType of a tablet is inconsistent with the status reported by BE, the CooldownType status needs to be synchronized first. When the replica of a Tablet is abnormal (the CooldownType of all replicas is USE_REMOTE_DATA), it is determined that the Tablet needs to re select the UPLOAD replica and set the CooldownType to Upload_DATA.

When the CooldownType needs to be issued, the FE side needs to send a request TPushCooldownConfReq, send the status to BE, and modify the UPLOADING file of the specified TABLET on S3_ REPLICA_ ID, the file content is that the current CooldownType is UPLOAD_ REPLICA of the copy of DATA_ ID。

The CooldownType on the FE side can only be accessed from the USE_ REMOTE_ DATA changes to UPLOAD_ DATA,UPLOAD_ Generally, the DATA node does not need to be changed unless BE times out. When BE times out, the FE side will automatically process the TABLET migration. The newly migrated TABLET is defaulted to USE_ REMOTE_ DATA, UPLOAD will be re specified_ DATA node. Ensure that two CooldownTypes are not UPLOAD at the same time_ Node of DATA.

Considering that the coolown operation does not require high real-time performance, and in order to prevent UPLOAD_ DATA switches frequently, and the timeout threshold can be set longer, such as half an hour.

Cooldown in BE

The CooldownType status of each tablet on the BE side is USE by default_ REMOTE_ DATA, that is, do not upload. Only when FE sends a request to set CooldownType to UPLOAD_ Only DATA updates its status and sets it to upload.

When CooldownType is UPLOAD_ During DATA, check UPLOADING on S3_ REPLICA_ ID, if UPLOADING_ REPLICA_ If the ID is not the current REPLICA, set the local CooldownType to USE_ REMOTE_ DATA, no longer upload. If UPLOADING_ REPLICA_ The ID is the current REPLICA, upload the file, and regenerate the remote META file on S3. The META file name needs to contain the current REPLICA_ ID to prevent write conflicts with other replicas.

When CooldownType status is USE_ REMOTE_ When DATA, obtain the UPLOADING on S3_ REPLICA_ ID, according to REPLICA_ ID to find the corresponding remote META file. Get the META file and merge it with the local META. If the VERSION corresponding to the local ROWSET is

UPLOAD_ DATA is a super operation with high risk. It is necessary to ensure that only one copy is performing upload requests at the same time. Therefore, some real-time performance can be sacrificed to reduce the probability of this risk.

BE in UPLOAD_ DATA type

Because different copies of the same batch of imported data under the same tablet rowset_ The IDs are different. When uploading a segment, its corresponding TabletMeta should be uploaded for reference by other copies of the tablet. This information only needs to include the Upload segment, not the local segment.

Each time CooldownType changes, the first launch of coolown will trigger metadata synchronization, first read the TabletMeta file in the remote cluster directory, check the existing data range on the remote cluster according to it (which VERSIONS have been uploaded), pull down the ROWSET metadata of the specified VESION, and replace the corresponding VERSION fragment in the local TabletMeta.

In coolown, check UPLOADING on S3_ REPLICA_ ID, if UPLOADING_ REPLICA_ If the ID is not the current REPLICA, set the local CooldownType to USE_ REMOTE_ DATA, no longer upload. If UPLOADING_ REPLICA_ The ID is the current REPLICA, upload the file, and regenerate the remote META file on S3.

During the whole process of uploading files, the uploaded files are all your own files and will not conflict with other copies. In extreme cases, two copies are being uploaded at the same time, because UPLOADING_ REPLICA_ If the ID file exists, only the data of one copy file will be used.

Note: In order to prevent inconsistency between different replicas of the local Rowset, the cold data cannot be de duplicated with the cold data on the remote cluster. The cold data will not be compressed temporarily.

BE in USE_ REMOTE_ DATA type

cooldown will trigger metadata synchronization. First read the UPLOADING in the directory specified in S3_ REPLICA_ ID file, read the TabletMeta file on S3 according to the ID, check the existing data range on S3 according to it (which VERSIONS have been uploaded), pull down the ROWSET metadata of the specified VESION, and replace the corresponding VERSION fragment in the local TabletMeta.

After the local metadata is updated, the local corresponding data can be deleted.

Scheduling

specific implementation steps and approximate scheduling.