
Discussion thread: TBD
Vote thread: TBD
JIRA: TBD
Release: <Celeborn Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Unlike PushData, PushMergedData does not actively trigger HARD_SPLIT: Celeborn returns HARD_SPLIT for a PushMergedData request only if one or more partitions in its partition group have already been split.


This leads to several problems:

  1. Cascading HARD_SPLIT in PushMergedData is wasteful, because most partitions in the group may not have reached the HARD_SPLIT threshold.
  2. Worker pressure cannot be shifted to other workers if the partitions are never split.
  3. ReserveSize does not take effect.

To support HARD_SPLIT in PushMergedData, we could simply check the partition sizes in handlePushMergedData of PushDataHandler and return HARD_SPLIT if any of these partitions reaches the threshold. As mentioned above, however, this causes unnecessary HARD_SPLITs for the other partitions in the group. We therefore propose a different solution to the problems above, one that splits only the partitions that actually need to be split.
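The difference between the two approaches can be illustrated with a small, self-contained sketch. It is not Celeborn code: the class SplitDecisionSketch, the long[] partitionSizes input, the threshold parameter and the string status labels are hypothetical stand-ins for the worker's real partition metadata and StatusCode values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of group-level vs. per-partition HARD_SPLIT; not part of Celeborn. */
public class SplitDecisionSketch {

    /** Naive approach: one partition over the threshold makes the whole group HARD_SPLIT. */
    static boolean naiveGroupHardSplit(long[] partitionSizes, long threshold) {
        for (long size : partitionSizes) {
            if (size >= threshold) {
                return true; // every partition in the group would be split and revived
            }
        }
        return false;
    }

    /** Proposed approach: report HARD_SPLIT only for the partitions that reached the threshold. */
    static Map<Integer, String> perPartitionStatus(long[] partitionSizes, long threshold) {
        Map<Integer, String> partitionIdToStatusCode = new LinkedHashMap<>();
        for (int i = 0; i < partitionSizes.length; i++) {
            if (partitionSizes[i] >= threshold) {
                partitionIdToStatusCode.put(i, "HARD_SPLIT");
            }
        }
        return partitionIdToStatusCode; // an empty map means no partition needs splitting
    }

    public static void main(String[] args) {
        long[] sizes = {10L, 300L, 20L, 40L};
        long threshold = 256L;
        System.out.println(naiveGroupHardSplit(sizes, threshold)); // whole group split
        System.out.println(perPartitionStatus(sizes, threshold));  // only partition 1 split
    }
}
```

With the sample sizes, the naive check splits all four partitions, while the per-partition map names only partition 1.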

Public Interfaces

Add PUSH_MERGED_DATA_RESPONSE(23) to Message.Type

Add FILE_ALREADY_CLOSED(50) to StatusCode

Add a ByteArrays encoder to org.apache.celeborn.common.network.protocol.Encoders

Add the PushMergedDataResponse class to org.apache.celeborn.common.network.protocol

public class PushMergedDataResponse extends ResponseMessage {
    public long requestId;
    /**
     * partitionUniqueId of each partition, in the same order as statusCodes
     */
    public final String[] partitionUniqueIds;
    /**
     * status code of each partition in partitionUniqueIds; only partitions that need
     * to be split or whose files have been closed are included
     */
    public final byte[] statusCodes;
    ......
}
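To make the wire format concrete, here is one possible length-prefixed layout for the response body, written against java.nio.ByteBuffer so that it runs without Netty. It is only a sketch: the field order, the 4-byte length prefixes and the helper names (encode, decode, encodedLength, Body) are assumptions, and the real Encoders.ByteArrays and string-array encoders may lay the bytes out differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the PushMergedDataResponse body; not Celeborn's actual encoding. */
public class PushMergedDataResponseCodec {

    /** Plain holder mirroring the fields listed in this CIP. */
    public static final class Body {
        public final long requestId;
        public final String[] partitionUniqueIds;
        public final byte[] statusCodes;

        public Body(long requestId, String[] partitionUniqueIds, byte[] statusCodes) {
            if (partitionUniqueIds.length != statusCodes.length) {
                throw new IllegalArgumentException("ids and status codes must be parallel arrays");
            }
            this.requestId = requestId;
            this.partitionUniqueIds = partitionUniqueIds;
            this.statusCodes = statusCodes;
        }
    }

    static int encodedLength(Body body) {
        int len = Long.BYTES + Integer.BYTES; // requestId + number of ids
        for (String id : body.partitionUniqueIds) {
            len += Integer.BYTES + id.getBytes(StandardCharsets.UTF_8).length;
        }
        return len + Integer.BYTES + body.statusCodes.length; // byte-array length + bytes
    }

    static ByteBuffer encode(Body body) {
        ByteBuffer buf = ByteBuffer.allocate(encodedLength(body));
        buf.putLong(body.requestId);
        buf.putInt(body.partitionUniqueIds.length);
        for (String id : body.partitionUniqueIds) {
            byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length);
            buf.put(bytes);
        }
        // Byte-array encoding: a length prefix followed by the raw bytes.
        buf.putInt(body.statusCodes.length);
        buf.put(body.statusCodes);
        buf.flip(); // prepare the buffer for reading
        return buf;
    }

    static Body decode(ByteBuffer buf) {
        long requestId = buf.getLong();
        String[] ids = new String[buf.getInt()];
        for (int i = 0; i < ids.length; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            ids[i] = new String(bytes, StandardCharsets.UTF_8);
        }
        byte[] statusCodes = new byte[buf.getInt()];
        buf.get(statusCodes);
        return new Body(requestId, ids, statusCodes);
    }

    public static void main(String[] args) {
        // 50 is FILE_ALREADY_CLOSED per this CIP; the first code is an arbitrary placeholder.
        Body original = new Body(42L, new String[] {"3-0", "7-1"}, new byte[] {1, 50});
        Body copy = decode(encode(original));
        System.out.println(copy.requestId + " " + copy.partitionUniqueIds[1] + " " + copy.statusCodes[1]);
    }
}
```

Keeping partitionUniqueIds and statusCodes as parallel arrays of equal length lets the receiver pair each id with its code by index, which is why the sketch rejects mismatched lengths.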

Add PushMergedDataResponseCallback

public interface PushMergedDataResponseCallback extends RpcResponseCallback {
    void onSuccess(ByteBuffer response, String[] partitionUniqueIds, byte[] statusCodes);
}
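A client-side sketch of how an implementation of this callback might use the two arrays: it groups the returned partitions by status code so that each group can be retried separately, and charges one revive attempt for FILE_ALREADY_CLOSED. The nested RpcResponseCallback is a minimal stand-in assumed to mirror the real interface's onSuccess(ByteBuffer) / onFailure(Throwable) shape; groupByStatus, remainReviveTimesFor and the placeholder code 7 are hypothetical, and only FILE_ALREADY_CLOSED = 50 comes from this CIP.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PushMergedDataCallbackSketch {

    /** Minimal stand-in for Celeborn's RpcResponseCallback (assumed shape). */
    interface RpcResponseCallback {
        void onSuccess(ByteBuffer response);
        void onFailure(Throwable e);
    }

    /** The callback interface proposed by this CIP. */
    interface PushMergedDataResponseCallback extends RpcResponseCallback {
        void onSuccess(ByteBuffer response, String[] partitionUniqueIds, byte[] statusCodes);
    }

    /** FILE_ALREADY_CLOSED(50), as added to StatusCode by this CIP. */
    static final byte FILE_ALREADY_CLOSED = 50;

    /** Groups the returned partitions by status code, preserving their order. */
    static Map<Byte, List<String>> groupByStatus(String[] partitionUniqueIds, byte[] statusCodes) {
        Map<Byte, List<String>> groups = new LinkedHashMap<>();
        for (int i = 0; i < partitionUniqueIds.length; i++) {
            groups.computeIfAbsent(statusCodes[i], k -> new ArrayList<>()).add(partitionUniqueIds[i]);
        }
        return groups;
    }

    /** Revive budget for a retried group: FILE_ALREADY_CLOSED consumes one revive attempt. */
    static int remainReviveTimesFor(byte statusCode, int remainReviveTimes) {
        return statusCode == FILE_ALREADY_CLOSED ? remainReviveTimes - 1 : remainReviveTimes;
    }

    public static void main(String[] args) {
        PushMergedDataResponseCallback callback = new PushMergedDataResponseCallback() {
            @Override
            public void onSuccess(ByteBuffer response, String[] ids, byte[] codes) {
                // Each group would become its own retried push (hypothetical step).
                groupByStatus(ids, codes).forEach((code, group) ->
                    System.out.println(code + " -> " + group
                        + " remainReviveTimes=" + remainReviveTimesFor(code, 3)));
            }

            @Override
            public void onSuccess(ByteBuffer response) {
                System.out.println("all partitions written");
            }

            @Override
            public void onFailure(Throwable e) {
                System.out.println("push failed: " + e);
            }
        };
        byte placeholder = 7; // arbitrary value standing in for HARD_SPLIT's real code
        callback.onSuccess(ByteBuffer.allocate(0), new String[] {"1-0", "2-0", "5-1"},
            new byte[] {placeholder, FILE_ALREADY_CLOSED, placeholder});
    }
}
```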


Proposed Changes

  1. PushDataHandler.handlePushMergedData
    1. Use a map, partitionIdToStatusCode, to record the status code of each problematic partition. HARD_SPLIT is returned directly for the whole request only when the disk is full or the worker is shutting down. In all other cases, a partition that requires a hard split is recorded in partitionIdToStatusCode and processing continues with the next partition. After all partitions have been processed, if no failure occurred, PushMergedDataResponse.partitionUniqueIds and PushMergedDataResponse.statusCodes are filled in from partitionIdToStatusCode and returned to the sender. If the response would otherwise carry no status code, PARTIAL_SUCCESS is returned as its status.
    2. To trigger hard splits actively, call checkDiskFullAndSplit for each partition in the group before checking whether its file is closed, and record every partition that reaches the HARD_SPLIT threshold in partitionIdToStatusCode. If the disk is full, immediately return HARD_SPLIT to the sender as the response status rather than inside PushMergedDataResponse.statusCodes, because in that case every partition on the worker needs to be split.
    3. Likewise, when a partition's file is found to be closed, record that partition in partitionIdToStatusCode instead of returning a CelebornIOException, which would also cause an unnecessary REVIVE.
    4. Partitions that require a hard split are removed before the data is written to the partition files or sent to the replica worker.
    5. If the current worker holds the primary copy of a dual-copy (replicated) partition, the union of the partitions that need a hard split on the primary and on the replica is returned to the shuffle client.
  2. TransportResponseHandler.handle
    1. Add a branch that handles messages that are instances of PushMergedDataResponse.
  3. ShuffleClientImpl.doPushMergedData
    1. Change wrappedCallback to an instance of PushMergedDataResponseCallback.
    2. At the beginning of onSuccess, check whether the response status is PARTIAL_SUCCESS, PUSH_DATA_SUCCESS_PRIMARY_CONGESTED or PUSH_DATA_SUCCESS_REPLICA_CONGESTED. If it is and statusCodes is not empty, use the partitionUniqueIds passed to onSuccess to assemble a new set of batchIds, batches and partitionUniqueIds for the partitions that need to be revived, generate a new groupedBatchId, and pass them to submitRetryPushMergedData. Partitions with HARD_SPLIT and partitions with FILE_ALREADY_CLOSED are assembled separately; for FILE_ALREADY_CLOSED, remainReviveTimes is decremented by one.
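The worker-side steps under PushDataHandler.handlePushMergedData can be modeled with plain Java collections. This is a simplified sketch under stated assumptions, not Celeborn's code: Partition, Result, handle, union, the splitThreshold parameter and the diskFull/shuttingDown flags are hypothetical; the real size and disk checks happen inside checkDiskFullAndSplit; and a null baseStatus stands for a response that would otherwise carry no status code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the handlePushMergedData flow; not Celeborn code. */
public class HandlePushMergedDataSketch {

    enum Code { HARD_SPLIT, FILE_ALREADY_CLOSED, PARTIAL_SUCCESS, PUSH_DATA_SUCCESS_PRIMARY_CONGESTED }

    /** Simplified view of one partition targeted by the merged push. */
    static final class Partition {
        final String uniqueId;
        final long fileSize;
        final boolean closed;

        Partition(String uniqueId, long fileSize, boolean closed) {
            this.uniqueId = uniqueId;
            this.fileSize = fileSize;
            this.closed = closed;
        }
    }

    /** Overall status (null = no status code), per-partition statuses, and partitions to write. */
    static final class Result {
        final Code status;
        final Map<String, Code> partitionIdToStatusCode;
        final List<Partition> toWrite;

        Result(Code status, Map<String, Code> partitionIdToStatusCode, List<Partition> toWrite) {
            this.status = status;
            this.partitionIdToStatusCode = partitionIdToStatusCode;
            this.toWrite = toWrite;
        }
    }

    static Result handle(List<Partition> group, long splitThreshold,
                         boolean diskFull, boolean shuttingDown, Code baseStatus) {
        // Disk full or shutting down: every partition must be split, so reply HARD_SPLIT at once.
        if (diskFull || shuttingDown) {
            return new Result(Code.HARD_SPLIT, Map.of(), List.of());
        }
        Map<String, Code> partitionIdToStatusCode = new LinkedHashMap<>();
        List<Partition> toWrite = new ArrayList<>();
        for (Partition p : group) {
            if (p.fileSize >= splitThreshold) {
                // The size check runs before the closed-file check.
                partitionIdToStatusCode.put(p.uniqueId, Code.HARD_SPLIT);
            } else if (p.closed) {
                // Recorded instead of failing the whole request.
                partitionIdToStatusCode.put(p.uniqueId, Code.FILE_ALREADY_CLOSED);
            } else {
                // Only these partitions are written locally and forwarded to the replica.
                toWrite.add(p);
            }
        }
        Code status = baseStatus;
        if (status == null && !partitionIdToStatusCode.isEmpty()) {
            status = Code.PARTIAL_SUCCESS;
        }
        return new Result(status, partitionIdToStatusCode, toWrite);
    }

    /** On the primary, add the replica's problematic partitions to the primary's own map. */
    static Map<String, Code> union(Map<String, Code> primary, Map<String, Code> replica) {
        Map<String, Code> merged = new LinkedHashMap<>(primary);
        replica.forEach(merged::putIfAbsent);
        return merged;
    }

    public static void main(String[] args) {
        List<Partition> group = List.of(
            new Partition("0-0", 10, false),
            new Partition("1-0", 300, false),
            new Partition("2-0", 20, true));
        Result r = handle(group, 256, false, false, null);
        System.out.println(r.status + " " + r.partitionIdToStatusCode + " toWrite=" + r.toWrite.size());
        System.out.println(union(r.partitionIdToStatusCode, Map.of("0-0", Code.HARD_SPLIT)));
    }
}
```

Keeping the problematic partitions out of toWrite models step 4: neither the local write nor the replica forward sees them, so the only extra cost of a split partition is its entry in the response.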

Compatibility, Deprecation, and Migration Plan

  • The response to PushMergedData changes to a new message type, PushMergedDataResponse, with a different body, so the shuffle client needs to be upgraded.

Test Plan

This CIP will be tested with unit tests and on a cluster.

Rejected Alternatives

N/A
