You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Status

Current state[One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

Motivation

Cloud object storage is cheaper than multi replication local storage, thus we can put cold data to s3 to store much more data at lower price.  To be more general, doris should not lose any feature due to putting cold data to s3.

Related Research

There is an implementation migrating data to s3, https://github.com/apache/incubator-doris/pull/9197.

The specific approach to this implementation is:
1. Use schema change to generate cold data migration jobs to s3. This job is at the partition level.
2. Complete the data migration in the BE side by using similar logic as schema change.

Advantage:
1. The implementation is simple and the progress is controllable.

Because the same logic of schema change is adopted, the entire process has FE to achieve final control, and it can ensure that the atomic effect at the partition level is effective, and no intermediate state is generated.

Shortcoming:
1. load on cold data cannot be supported.
2. Cannot support schema change.

However, these two functions are strong requirements of users.

So the main suppose of this proposal is to realize the tiered storage of data in cold and hot WITHOUT affecting the complete functions of Doris.

Detailed Design

The proposal aims to store cold rowsets in s3 without losing any feature, like updating and schema changing.  The whole work can be divided into four parts.

  1. Policy: How to define and describe the hot and cold properties of data, such as cooling time, storage location, etc.
  2. Decision: How to execute a policy, such as who initiates data migration tasks, how to synchronize multiple replicas, etc.
  3. Action: Interaction logic with s3, such as reading and writing of IO stack
  4. Result:The status of data after data migration is completed, such as organization of storage paths, garbage cleaning, clone and deletion, compaction, etc.

1. Policy

Currently, Doris supports local data tiered storage, and its general approach is as follows:

  1. Local storage is divided into HDD and SSD, corresponding to cold storage and hot storage respectively.
  2. The cooling time can be set at the partition level. After expiration, FE will migrate data from HDD to SSD through storage migration task.

In order to ensure compatibility with the current logic, and to keep the structure of the code clear. When implementing S3 storage, we use an additional set of strategies.

Along with the previous strategy, the new tiered storage strategy is as follows:

  • Local
    • HDD
    • SSD
  • Remote(S3)

The Local is the current hierarchical storage implementation of HDD and SSD. The Remote refers to S3.

For the Local level, we keep the original strategy unchanged, that is, the partition-level hierarchical storage setting is still supported.

For the Remote level, we only support policy settings at the table level. However, the application granularity of this policy is still at the partition level. This way we can ensure that the strategy is simple enough.

StoragePolicy

First, user can create a storage policy and apply it to a table. 

CREATE RESOURCE "storage_policy_name"
PROPERTIES(
     "type"="storage_policy",
     "cooldown_datetime" = "2022-06-01", // time when data is transfter to medium
     "cooldown_ttl" = "1h", // data is transfter to medium after 1 hour
     "s3_resource" = "my_s3" // point to a s3 resource
);

CREATE TABLE example_db.table_hash
(
    k1 BIGINT,
    k2 LARGEINT,
    v1 VARCHAR(2048) REPLACE,
    v2 SMALLINT SUM DEFAULT "10"
)
UNIQUE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
    "storage_medium" = "SSD",
    "sotrage_policy" = "storage_policy_name"
);

When a cooldown_datetime is specified, cooldown_ttl is ignored.

Users can modify cooldown_datetime, cooldown_ttl, s3_ak, s3_sk of a storage policy, others attributed are not allowed to be modified.  For simplicity, be refresh storage policy periodically to refresh ak sk.

A storage policy can be applied to multi tables. And user can simple modify one policy to apply to many tables.

2. Decision

We still use FE to trigger migration, that is, FE perceives which partitions or tablets have expired through metadata, and then informs BE of the information.

But instead of using the migration task, we just notify the BE of this information, and each BE completes the data migration (uploading to s3) operation independently.

On the BE side, data uploading is performed at the rowset level.

The advantage of this is that for data loading, new data is still written locally in the previous way. load are not affected.

But here we need to modify the tablet's metadata and read-write logic, and abstract it at the IO layer to shield the impact of different storage locations on the read-write process.


Choose which replica to upload

If multiple replicas are uploaded at the same time, resources and storage space will be wasted. Here we can simply adopt some avoidance strategies to coordinate the upload tasks between the replicas.

For example, a replica first asks whether other replicas have started uploading before uploading, and if so, wait 10 minutes and then re-inquire.

TODO: Further refinement is required here.

3. Action

Please refer to: DSIP-006: Refactor IO stack

4. Result

Schema Change

We need to implement a new schema change logic:

  1. For add/drop column, no need to modify the real data, so that we don't need to implement "hard-link" logic for S3 storage.
  2. For other schema change task that must modify the data, see "compaction".

Compaction

Because the read and write logic has been shielded at the IO layer, the logic of compaction (or schema change) is theoretically the same as the existing one(that is, read-then-write), but we need to solve the following problems:

  1. Choose which replica to do the compaction: Same as Upload.
  2. Write-Amplify problem: the compaction may introduce heavy write amplify problem, we need to try best to reduce the data that need to be downloaded or uploaded to the S3.
  3. Garbage collection: For a distributed system, these operation must have garbage data that need to be cleaned. We can use Rocksdb to save the file info. For example, write a begin kv before upload and write a end kv after upload finished. Just like a single node transaction logic.

