Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

Motivation

Cloud object storage is cheaper than multi-replica local storage, so we can move cold data to S3 to store much more data at a lower price. At the same time, Doris should not lose any feature as a result of putting cold data on S3.

Related Research

There is an existing implementation that migrates data to S3: https://github.com/apache/incubator-doris/pull/9197.

The specific approach of this implementation is:
1. Use schema change to generate cold data migration jobs to S3. The job is at the partition level.
2. Complete the data migration on the BE side using logic similar to schema change.

Advantage:
1. The implementation is simple and the progress is controllable.

Because it reuses the schema change logic, the entire process is ultimately controlled by the FE, which guarantees partition-level atomicity and leaves no intermediate state.

Shortcoming:
1. Loading into cold data cannot be supported.
2. Schema change cannot be supported.

However, both features are strongly required by users.

So the main purpose of this proposal is to implement tiered storage of hot and cold data WITHOUT affecting any existing Doris functionality.

Detailed Design

The proposal aims to store cold rowsets in S3 without losing any feature, such as update and schema change. The whole work can be divided into four parts.

  1. Policy: How to define and describe the hot and cold properties of data, such as cooling time, storage location, etc.
  2. Decision: How to execute a policy, such as who initiates data migration tasks and how multiple replicas are synchronized.
  3. Action: Interaction logic with S3, such as reading and writing in the IO stack.
  4. Result: The state of data after migration completes, such as organization of storage paths, garbage cleaning, clone and deletion, compaction, etc.

1. Policy

Currently, Doris supports local data tiered storage, and its general approach is as follows:

  1. Local storage is divided into HDD and SSD, corresponding to cold storage and hot storage respectively.
  2. The cooling time can be set at the partition level. After expiration, the FE migrates data from SSD to HDD through a storage migration task.

To stay compatible with the current logic and keep the code structure clear, we introduce an additional set of strategies when implementing S3 storage.

Along with the previous strategy, the new tiered storage strategy is as follows:

  • Local
    • HDD
    • SSD
  • Remote(S3)

Local is the current tiered storage implementation on HDD and SSD. Remote refers to S3.

For the Local level, we keep the original strategy unchanged; that is, the partition-level tiered storage setting is still supported.

For the Remote level, we only support policy settings at the table level. However, the policy is still applied at partition granularity. This keeps the strategy simple.

StoragePolicy

First, a user can create a storage policy and apply it to a table.

CREATE RESOURCE "storage_policy_name"
PROPERTIES(
     "type"="storage_policy",
     "cooldown_datetime" = "2022-06-01", // time when data is transfter to medium
     "cooldown_ttl" = "1h", // data is transfter to medium after 1 hour
     "s3_resource" = "my_s3" // point to a s3 resource
);

CREATE TABLE example_db.table_hash
(
    k1 BIGINT,
    k2 LARGEINT,
    v1 VARCHAR(2048) REPLACE,
    v2 SMALLINT SUM DEFAULT "10"
)
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
    "storage_medium" = "SSD",
    "sotrage_policy" = "storage_policy_name"
);

When a cooldown_datetime is specified, cooldown_ttl is ignored.

Users can modify the cooldown_datetime, cooldown_ttl, s3_ak, and s3_sk of a storage policy; other attributes are not allowed to be modified. For simplicity, BEs refresh storage policies periodically to pick up new AK/SK.

A storage policy can be applied to multiple tables, so a user can modify a single policy to affect many tables at once.
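
For illustration, a minimal sketch of the BE-side policy cache and its periodic refresh might look as follows (all names here are hypothetical, not the actual Doris definitions):

#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical BE-side mirror of a storage policy; field names are assumptions.
struct StoragePolicy {
    std::string name;
    int64_t cooldown_datetime = -1; // absolute cooldown point (unix seconds)
    int64_t cooldown_ttl = -1;      // relative cooldown in seconds
    std::string s3_resource;        // name of the referenced S3 resource
    std::string s3_ak, s3_sk;       // credentials, rotated by the user
};

class StoragePolicyCache {
public:
    // Called periodically by a BE daemon: overwrite cached policies with the
    // latest ones pulled from FE, so rotated AK/SK take effect automatically.
    void refresh(const std::vector<StoragePolicy>& latest) {
        std::lock_guard<std::mutex> lock(_mutex);
        for (const auto& p : latest) _policies[p.name] = p;
    }

    bool get(const std::string& name, StoragePolicy* out) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _policies.find(name);
        if (it == _policies.end()) return false;
        *out = it->second;
        return true;
    }

private:
    std::mutex _mutex;
    std::unordered_map<std::string, StoragePolicy> _policies;
};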

2. Scheduling rules for cooldown

Doris local storage serves as the carrier of hot data, while external clusters (HDFS, S3, etc.) serve as carriers of cold data. During import, data first exists as hot data and is stored on the local disks of the BE nodes. When the data needs to be cooled down, a cold replica is created for the hot data and the data is dumped to the external cluster specified by the policy. After the cold replica is generated, the local hot data is deleted.
As shown in the figure below, when data becomes cold, the BE keeps the metadata of the cold data locally. When a query hits cold data, the BE uses this metadata to cache the cold data locally.

Cold data is accessed very infrequently, so a limited number of BE nodes can manage much more data, at a cost far lower than pure local storage.

cooldown_tasks_producer_thread

As shown in the figure below, cooldown_tasks_producer_thread is a BE daemon thread that traverses all live tablets on the BE and checks each tablet's configuration. If a tablet is configured with a storage_policy, it is a candidate for hot/cold data conversion.
According to the storage_policy, the BE obtains the corresponding rule from the cached StoragePolicy list, determines from this rule whether the tablet currently needs hot/cold conversion, and, if so, stores the data on the remote storage cluster (such as S3).
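
A minimal sketch of that per-tablet decision, under the same hypothetical names as above:

#include <cstdint>
#include <string>

// Illustrative stand-ins; the real tablet and policy types live in the BE.
struct StoragePolicy {
    int64_t cooldown_datetime = -1; // takes precedence when set
    int64_t cooldown_ttl = -1;      // seconds relative to data creation
};

struct TabletInfo {
    std::string storage_policy;     // empty: tablet stays local
    int64_t creation_time = 0;      // unix seconds
};

// Decide whether cooldown_tasks_producer_thread should emit a cooldown task
// for this tablet on the current pass.
bool need_cooldown(const TabletInfo& t, const StoragePolicy& p, int64_t now) {
    if (t.storage_policy.empty()) return false;
    if (p.cooldown_datetime > 0) return now >= p.cooldown_datetime;
    if (p.cooldown_ttl > 0) return now >= t.creation_time + p.cooldown_ttl;
    return false;
}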

3. Cold data read and write

Storage structure under the hot and cold data model

When a BE stores tablet data, a TABLET is divided into ROWSETs and SEGMENTs. A ROWSET represents an import batch: one import task, such as a stream load or a begin/commit transaction, corresponds to one ROWSET. This gives the ROWSET transactional characteristics: it exists as an independent data unit whose data is either all valid or all invalid.
Because of this, using the rowset as the basic unit of hot/cold conversion makes it much easier to handle new data being written while hot/cold migration is in progress.
As shown in the figure below, for a tablet in the hot/cold conversion state, its rowsets are divided into two parts.
One part is local; this is usually newly written data whose upload has not yet been triggered.
The other part is on the remote storage cluster (S3/HDFS); this data is relatively old and its upload was triggered earlier.
The two parts together form a complete tablet.
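
As a sketch of this storage structure, each rowset can be viewed as carrying a flag that says which tier it lives on (RowsetMeta and is_local below are illustrative names, not the actual definitions):

#include <cstdint>
#include <vector>

// Illustrative rowset meta: the tier flag is the only addition cold storage needs.
struct RowsetMeta {
    int64_t start_version = 0;
    int64_t end_version = 0;
    bool is_local = true; // true: on BE disk; false: uploaded to S3/HDFS
};

struct TabletParts {
    std::vector<RowsetMeta> local_rowsets;  // newly written, upload not yet triggered
    std::vector<RowsetMeta> remote_rowsets; // older data already on S3/HDFS
};

// The union of both parts is the complete tablet.
TabletParts split_by_tier(const std::vector<RowsetMeta>& all) {
    TabletParts parts;
    for (const auto& rs : all)
        (rs.is_local ? parts.local_rowsets : parts.remote_rowsets).push_back(rs);
    return parts;
}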

Cold data reading

When cold data needs to be read, since the data has been split into two parts, it must be read from both the local disk and the remote storage cluster (S3/HDFS).
As shown in the figure below, local and remote file reads are encapsulated behind a FileReader virtual base class with two derived classes, LocalFileReader and S3FileReader, for local file reading and S3 file reading respectively. When a read request arrives at a tablet, the tablet finds the matching rowsets according to the query conditions; some of these rowsets are on local storage and some are on remote storage (S3). Through this mapping each rowset finds its own FileReader, the data is read, and the complete tablet data is obtained after merging.
To keep remote data files efficient to read, there are various optimization directions, such as adding a local cache layer or using local indexes. These are detailed in later sections.
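
A minimal sketch of this reader hierarchy; Status and Slice are simplified stand-ins for the real Doris types:

#include <cstddef>
#include <cstdint>
#include <string>

struct Status { bool ok = true; std::string msg; }; // simplified stand-in
struct Slice { char* data = nullptr; size_t size = 0; };

class FileReader {
public:
    virtual ~FileReader() = default;
    virtual Status open() = 0;
    // Read up to result.size bytes starting at offset; *bytes_read reports
    // how many bytes were actually read.
    virtual Status read_at(int64_t offset, Slice result, size_t* bytes_read) = 0;
    virtual Status close() = 0;
};

// Both tiers implement the same interface, so rowsets can be read uniformly.
class LocalFileReader : public FileReader { /* pread() on a local fd */ };
class S3FileReader : public FileReader { /* ranged GETs against an S3 object */ };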

Cold data writing

Similar to cold data reading, cold data writing encapsulates a FileWriter virtual base class, as shown in the following figure:

Newly written data adds a rowset to the local part of the tablet, just as for a normal tablet, which ensures that cold data remains writable. At some point this locally written data is merged with the remote cold data and uploaded to the remote storage cluster. This step is done by the cooldown_tasks_producer_thread daemon mentioned above.
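
A matching sketch for the writer side, with the same simplified Status/Slice stand-ins as in the reader sketch:

#include <cstddef>
#include <string>

struct Status { bool ok = true; std::string msg; }; // simplified stand-in
struct Slice { const char* data = nullptr; size_t size = 0; };

class FileWriter {
public:
    virtual ~FileWriter() = default;
    virtual Status open() = 0;
    virtual Status append(const Slice& data) = 0; // sequential write
    virtual Status close() = 0; // flush locally / finish the upload remotely
};

class LocalFileWriter : public FileWriter { /* buffered writes to a local file */ };
class S3FileWriter : public FileWriter { /* (multipart) upload to an S3 object */ };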

4. Cache Manager

Design points

1. The cache layer is compatible with some interfaces of the IO layer; a cache layer is added between FileReader and Segment. The Read() parameter is the filepath, ***
2. Cache files are generated on demand by the Reader, and a unified CacheManager manages their life cycle.
3. A Segment is taken as the unit and split into INDEX files and DATA files.
4. When the Reader triggers cache generation, it pulls all footer and INDEX information, decides from the footer and INDEX whether the current segment is hit, and downloads the data file after a hit.
5. Each cache file records its effective time when generated and registers itself with the CacheManager, which manages cache files and deletes them after timeout.

FileCache

Add a FileCache interface to handle the local caching of remote files. A cache layer is inserted between Segment and FileReader to generate cache files on the local disk. As shown below:

1. Status read_at(size_t offset, Slice result, size_t* bytes_read)
Reads the file; the function body includes pulling the remote file to local disk and generating the cache file. The cache file location is given by cache_file_path().
2. const Path& cache_file_path()
The path where the cache file is located.
3. size_t cache_file_size()
The size of the cache file.
4. FileReader* remote_file_reader()
The FileReader for the remote file.
5. Status clean_timeout_cache()
Cleans up timed-out cache files.
6. Status clean_all_cache()
Removes all cache files; executed when the cache must be emptied, e.g. on cache file refresh, compaction, or drop tablet.
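
Collected into one class, the interface above might be declared as follows (Status, Slice, and Path are simplified stand-ins for the real types):

#include <cstddef>
#include <string>

struct Status {};                        // simplified stand-in
struct Slice { char* data; size_t size; };
using Path = std::string;
class FileReader;                        // see the reader sketch above

class FileCache {
public:
    virtual ~FileCache() = default;
    // Pulls the remote file (or the needed part) into the local cache, then reads.
    virtual Status read_at(size_t offset, Slice result, size_t* bytes_read) = 0;
    virtual const Path& cache_file_path() = 0;
    virtual size_t cache_file_size() = 0;
    virtual FileReader* remote_file_reader() = 0;
    virtual Status clean_timeout_cache() = 0; // drop caches past their alive time
    virtual Status clean_all_cache() = 0;     // drop everything, e.g. on drop tablet
};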

WholeFileCache

WholeFileCache pulls the complete file when caching; the cache file corresponds exactly to the remote file.
_alive_time_sec specifies the retention time of the cache file, and _last_match_time marks the last time the local cache file was queried. When the retention time is exceeded, the cache file is cleared. Cache file management is carried out in FileCacheManager.
_cache_file_reader holds the handle of the local cache file and is created when the local cache is generated.

SubFileCache

SubFileCache splits a file into multiple parts when caching and keeps them in _cache_file_readers. A key is generated from the offset, and the split cache files are generated in the cache directory.
_cache_file_readers holds the handles of the local cache files. Unlike _cache_file_reader in WholeFileCache, _cache_file_readers corresponds to multiple cache files, split from the remote file of a Segment.
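
For illustration, one way the offset-to-key mapping could work, assuming a fixed split size (the real split scheme may differ):

#include <cstddef>
#include <map>
#include <string>

class FileReader; // see the reader sketch above

// Illustrative only: derive the sub-file cache key from the aligned offset.
class SubFileCacheSketch {
public:
    explicit SubFileCacheSketch(std::string cache_dir)
            : _cache_dir(std::move(cache_dir)) {}

    // One cache file per aligned block of the remote segment file.
    std::string cache_file_for(size_t offset) const {
        size_t block_start = offset / kBlockSize * kBlockSize;
        return _cache_dir + "/" + std::to_string(block_start);
    }

private:
    static constexpr size_t kBlockSize = 4 * 1024 * 1024; // assumed split size
    std::string _cache_dir;
    std::map<size_t, FileReader*> _cache_file_readers; // offset key -> handle
};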

FileCacheManager

FileCacheManager manages FileCache instances; when a new FileCache is created, it is registered in _file_cache_map. The caches in _file_cache_map are traversed periodically, and clean_timeout_cache() is called on each FileCache to clean up its timed-out cache files.
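
A sketch of the manager's two duties, registration and periodic cleanup; the locking and naming are assumptions:

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Minimal FileCache surface needed here; see the interface sketch above.
class FileCache {
public:
    virtual ~FileCache() = default;
    virtual void clean_timeout_cache() = 0;
};

class FileCacheManager {
public:
    // Called whenever a reader creates a new FileCache.
    void register_cache(const std::string& path, std::shared_ptr<FileCache> cache) {
        std::lock_guard<std::mutex> l(_mu);
        _file_cache_map[path] = std::move(cache);
    }

    // Called by a timer thread: let every cache drop its timed-out files.
    void gc_timeout_caches() {
        std::lock_guard<std::mutex> l(_mu);
        for (auto& [path, cache] : _file_cache_map) cache->clean_timeout_cache();
    }

private:
    std::mutex _mu;
    std::unordered_map<std::string, std::shared_ptr<FileCache>> _file_cache_map;
};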

TFileCacheType

Add file_cache_type to StoragePolicy; the default is NO_CACHE:
1. NO_CACHE: do not use a local cache.
2. WHOLE_FILE: the cache file corresponds to a complete segment.
3. SUB_FILE: the cache is split into multiple sub-files per segment.

5. Single remote replica mode

Description

Single replica, as the name suggests, means that only one copy of the data is stored on the remote cluster; that is, on top of the remote file system's own replica management, no additional replica layer is added, which greatly reduces the occupied storage space.
To coordinate the replicas, the FE uniformly selects one of them as the upload node. The replicas that are not selected only check whether the data of the current segments has already been uploaded to the cluster and delete their local data accordingly. The selected replica uploads the data files normally and also uploads the current tablet meta to the cluster for the other replicas to check.

Organization Chart

Main parameters

FE:

a) Tablet state
  cooldown_replica_id=-1 (persistent): the replica currently allowed to upload data; the FE sends it to the BEs.
  cooldown_term=-1 (persistent): the term of cooldown_replica_id, monotonically increasing; used to prevent a BE from applying cooldown_replica_id updates out of order or accepting an outdated cooldown_replica_id from a split-brained FE. The FE sends it to the BEs together with cooldown_replica_id.
b) Replica state
  cooldowned_version=-1 (non-persistent): carried when the BE reports tablet info to the FE; used as a reference when cooldown_replica_id needs to be re-selected (e.g. select the replica with the largest cooldowned_version).
  cooldown_meta_id="" (non-persistent): carried when the BE reports tablet info to the FE; used to judge whether the upload progress of all replicas has reached consensus, so as to trigger the deletion task (see the meta synchronization below for the principle).

BE:

cooldown_replica_id=-1 (non-persistent)
cooldown_term=-1 (non-persistent)
cooldowned_version=-1 (non-persistent): the maximum version of the contiguous rowsets uploaded for the tablet; it can be recalculated if the BE restarts.
cooldown_meta_id="" (persistent): the UUID bound to the cold-data meta. Every data upload or ColdDataCompaction generates a new meta with a new UUID; the meta uploaded to S3 is {uuid, rowset_metas}. Replicas with the same cooldown_meta_id have exactly the same cold-data meta.
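
Put together, the meta object uploaded to S3 can be pictured like this (illustrative names):

#include <cstdint>
#include <string>
#include <vector>

struct RowsetMetaPB { int64_t start_version; int64_t end_version; }; // stand-in

// The cold-data meta uploaded to S3: a fresh UUID is generated on every data
// upload or ColdDataCompaction, so an equal cooldown_meta_id implies the
// replicas hold exactly the same cold-data meta.
struct CooldownMeta {
    std::string cooldown_meta_id;            // uuid
    std::vector<RowsetMetaPB> rowset_metas;  // only rowsets already on S3
};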

CooldownConfHandler

CooldownConfHandler manages the data cooldown configuration uniformly. It only runs on the FE Master node, and the relevant information is stored in the tablet metadata. The handling here applies only to ordinary tablets, not shadow tablets. The replica information is consistent with that shown by show tablet.
When a tablet's CooldownReplicaId is inconsistent with the state reported by the BE, the CooldownReplicaId must first be synchronized. When a tablet replica is abnormal (its CooldownReplicaId does not match any ReplicaId currently known to the FE), the CooldownReplicaId is reassigned.
PushCooldownConf sends the configuration (cooldown_replica_id, cooldown_term) to the BEs.

Cooldown process on the BE side

Since different replicas of the same tablet assign different rowset_ids to the same batch of imported data, when uploading segments the corresponding TabletMeta must be uploaded at the same time for the other replicas of the tablet to reference. This meta only needs to cover the uploaded segments, not the local ones.
When the CooldownReplicaId equals the current ReplicaId, the current replica performs the upload: based on the data range already on the cluster, it uploads the subsequent segments and then updates the remote TabletMeta.
When the CooldownReplicaId differs from the current ReplicaId, during cooldown the replica first reads the TabletMeta file in the remote cluster directory, checks the data range that already exists on the remote cluster (which segments have been uploaded), and compares it with the local TabletMeta. If the progress is consistent with the local meta, it merges the remote TabletMeta.
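
A sketch of that follower path; the helper functions are illustrative prototypes, not real APIs:

#include <cstdint>
#include <string>
#include <vector>

struct RowsetMetaPB { int64_t start_version; int64_t end_version; };
struct CooldownMeta {
    std::string cooldown_meta_id;
    std::vector<RowsetMetaPB> rowset_metas;
};
struct Tablet;

// Illustrative prototypes; the real logic lives in the BE tablet/IO layers.
bool read_remote_tablet_meta(Tablet* t, CooldownMeta* out);
bool progress_matches_local_meta(Tablet* t, const CooldownMeta& remote);
void merge_remote_meta(Tablet* t, const CooldownMeta& remote);
void drop_local_rowsets_covered_by(Tablet* t, const CooldownMeta& remote);

// Follower-side cooldown: adopt the uploader's progress instead of uploading.
void follow_cooldown(Tablet* tablet) {
    CooldownMeta remote;
    if (!read_remote_tablet_meta(tablet, &remote)) return; // nothing uploaded yet
    // Only merge when local progress is consistent with the remote meta;
    // otherwise wait for the local tablet to catch up first.
    if (progress_matches_local_meta(tablet, remote)) {
        merge_remote_meta(tablet, remote);             // mark rowsets as remote
        drop_local_rowsets_covered_by(tablet, remote); // reclaim local disk
    }
}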

Delete invalid files

Since remote files are shared by multiple BEs in single-replica mode, a file may only be deleted once it is guaranteed to be unused by all replicas.
1. The BE side collects the file list and generates a delete_id
The replica matching the CooldownReplicaId periodically lists the remote tablet directory and finds the files that are not referenced by the TabletMeta; these are invalid files. A corresponding delete_id is generated and stored in the BE cache. At the same time, delete_id, cooldown_meta_id, and cooldowned_version are reported to the FE through the tablet report.
2. The FE sends a file deletion request
When the FE finds a delete_id in a tablet report, it checks whether the reporting replica is the CooldownReplica, and checks whether the metadata of the other replicas is synchronized with it according to cooldown_meta_id and cooldowned_version. Only when the progress of all replicas is synchronized can it be confirmed that no replica still uses these invalid files.
The FE also checks that it is the Master. After all checks pass, it sends a CooldownDelete request carrying the delete_id back to the BE that reported it.
3. The BE side deletes the invalid files
After receiving the CooldownDelete request from the FE, the BE checks whether the delete_id matches its cache. If it matches, the BE deletes the invalid files corresponding to the delete_id.
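
Step 1 amounts to a set difference between the remote listing and the files referenced by the TabletMeta, roughly:

#include <set>
#include <string>
#include <vector>

// Files present in the remote tablet directory but absent from the TabletMeta
// are invalid; they get cached under a freshly generated delete_id, and the
// (delete_id, cooldown_meta_id, cooldowned_version) triple is reported to FE.
std::vector<std::string> find_invalid_files(
        const std::vector<std::string>& remote_listing,
        const std::set<std::string>& files_in_tablet_meta) {
    std::vector<std::string> invalid;
    for (const auto& file : remote_listing) {
        if (files_in_tablet_meta.count(file) == 0) invalid.push_back(file);
    }
    return invalid;
}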

Compaction

Compared with purely local replicas, compaction under the hot/cold data model compacts the local rowsets and the remote rowsets separately, reusing the original compaction logic.
When compacting remote files, the upload node generates a new rowset according to the version, rebuilds the remote TabletMeta, and uploads it to the remote cluster.
The download nodes then download the remote meta and synchronize with the upload node. Junk files are deleted through the logic described above.
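
As a sketch, selecting compaction inputs per tier keeps the two compactions independent (RowsetMeta and is_local as in the storage-structure sketch above):

#include <cstdint>
#include <vector>

struct RowsetMeta { int64_t start_version; int64_t end_version; bool is_local; };

// Cold-data compaction only ever consumes remote rowsets; local rowsets go
// through the original local compaction unchanged.
std::vector<RowsetMeta> pick_cold_compaction_inputs(const std::vector<RowsetMeta>& all) {
    std::vector<RowsetMeta> inputs;
    for (const auto& rs : all) {
        if (!rs.is_local) inputs.push_back(rs);
    }
    return inputs;
}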

