
Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: 

JIRA or Github Issue: 

Released: <Doris Version>

Google Doc: <If the design in question is unclear or needs to be discussed and reviewed, a Google Doc can be used first to facilitate comments from others.>

Motivation

Cloud object storage is cheaper than multi-replica local storage, so we can move cold data to S3 and store much more data at a lower price. At the same time, Doris should not lose any feature because cold data is stored on S3.

Related Research

There is an existing implementation that migrates data to S3: https://github.com/apache/incubator-doris/pull/9197.

The specific approach of this implementation is:
1. Use schema change to generate cold data migration jobs to S3. The job is at the partition level.
2. Complete the data migration on the BE side, reusing logic similar to schema change.

Advantages:
1. The implementation is simple and the progress is controllable.

Because the same logic as schema change is adopted, the entire process is ultimately controlled by FE, which guarantees atomicity at the partition level and produces no intermediate state.

Shortcomings:
1. Loading into cold data cannot be supported.
2. Schema change cannot be supported.

However, both features are strong user requirements. We need to implement hot/cold tiered storage without affecting any of Doris's existing functionality.

Detailed Design

The proposal aims to store cold rowsets on S3 without losing any feature, such as update and schema change. The whole work can be divided into four parts.

  1. Policy: How to define and describe the hot/cold properties of data, such as cooldown time and storage location.
  2. Decision: How to execute a policy, such as who initiates data migration tasks and how multiple replicas are synchronized.
  3. Action: The interaction logic with S3, such as reading and writing in the IO stack.
  4. Result: The state of data after migration is completed, such as the organization of storage paths, garbage cleaning, clone and deletion, compaction, etc.

1. Policy

Currently, Doris supports local data tiered storage, and its general approach is as follows:

  1. Local storage is divided into HDD and SSD, corresponding to cold storage and hot storage respectively.
  2. The cooldown time can be set at the partition level. After it expires, FE migrates data from SSD to HDD through a storage migration task.

In order to stay compatible with the current logic and keep the code structure clear, we use an additional, separate set of policies when implementing S3 storage.

Together with the existing local strategy, the new tiered storage hierarchy is as follows:

  • Local
    • HDD
    • SSD
  • Remote(S3)

Local is the current tiered storage implementation over HDD and SSD, while Remote refers to S3.

For the Local level, we keep the original strategy unchanged; that is, partition-level tiered storage settings are still supported.

For the Remote level, we only support policy settings at the table level, although the policy is still applied at partition granularity. This keeps the strategy simple enough.

StoragePolicy

First, the user creates a storage policy and applies it to a table.

CREATE RESOURCE "storage_policy_name"
PROPERTIES(
     "type"="storage_policy",
     "cooldown_datetime" = "2022-06-01", // time when data is transfter to medium
     "cooldown_ttl" = "1h", // data is transfter to medium after 1 hour
     "s3_resource" = "my_s3" // point to a s3 resource
);

CREATE TABLE example_db.table_hash
(
    k1 BIGINT,
    k2 LARGEINT,
    v1 VARCHAR(2048),
    v2 SMALLINT DEFAULT "10"
)
UNIQUE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
    "storage_medium" = "SSD",
    "sotrage_policy" = "storage_policy_name"
);

When a cooldown_datetime is specified, cooldown_ttl is ignored.
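
A tiny sketch of this precedence rule. Treating the partition (or rowset) creation time as the reference point for cooldown_ttl is an illustrative assumption, not something fixed by this proposal:

#include <cstdint>

// Returns the unix timestamp (in seconds) at which data becomes cold.
// An explicit cooldown_datetime wins; otherwise cooldown_ttl is applied
// relative to the (assumed) creation time.
int64_t cooldown_timestamp(int64_t cooldown_datetime,   // 0 if not set
                           int64_t creation_time,
                           int64_t cooldown_ttl_sec) {
    if (cooldown_datetime > 0) {
        return cooldown_datetime;        // cooldown_ttl is ignored
    }
    return creation_time + cooldown_ttl_sec;
}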

Users can modify cooldown_datetime, cooldown_ttl, s3_ak and s3_sk of a storage policy; other attributes are not allowed to be modified. For simplicity, BE refreshes the storage policy periodically so that a new AK/SK takes effect, as sketched below.
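
A minimal sketch of such a periodic refresh on the BE side. The fetch_from_fe callback and the StoragePolicy struct are hypothetical placeholders for whatever RPC/heartbeat actually carries the policy:

#include <chrono>
#include <cstdint>
#include <functional>
#include <string>
#include <thread>

struct StoragePolicy {
    int64_t cooldown_datetime = 0;   // unix seconds; 0 means "use cooldown_ttl"
    int64_t cooldown_ttl_sec = 0;
    std::string s3_ak;
    std::string s3_sk;
};

// Only the mutable fields are overwritten in the local cache, so a rotated
// AK/SK takes effect without restarting BE.
void refresh_storage_policy_loop(const std::function<StoragePolicy()>& fetch_from_fe,
                                 StoragePolicy* cached,
                                 std::chrono::seconds interval) {
    for (;;) {
        StoragePolicy latest = fetch_from_fe();
        cached->cooldown_datetime = latest.cooldown_datetime;
        cached->cooldown_ttl_sec = latest.cooldown_ttl_sec;
        cached->s3_ak = latest.s3_ak;
        cached->s3_sk = latest.s3_sk;
        std::this_thread::sleep_for(interval);
    }
}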

A storage policy can be applied to multiple tables, so a user can simply modify one policy and have the change apply to every table that uses it.

2. Decision

We still use FE to trigger migration: FE detects through its metadata which partitions or tablets have expired, and then notifies BE.

But instead of using the existing migration task, FE only sends this information to BE, and each BE completes the data migration (uploading to S3) independently.

On the BE side, data uploading is performed at the rowset level.

The advantage of this is that new data is still written locally in the existing way, so data loading is not affected.

However, we need to modify the tablet's metadata and read/write logic, and add an abstraction at the IO layer so that the read/write path is shielded from the difference in storage locations, as sketched below.
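
A rough sketch of this IO-layer abstraction. The interface and class names here are illustrative assumptions; the real API is the subject of DSIP-006: Refactor IO stack:

#include <cstddef>
#include <memory>
#include <string>

class FileSystemSketch {
public:
    virtual ~FileSystemSketch() = default;
    virtual bool read_at(const std::string& path, size_t offset, size_t length,
                         std::string* out) = 0;
    virtual bool write(const std::string& path, const std::string& data) = 0;
    virtual bool exists(const std::string& path) = 0;
};

// The local implementation uses POSIX read/write; the S3 implementation issues
// GET/PUT requests with the AK/SK taken from the storage policy.
class LocalFileSystemSketch : public FileSystemSketch { /* ... */ };
class S3FileSystemSketch : public FileSystemSketch { /* ... */ };

// The tablet resolves its file system once from its metadata (local path or
// remote storage policy); readers, writers, compaction and schema change all
// go through this handle and never branch on the storage location themselves.
std::shared_ptr<FileSystemSketch> resolve_file_system(bool cooled_down_to_s3);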


Choose which replica to upload

If multiple replicas upload the same data at the same time, resources and storage space are wasted. We can adopt a simple avoidance strategy to coordinate the upload tasks between replicas.

For example, before uploading, a replica first asks whether another replica has already started uploading; if so, it waits 10 minutes and then re-inquires.
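
A minimal sketch of this avoidance strategy. The ask_peer and do_upload callbacks stand in for a hypothetical BE-to-BE (or FE-mediated) inquiry and the actual rowset upload:

#include <chrono>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Returns true if any peer replica reports that it has already started
// uploading this rowset.
bool peer_is_uploading(const std::vector<std::string>& peer_addrs,
                       const std::function<bool(const std::string&)>& ask_peer) {
    for (const auto& peer : peer_addrs) {
        if (ask_peer(peer)) {
            return true;
        }
    }
    return false;
}

void try_upload_rowset(const std::vector<std::string>& peer_addrs,
                       const std::function<bool(const std::string&)>& ask_peer,
                       const std::function<void()>& do_upload) {
    while (peer_is_uploading(peer_addrs, ask_peer)) {
        // Another replica is already uploading; back off and re-inquire later.
        std::this_thread::sleep_for(std::chrono::minutes(10));
    }
    do_upload();   // this replica wins and uploads the rowset to S3
}

Note that this sketch only covers the "has started uploading" check; deciding what to do when a peer has already finished the upload is part of the refinement mentioned below.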

TODO: Further refinement is required here.

3. Action

Please refer to: DSIP-006: Refactor IO stack

4. Result

Schema Change

We need to implement a new schema change logic:

  1. For add/drop column, the real data does not need to be modified, so we do not need to implement "hard-link" logic for S3 storage (see the sketch after this list).
  2. For other schema change tasks that must rewrite the data, see "Compaction".
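
A toy sketch of why no S3 "hard link" is needed for add/drop column. The metadata structures are hypothetical; the point is that the new tablet produced by the schema change records the same remote rowset references, and only the schema changes:

#include <string>
#include <vector>

struct RemoteRowsetRef {
    std::string rowset_id;
    std::string s3_prefix;    // e.g. "<bucket>/data/<tablet_id>/<rowset_id>/"
    int num_segments = 0;
};

struct TabletMetaSketch {
    std::vector<std::string> columns;       // simplified schema
    std::vector<RemoteRowsetRef> rowsets;   // references to cooled rowsets
};

TabletMetaSketch linked_schema_change_add_column(const TabletMetaSketch& old_meta,
                                                 const std::string& new_column) {
    TabletMetaSketch new_meta = old_meta;   // remote objects are shared, nothing is copied or re-uploaded
    new_meta.columns.push_back(new_column); // only the schema changes
    return new_meta;
}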

Compaction

Because the read and write logic is shielded at the IO layer, the logic of compaction (or schema change) is theoretically the same as the existing one (that is, read-then-write), but we need to solve the following problems:

  1. Choosing which replica does the compaction: same as for upload.
  2. Write amplification: compaction may introduce a heavy write amplification problem; we need to try our best to reduce the amount of data that has to be downloaded from or uploaded to S3.
  3. Garbage collection: in a distributed system, these operations inevitably leave garbage data that needs to be cleaned up. We can use RocksDB to save the file info, for example writing a begin KV before the upload and an end KV after the upload finishes, just like a single-node transaction (see the sketch after this list).
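
A sketch of this single-node-transaction idea using the RocksDB C++ API. The key layout (upload_begin_/upload_end_ prefixes) and the helper functions are assumptions for illustration:

#include <functional>
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"

// Write the intent before touching S3, and only clear it once the upload has
// fully succeeded; a leftover begin marker therefore identifies garbage.
bool record_upload(rocksdb::DB* db, const std::string& rowset_id,
                   const std::string& s3_prefix,
                   const std::function<bool()>& do_upload) {
    db->Put(rocksdb::WriteOptions(), "upload_begin_" + rowset_id, s3_prefix);
    if (!do_upload()) {
        return false;   // begin marker stays; GC removes partial objects later
    }
    db->Put(rocksdb::WriteOptions(), "upload_end_" + rowset_id, s3_prefix);
    db->Delete(rocksdb::WriteOptions(), "upload_begin_" + rowset_id);
    return true;
}

// On startup (or periodically), every leftover "upload_begin_" key points at
// an S3 prefix whose objects may be incomplete and can be deleted.
void scan_garbage(rocksdb::DB* db) {
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
    for (it->Seek("upload_begin_");
         it->Valid() && it->key().starts_with("upload_begin_");
         it->Next()) {
        std::cout << "garbage candidate: " << it->value().ToString() << std::endl;
    }
}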

TODO: needs more detailed design.

Scheduling

TODO: specific implementation steps and approximate scheduling.