TODO: need more detail design.

4. Single remote replica mode

Description

The current cold and hot data is divided into two parts. The hot data is stored on the local disk and the cold data is stored in the remote cluster. Due to the independence of segments between BEs, cold data has multiple copies on the remote cluster, resulting in data redundancy. To solve this problem, the single remote replica is proposed.

A single remote replica, as the name implies, means that the number of data copies saved on the remote cluster is one, that is, no additional layer of copy management is added to the original file system copy management of the cluster, so as to greatly reduce the storage space occupied.

In order to meet the cooperative management between different replicas, the FE selects one of several replicas as the upload node, while the replica that is not selected will only check whether the current segment data has been uploaded on the cluster, and synchronously delete the local data. The copy of the selected upload file will upload the data file normally, and upload the data of the current tablet to the cluster for other fragment checks.

Organization Chart

CooldownHandler in FE

CooldownHandler uniformly manages the configuration of data transfer to cold, which only takes effect in the MASTER node. There is a list that records all the tablets configured with cold and hot data rules and their replicas, and checks the status of each replica. The processing here is only for ordinary tablets and does not operate on ShadowTablets. The replica information is consistent with the show tablet.

When the CooldownType of a tablet is inconsistent with the status reported by BE, the CooldownType status needs to be synchronized first (first turn off the upload of the upload copy of the file being uploaded and set it to USE_REMOTE_DATA; then set the new upload copy status to UPLOAD_DATA). When the copy of a tablet is abnormal (the CooldownType of all copies is USE_REMOTE_DATA, the tablet with the CooldownType specified as UPLOAD_DATA is abnormal, and the tablet with the CooldownType specified as UPLOAD_DATA has not reported the heartbeat (lastUpdateTime) after the timeout period, it is determined that the tablet needs to reselect the Upload copy and set the CooldownType to UPLOAD_ DATA。

The CooldownType on the FE side can only be accessed from the USE_ REMOTE_ DATA changes to UPLOAD_ DATA,UPLOAD_ Generally, the DATA node does not need to be changed unless BE times out. When BE times out, the FE side will automatically process the TABLET migration. The newly migrated TABLET is defaulted to USE_ REMOTE_ DATA, UPLOAD will be re specified_ DATA node. Ensure that two CooldownTypes are not UPLOAD at the same time_ Node of DATA.

Considering that the coolown operation does not require high real-time performance, and in order to prevent UPLOAD_ DATA switches frequently, and the timeout threshold can be set longer, such as half an hour.

Cooldown in BE

The CooldownType status of each tablet on the BE side is USE by default_ REMOTE_ DATA, that is, do not upload. Only when FE sends a request to set CooldownType to UPLOAD_ Only DATA updates its status and sets it to upload.

If the heartbeat time reported by the Tablet last exceeds the timeout threshold mentioned above, according to the timeout processing mechanism of the FE end, the current Tablet has been set to USE_ REMOTE_ The possibility of DATA. At this time, set CooldownType to USE_ REMOTE_ DATA, stop uploading. Later, wait for FE to re initiate the request and redistribute CooldownType to BE.

In the process of uploading files on the BE side, if the operation takes too long, the timeout threshold should also be checked regularly. If timeout is found, the current upload request should be stopped immediately and the uploaded files should be deleted. (If the remote TabletMeta has been updated, it indicates that the operation has been completed, and the file does not need to be deleted)

UPLOAD_ DATA is a super operation with high risk. It is necessary to ensure that only one copy is performing upload requests at the same time. Therefore, some real-time performance can be sacrificed to reduce the probability of this risk.

BE in UPLOAD_ DATA type

Because different copies of the same batch of imported data under the same tablet rowset_ The IDs are different. When uploading a segment, its corresponding TabletMeta should be uploaded for reference by other copies of the tablet. This information only needs to include the Upload segment, not the local segment.

When in coolown, first read the TabletMeta file under the remote cluster directory, check the existing data range (which segments have been uploaded) on the remote cluster according to it, and compare it with the local TabletMeta. If the remote TabletMeta of the local record is different, merge the remote TabletMeta.

According to CooldownType status, there are two operations:

If CooldownType is UPLOAD_ DATA, indicating that the current tablet needs to be uploaded. Upload the subsequent segments according to the data range on the cluster, and then update the remote TabletMeta. During the upload process, the CooldownType should also be checked regularly to prevent the CooldownType update from not being found because the upload time is too long.

If CooldownType is USE_ REMOTE_ DATA, indicating that it is not necessary to upload files. Skip the upload step and directly delete the local redundant segments after merging the TabletMeta.

Note: In order to prevent inconsistency between different replicas of the local Rowset, which will result in the inability to de duplicate the cold data on the remote cluster, the cold data compression operation only processes the segments on the remote cluster, not the local segments.

BE in USE_ REMOTE_ DATA type

Since the remote TabletMeta may add or delete Rowsets, the local TabletMeta may not match the remote TabletMeta, that is, the specified file cannot be found when reading the remote data. In this case, you need to trigger the TabletMeta synchronization operation to merge the remote TabletMeta, and then continue the reading operation.

Scheduling

specific implementation steps and approximate scheduling.

  • No labels