TL;DR

This document describes how Celeborn will support writing shuffle files to, and reading them from, worker memory.

Introduction

When disks are fast enough, Celeborn does not make full use of a worker’s large memory, and writing small shuffle files to disk is inefficient. I think Celeborn needs to be able to write shuffle files to memory.

Goals:

  1. Take advantage of memory’s speed to decrease shuffle time.
  2. Use different storage to boost shuffle throughput. A memory shuffle file can be evicted to local disks or HDFS.


Proposal

General


A Celeborn worker’s off-heap memory can be logically divided into two parts.

  1. Credit-based stream read buffers. To adapt to different engines, Celeborn supports credit-based streams and uses some off-heap memory for IO scheduling and better read performance. This region does not exist if there are no credit-based streams.
  2. Shuffle file disk cache. To achieve a high-performance IO pattern, the Celeborn worker receives and aggregates shuffle data sent from clients. A worker flushes the cached shuffle data once its size reaches 256 KB (local disks) or 4 MB (HDFS).



I’ll add a new logical region called “memory shuffle file storage”. It will store shuffle data files in memory unless a file is too large to store. The region is implemented as a counter that records the size of memory-stored shuffle files; no memory is copied. If a memory-stored shuffle file becomes too large to store, it will be evicted to local disks or HDFS, if available.

Eviction is one-way: once a shuffle file has been written to disks or HDFS, it will never be brought back into memory storage.

A single memory-stored shuffle file can be as large as the whole region. Celeborn clients will be able to read shuffle files directly from workers’ memory.


Because the storage can change, clients need to know the current storage location of a shuffle partition. The `OpenStream` RPC will return the storage location of a shuffle partition, so this RPC must complete before any reader is created.

Details

I’ll explain how I will implement the proposal. To support memory file storage, there are several changes:

Config changes

Add `celeborn.worker.directMemoryRatioForMemoryFileStorage`. The default value is 0.3, which means that the memory file storage region can use up to 30% of the worker’s off-heap memory.

Add `celeborn.worker.memoryFileStorage.maxFileSize`. The default value is “8MB”.

Add `celeborn.worker.directMemoryRatioToResume`. Channel reading resumes when `(current direct memory usage - current memory file storage usage) / (max direct memory - memory file storage usage)` is less than the configured value.
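The resume condition above can be sketched as a small calculation. This is only an illustration: the class and method names are hypothetical, the 0.5 threshold in `main` is an assumed value (this document does not state a default), and the handling of a non-positive denominator is my own choice.

```java
// Hypothetical sketch of the resume check; not an actual Celeborn class.
final class ResumeRatioSketch {

    /** Direct memory usage ratio with memory file storage excluded from both sides. */
    static double usageRatio(long directUsed, long memoryFileStorageUsed, long maxDirect) {
        long used = directUsed - memoryFileStorageUsed;
        long capacity = maxDirect - memoryFileStorageUsed;
        if (capacity <= 0) {
            // Assumption: if memory storage occupies everything, treat the rest as full.
            return 1.0;
        }
        return (double) used / capacity;
    }

    /** Channel reading resumes when the ratio is less than the configured value. */
    static boolean shouldResume(long directUsed, long memoryFileStorageUsed,
                                long maxDirect, double ratioToResume) {
        return usageRatio(directUsed, memoryFileStorageUsed, maxDirect) < ratioToResume;
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // 10 GiB max, 6 GiB used, 4 GiB of that is memory file storage: (6 - 4) / (10 - 4).
        System.out.println(usageRatio(6 * gib, 4 * gib, 10 * gib));
        System.out.println(shouldResume(6 * gib, 4 * gib, 10 * gib, 0.5)); // 0.5 is an assumed threshold
    }
}
```

Subtracting the memory storage usage on both sides keeps a large memory file storage region from blocking channel reads by itself.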

Update `celeborn.storage.activeTypes`, adding memory as an option.

Change the relationship between `FileWriter` and `FileInfo`

The relationship between `FileWriter` and `FileInfo` will become one-to-one.

Enhance `FileInfo`

Add a storage type to indicate which storage the writer is writing to.

Add a flag to indicate whether this file is read by map ID range.

Record all streams reading from this file.

Add a counter to record data size written to memory storage.
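A minimal sketch of what these additions could look like, assuming a simplified stand-in class. `FileInfoSketch`, its `StorageType` enum, and all method names are hypothetical and are not the actual Celeborn `FileInfo`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the proposed fields; not the actual Celeborn FileInfo class.
final class FileInfoSketch {
    enum StorageType { MEMORY, LOCAL_DISK, HDFS }

    private volatile StorageType storageType;                  // where the writer is writing
    private final boolean readByMapIdRange;                    // file is read by map ID range
    private final Set<Long> activeStreamIds = ConcurrentHashMap.newKeySet(); // streams reading this file
    private final AtomicLong bytesInMemory = new AtomicLong(); // data size written to memory storage

    FileInfoSketch(StorageType storageType, boolean readByMapIdRange) {
        this.storageType = storageType;
        this.readByMapIdRange = readByMapIdRange;
    }

    StorageType storageType() { return storageType; }
    boolean isReadByMapIdRange() { return readByMapIdRange; }

    void registerStream(long streamId) { activeStreamIds.add(streamId); }
    void unregisterStream(long streamId) { activeStreamIds.remove(streamId); }
    boolean isBeingRead() { return !activeStreamIds.isEmpty(); }

    long addBytesInMemory(long n) { return bytesInMemory.addAndGet(n); }
    long bytesInMemory() { return bytesInMemory.get(); }

    /** Eviction is one-way, so MEMORY is rejected as a target. */
    void markEvicted(StorageType target) {
        if (target == StorageType.MEMORY) {
            throw new IllegalArgumentException("a file cannot be evicted back to memory");
        }
        storageType = target;
        bytesInMemory.set(0);
    }

    public static void main(String[] args) {
        FileInfoSketch info = new FileInfoSketch(StorageType.MEMORY, false);
        info.addBytesInMemory(4096);
        info.markEvicted(StorageType.LOCAL_DISK);
        System.out.println(info.storageType() + " " + info.bytesInMemory());
    }
}
```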


Enhance `FileWriter`

A file writer will need to write shuffle data into different storages and it will need to implement the logic of eviction.

A file writer needs to record the limit of shuffle memory files, the limit to trigger flush on disk, and the limit to trigger flush on HDFS.

There might be no flusher, because a worker can have memory storage only, so the file writer needs to handle the case where there are no flushers.

If a writer is writing to memory storage, it needs a `FileInfo` to record metadata, and it must update that metadata when it performs an eviction.

A writer needs to check its status before the worker finishes handling pushData and return the hard split flag if the data size in memory storage is greater than `celeborn.worker.memoryFileStorage.maxFileSize` or if memory storage usage is greater than the memory storage limit.
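The status check can be sketched as a simple predicate. The class and parameter names below are hypothetical; only the 8 MB default comes from the config section, and the 3 GiB memory storage limit in `main` is an assumed figure for illustration.

```java
// Hypothetical sketch of the hard split check; not an actual Celeborn method.
final class HardSplitCheckSketch {

    /**
     * Returns true when the writer should answer with the hard split flag:
     * either this file is too large for memory storage, or the whole
     * memory storage region is over its limit.
     */
    static boolean shouldHardSplit(long fileBytesInMemory, long maxFileSize,
                                   long memoryStorageUsed, long memoryStorageLimit) {
        return fileBytesInMemory > maxFileSize || memoryStorageUsed > memoryStorageLimit;
    }

    public static void main(String[] args) {
        long maxFileSize = 8L * 1024 * 1024;           // default of memoryFileStorage.maxFileSize
        long storageLimit = 3L * 1024 * 1024 * 1024;   // assumed limit for illustration
        System.out.println(shouldHardSplit(9L * 1024 * 1024, maxFileSize, 1024, storageLimit));
        System.out.println(shouldHardSplit(1024, maxFileSize, 1024, storageLimit));
    }
}
```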

A writer needs to implement eviction, which can be divided into four steps. First, check whether another storage is available. Second, create the files and connect to the corresponding flusher. Third, invoke `flushOnMemoryPressure` to trigger the flush operation. Last, update the metadata in the `FileInfo` connected to this file writer. Eviction can be performed only if the file is not being read. If the file needs to be sorted, write the sorted data to the next storage.
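The four steps can be sketched as follows. Everything here is a hypothetical stand-in: the class, its fields, and the recorded step strings only illustrate the order of operations, and the real file creation, flusher wiring, and sorting are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the eviction flow; it records steps instead of doing real IO.
final class EvictionSketch {
    enum Storage { MEMORY, LOCAL_DISK, HDFS }

    final List<String> steps = new ArrayList<>();
    final List<Storage> availableTargets;
    Storage current = Storage.MEMORY;
    boolean beingRead = false;

    EvictionSketch(List<Storage> availableTargets) {
        this.availableTargets = availableTargets;
    }

    /** Returns true if the file was moved out of memory storage. */
    boolean evict() {
        // Only memory-stored files that are not being read can be evicted.
        if (current != Storage.MEMORY || beingRead) {
            return false;
        }
        // Step 1: check whether another storage is available.
        Storage target = null;
        for (Storage s : availableTargets) {
            if (s != Storage.MEMORY) { target = s; break; }
        }
        if (target == null) {
            return false;
        }
        // Step 2: create the file and connect to the corresponding flusher (stubbed).
        steps.add("create file and connect flusher for " + target);
        // Step 3: trigger the flush; the real writer would call flushOnMemoryPressure here.
        steps.add("flushOnMemoryPressure");
        // Step 4: update the metadata of the connected FileInfo.
        current = target;
        steps.add("update FileInfo to " + target);
        return true;
    }

    public static void main(String[] args) {
        EvictionSketch writer = new EvictionSketch(Arrays.asList(Storage.LOCAL_DISK, Storage.HDFS));
        System.out.println(writer.evict() + " " + writer.current);
        System.out.println(writer.steps);
    }
}
```

Because `evict()` returns early once `current` is no longer `MEMORY`, a second call is a no-op, which matches the one-way eviction rule.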

The close methods need to be modified to support shuffle files stored in memory; updating the corresponding chunk offsets should be enough.


Enhance `MemoryManager`

Add a new atomic long counter to record the total size of shuffle files in memory.

Add a new thread that checks whether memory storage usage is above the limit and, if so, evicts partitions stored in memory in order of size. Once memory storage usage is back below the limit, eviction stops.
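A minimal sketch of how the checking thread could pick partitions to evict. The proposal only says "in order of size", so evicting the largest first is my assumption. The class name, method name, and string partition keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the eviction planning; not an actual MemoryManager method.
final class MemoryEvictionPlannerSketch {

    /**
     * Picks partitions to evict, largest first (an assumption), until the
     * remaining memory storage usage is no longer above the limit.
     */
    static List<String> planEvictions(Map<String, Long> sizesByPartition, long limit) {
        long usage = 0;
        for (long size : sizesByPartition.values()) {
            usage += size;
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(sizesByPartition.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());

        List<String> toEvict = new ArrayList<>();
        for (Map.Entry<String, Long> entry : entries) {
            if (usage <= limit) {
                break; // usage is within the limit, so stop evicting
            }
            toEvict.add(entry.getKey());
            usage -= entry.getValue();
        }
        return toEvict;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<>();
        sizes.put("shuffle-0-partition-0", 10L);
        sizes.put("shuffle-0-partition-1", 30L);
        sizes.put("shuffle-0-partition-2", 20L);
        System.out.println(planEvictions(sizes, 35L));
    }
}
```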

For a non-skew partition, the client should send a StreamEnd RPC to tell the server that the partition is fully read. 

For a skew partition, the client should send a `StreamEnd` RPC to tell the server that the map ID range has been fully read. `FileInfo` will have a bitmap that records the map IDs that have not been read. If the bitmap is empty, the partition has been fully read. The bitmap is constructed when the `OpenStream` RPC is received, unless one already exists, in which case the existing bitmap is used.

If the client fails to send `StreamEnd` to the server, the server will not treat a partition as fully read until the shuffle expires.
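The bitmap bookkeeping for skew partitions can be sketched with `java.util.BitSet`. The class and method names are hypothetical, and I assume map ID ranges are half-open, `[startMapId, endMapId)`, which this document does not specify.

```java
import java.util.BitSet;

// Hypothetical sketch of tracking unread map IDs; not an actual Celeborn class.
final class MapIdReadTrackerSketch {
    private final BitSet unread = new BitSet();

    /** Built when the first OpenStream arrives: every map ID starts as unread. */
    MapIdReadTrackerSketch(int numMappers) {
        unread.set(0, numMappers);
    }

    /** Called on StreamEnd for the range [startMapId, endMapId) (assumed half-open). */
    synchronized void onStreamEnd(int startMapId, int endMapId) {
        unread.clear(startMapId, endMapId);
    }

    /** The partition is fully read once no unread map ID remains. */
    synchronized boolean isFullyRead() {
        return unread.isEmpty();
    }

    public static void main(String[] args) {
        MapIdReadTrackerSketch tracker = new MapIdReadTrackerSketch(10);
        tracker.onStreamEnd(0, 5);
        System.out.println(tracker.isFullyRead());
        tracker.onStreamEnd(5, 10);
        System.out.println(tracker.isFullyRead());
    }
}
```

If a `StreamEnd` never arrives, the corresponding bits stay set and the partition is simply never reported as fully read, which matches the expiry behaviour described above.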

Changes to Worker

The method `handlePushMergedData` will invoke the `FileWriter` status check and return the hard split flag for a `PushMergedData`.

File writers that are writing to memory will ignore the split threshold until they reach the limit of a shuffle partition.

PartitionSorter will sort shuffle data in memory without copying memory.

A Celeborn worker will persist all in-memory partitions when a graceful shutdown is triggered, and it will not load the persisted shuffle files back into memory. If no other storage is available, the in-memory shuffle files will be lost.

Remove in-memory shuffle files when the shuffle is expired or the application is expired.

Changes to Client

Merge common logic from `DfsPartitionReader`, `LocalPartitionReader`, and `WorkerPartitionReader`.

The client will always send an `OpenStream` RPC first while reading shuffle data then create corresponding readers from the respo
