authors: fmerik[at]gmail.com

Background and Motivation

In Celeborn 0.3, ShuffleReader reads shuffle data from Workers through the OpenStream/FetchChunk RPCs and throws CelebornIOException on read failure, which makes Spark fail and rerun the task that was reading the shuffle data. However, shuffle data loss, a major source of read failures, is usually caused by a bad disk or an unsuccessful graceful shutdown. A task retry cannot recover from it, so the job eventually fails.

Currently, Celeborn relies on replication to handle data loss on Workers. Because shuffle data is intermediate and losing it only affects a few shuffles, replicating all shuffle data in the cluster may be too costly.

In Spark, a shuffle fetch failure is handled by resubmitting the corresponding ShuffleMapStage, and Celeborn can leverage the same logic.

Goal

Besides replication, provide an alternative fault-tolerance mechanism for shuffle data loss, based on Spark stage resubmission.

Proposed Solution

Resubmit Spark stage with FetchFailedException

In Spark, DAGScheduler and TaskSetManager already contain code to handle a task that failed with FetchFailedException. TaskSetManager first marks the FetchFailed task as successful, to skip the task re-run, and posts a CompletionEvent. DAGScheduler.handleTaskCompletion() then matches the TaskEndReason against FetchFailed, finds the failedStage and the related mapStage and puts them into failedStages, cleans up mapOutputTracker to determine which map tasks to re-run, and finally resubmits failedStages through DAGScheduler.resubmitFailedStages().

As a result, two things need to be done in Celeborn: wrap CelebornIOException into FetchFailedException in ShuffleReader, and clean up mapOutputTracker on FetchFailedException. However, LifecycleManager holds no reference to mapOutputTracker, and there is no interaction between LifecycleManager and DAGScheduler for the time being.

There are two options:

Option 1

Modify Spark to introduce a new interface through which DAGScheduler calls the built-in or plugin ShuffleManager to clean up mapOutputTracker, and have Celeborn implement that interface.

Option 2

Modify Celeborn only. Before propagating FetchFailedException to the Spark driver, ShuffleReader first sends an RPC to LifecycleManager to mark the shuffle as failed with FetchFailure, and LifecycleManager cleans up the shuffle's status in mapOutputTracker.
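The ordering that Option 2 depends on (report to LifecycleManager first, then surface the failure to Spark) can be sketched as below. This is only an illustration under stated assumptions: FetchFailureReporter, ChunkSource and StageResubmitSignal are hypothetical stand-ins for the LifecycleManager RPC, the Worker fetch and Spark's FetchFailedException, and a plain IOException stands in for CelebornIOException. None of these names are taken from the Celeborn or Spark code base.

```java
import java.io.IOException;

final class FetchFailureFlow {
    private FetchFailureFlow() {}

    /** Hypothetical stand-in for the RPC sent to LifecycleManager. */
    interface FetchFailureReporter {
        void reportFetchFailure(int sparkShuffleId);
    }

    /** Hypothetical stand-in for fetching a chunk from a Worker. */
    interface ChunkSource {
        byte[] fetch() throws IOException;
    }

    /** Hypothetical stand-in for Spark's FetchFailedException. */
    static final class StageResubmitSignal extends RuntimeException {
        final int sparkShuffleId;

        StageResubmitSignal(int sparkShuffleId, Throwable cause) {
            super("fetch failed for spark shuffle " + sparkShuffleId, cause);
            this.sparkShuffleId = sparkShuffleId;
        }
    }

    /**
     * Reads one chunk. On a read failure, the failure is reported before the
     * exception is rethrown, so the map output status is already cleaned up
     * when the driver handles the failed task.
     */
    static byte[] readOrSignal(int sparkShuffleId, ChunkSource source, FetchFailureReporter reporter) {
        try {
            return source.fetch();
        } catch (IOException e) {
            reporter.reportFetchFailure(sparkShuffleId); // step 1: notify LifecycleManager
            throw new StageResubmitSignal(sparkShuffleId, e); // step 2: let Spark resubmit the stage
        }
    }
}
```

The sketch leaves open how the reader should react if the report itself fails; the proposal does not specify it.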

Considering the effort and the long negotiation with the Spark community needed to add the interface and refactor existing code, I prefer to go with Option 2 first and discuss Option 1 with the Spark community.

Reuse remaining shuffle data for better performance

Assuming most of the shuffle data is still available and only a small part is lost, the remaining shuffle data can be reused to improve performance.

When some of the partition data is lost, Celeborn does not know which mapper tasks need to be recomputed unless the mapping partitionId -> List<mapId> is recorded and reported to LifecycleManager at commit time.
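To illustrate what the partitionId -> List<mapId> mapping would make possible, here is a minimal sketch of the lookup. The class and method names are hypothetical, and the sketch assumes the mapping has already been collected in memory; how it would be recorded and reported at commit time is not covered.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class LostPartitionIndex {
    private LostPartitionIndex() {}

    /**
     * Returns the ids of all mappers that wrote to at least one lost partition.
     * Partitions unknown to the index contribute no mappers.
     */
    static Set<Integer> mapIdsToRecompute(
            Map<Integer, List<Integer>> mapIdsByPartition,
            Collection<Integer> lostPartitionIds) {
        Set<Integer> result = new TreeSet<>();
        for (int partitionId : lostPartitionIds) {
            result.addAll(mapIdsByPartition.getOrDefault(partitionId, Collections.emptyList()));
        }
        return result;
    }
}
```

A mapper that wrote to several lost partitions appears only once in the result, since the result is a set.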

It is also feasible for LifecycleManager to collect all remaining shuffle data from allocatedWorkers to find the lost partitions, and to send them to ShuffleWriter so that it writes only the lost partition data and skips all other rows, saving network and Worker disk I/O.

At shuffle read time, LifecycleManager combines the remaining partition data from the failed stage with the recomputed data from the re-run MapTasks, and offers the result to ShuffleReader.

As discussed above, the process is complex and requires many changes in Celeborn. For now I prefer to recompute all ShuffleMapTasks and rewrite all shuffle data; the reuse logic will be implemented in the future.

Map Spark shuffle id to Celeborn shuffle id

Presently, Celeborn can’t distinguish shuffle data written by a failed Spark stage from data written by the re-run stage, since all attempts of one Spark stage share the same shuffle id. This causes issues when the re-run stage writes to Celeborn:

  1. LifecycleManager treats the registerShuffle RPC as coming from an already registered shuffle and answers with the old locations directly, skipping RequestSlots to the Master and slot reservation on Workers
  2. The Worker can’t find the reserved location and treats the Push request from the re-run ShuffleMapTask as coming from a speculative task, which is not expected

The fix is relatively simple: take the stage retry into account and map the Spark shuffle id to a Celeborn shuffle id:

celeborn shuffleId = spark shuffleId * maxStageAttemptsNum + stageAttemptId

For example, with spark shuffle id = 1 and maxStageAttemptsNum = 4 (the default value), celeborn shuffle id = 4 for the first stage attempt (stageAttemptId = 0) and celeborn shuffle id = 5 for the resubmitted stage attempt (stageAttemptId = 1).
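The formula and the example values can be checked with a small sketch. The class name is hypothetical. The range check on stageAttemptId and the inverse helpers are assumptions added here: the check keeps ids of neighbouring Spark shuffles from colliding, and the inverse simply follows from the formula.

```java
final class ShuffleIdMapping {
    private ShuffleIdMapping() {}

    /** celeborn shuffleId = spark shuffleId * maxStageAttemptsNum + stageAttemptId */
    static int celebornShuffleId(int sparkShuffleId, int maxStageAttemptsNum, int stageAttemptId) {
        // Assumption of this sketch: attempts outside [0, maxStageAttemptsNum) are rejected,
        // otherwise the id would spill into the range of the next Spark shuffle.
        if (stageAttemptId < 0 || stageAttemptId >= maxStageAttemptsNum) {
            throw new IllegalArgumentException("stageAttemptId out of range: " + stageAttemptId);
        }
        return Math.addExact(Math.multiplyExact(sparkShuffleId, maxStageAttemptsNum), stageAttemptId);
    }

    /** Recovers the Spark shuffle id from a Celeborn shuffle id. */
    static int sparkShuffleId(int celebornShuffleId, int maxStageAttemptsNum) {
        return celebornShuffleId / maxStageAttemptsNum;
    }

    /** Recovers the stage attempt id from a Celeborn shuffle id. */
    static int stageAttemptId(int celebornShuffleId, int maxStageAttemptsNum) {
        return celebornShuffleId % maxStageAttemptsNum;
    }
}
```

With maxStageAttemptsNum = 4, celebornShuffleId(1, 4, 0) evaluates to 4 and celebornShuffleId(1, 4, 1) evaluates to 5, matching the example above.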

With the shuffle id mapping, the resubmitted stage writes to Celeborn under a new celeborn shuffle id, which has no relation to the previous stage attempt, so no special cleanup is required for shuffle data created by the previous attempt.

A pair of GetShuffleId/GetShuffleIdResponse RPCs is introduced for the shuffle writer/reader to register and request the Spark/Celeborn shuffle id mapping. On startup of a Celeborn shuffle writer, a GetShuffleId message of {spark shuffleId, maxStageAttemptsNum, stageAttemptId} is sent to LifecycleManager, which calculates the celeborn shuffleId and stores it in a Map if it has not been seen before. On startup of a Celeborn shuffle reader, a GetShuffleId message is sent to LifecycleManager, which returns the max celeborn shuffle id associated with the spark shuffle id.

Since the celeborn shuffleId increases with stageAttemptId for the same spark shuffleId, the max associated celeborn shuffle id identifies the latest attempt of the shuffle data.
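The bookkeeping on the LifecycleManager side can be sketched as a small in-memory registry. This is an assumption-laden illustration, not the actual implementation: the class and method names are hypothetical, the RPC layer is omitted, and a synchronized map of sorted sets is only one possible data structure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeSet;

final class ShuffleIdRegistry {
    // spark shuffle id -> all celeborn shuffle ids registered for it, sorted ascending
    private final Map<Integer, TreeSet<Integer>> celebornIdsBySparkId = new HashMap<>();

    /** Writer side: computes the celeborn shuffle id and records it if not seen before. */
    synchronized int registerForWriter(int sparkShuffleId, int maxStageAttemptsNum, int stageAttemptId) {
        int celebornShuffleId = sparkShuffleId * maxStageAttemptsNum + stageAttemptId;
        celebornIdsBySparkId
                .computeIfAbsent(sparkShuffleId, k -> new TreeSet<>())
                .add(celebornShuffleId); // no-op when the id is already recorded
        return celebornShuffleId;
    }

    /** Reader side: returns the max celeborn shuffle id, i.e. the latest attempt, if any. */
    synchronized OptionalInt latestForReader(int sparkShuffleId) {
        TreeSet<Integer> ids = celebornIdsBySparkId.get(sparkShuffleId);
        return (ids == null || ids.isEmpty()) ? OptionalInt.empty() : OptionalInt.of(ids.last());
    }
}
```

Because only ids that a writer actually registered are stored, a reader never receives an id for a stage attempt that was not run.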

Summary

The process can be summarized as follows:

  1. The Spark shuffle id is passed to the Celeborn Shuffle Handle and assigned as usual
  2. When constructing the Celeborn ShuffleWriter, the Spark shuffle id, along with maxStageAttemptsNum and stageAttemptId, is sent to LifecycleManager, which generates the celeborn shuffle id, records it in the shuffleId mapping, and replies with the celeborn shuffle id that ShuffleWriter uses to write shuffle data
  3. When constructing the Celeborn ShuffleReader, the Spark shuffle id is sent to LifecycleManager to get the max celeborn shuffle id associated with the spark shuffle id, which is used to read shuffle data
  4. Celeborn ShuffleClientImpl catches CelebornIOException and sends a FetchFailure RPC to LifecycleManager
  5. LifecycleManager receives the FetchFailure RPC, unregisters all mapOutput from mapOutputTracker for the Spark shuffle id, and replies to ShuffleClientImpl
  6. ShuffleClientImpl throws CelebornIOException; the Celeborn ShuffleReader catches the exception, wraps it into FetchFailedException and throws it
  7. DAGScheduler completes the task with FetchFailed and resubmits all MapTasks of the corresponding ShuffleMapStage with an increased StageAttemptId


Discarded Solution

Before the GetShuffleId/GetShuffleIdResponse RPC was introduced, a simpler solution was used: the celeborn shuffle id was calculated locally within the Celeborn shuffle reader/writer. It works when a stage reads a single shuffle but fails when one Spark stage fetches multiple shuffles.

Consider a Join operator that reads shuffle data 0 (spark shuffle id 0) and shuffle data 4 (spark shuffle id 1), with maxStageAttemptsNum=4 and stageAttemptId=0, where part of shuffle data 0 is lost due to a disk failure. The Celeborn shuffle reader raises FetchFailedException, which causes Spark to resubmit the stage that computes shuffle data 1. The Celeborn shuffle reader is then re-run with stageAttemptId=1 and calculates the celeborn shuffle ids to be shuffle data 1 (0 * 4 + 1) and shuffle data 5 (1 * 4 + 1). However, Spark doesn’t resubmit the stage to recompute shuffle data 5, so the job complains that it can’t fetch shuffle data 5 and fails. As a result, failed shuffle data must be recorded somewhere, and LifecycleManager is a good choice.
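The failure in this scenario can be reproduced with a few lines of arithmetic. The sketch below is illustrative only: the class and method names are hypothetical, and a plain set of ids stands in for the shuffle data that writers have actually produced.

```java
import java.util.Set;
import java.util.TreeSet;

final class LocalIdPitfall {
    static final int MAX_STAGE_ATTEMPTS_NUM = 4;

    private LocalIdPitfall() {}

    /** The discarded approach: each reader/writer derives the id from its own stage attempt. */
    static int localCelebornId(int sparkShuffleId, int stageAttemptId) {
        return sparkShuffleId * MAX_STAGE_ATTEMPTS_NUM + stageAttemptId;
    }

    /** Returns the ids a reader would ask for that no writer has produced. */
    static Set<Integer> idsMissingForReader(
            Set<Integer> writtenIds, int readerStageAttemptId, int... sparkShuffleIds) {
        Set<Integer> missing = new TreeSet<>();
        for (int sparkShuffleId : sparkShuffleIds) {
            int wanted = localCelebornId(sparkShuffleId, readerStageAttemptId);
            if (!writtenIds.contains(wanted)) {
                missing.add(wanted);
            }
        }
        return missing;
    }
}
```

Starting from written ids {0, 4}, the first reader attempt (stageAttemptId = 0) finds both shuffles. After the map stage of spark shuffle 0 is re-run and writes id 1, a reader with stageAttemptId = 1 asks for ids 1 and 5, and idsMissingForReader reports 5 as missing, because the map stage of spark shuffle 1 was never resubmitted.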
