LocalExchanger由BE遵循一系列规则完成规划。
LocalExchanger类型
NOOP:不使用LocalExchange
HASH_SHUFFLE:使用hash算法打散数据
BUCKET_HASH_SHUFFLE:使用hash算法打散数据,并根据tablet在instance中的分布去分散数据
PASSTHROUGH:把数据以block为单位打散到不同pipeline task中
ADAPTIVE_PASSTHROUGH:通过自适应的方式把数据均匀打散到不同的pipeline task中
BROADCAST:把所有数据广播到所有下游pipeline task中
PASS_TO_ONE:把数据发送到下游pipeline task中的任意一个
通用规则
通用规则也是优先级最高的规则
如果前置算子是exchange / scan (按存储层分布读数据),并且当前规划出LocalExchanger类型为 HASH_SHUFFLE/BUCKET_HASH_SHUFFLE,则使用NOOP。(因为存储层或者shuffle已经完成了exchange的操作)
如果前置算子已经做过local shuffle并且输入数据的DataDistribution已经满足当前算子的需求,则使用NOOP
如果前置算子是scan(不按存储层分布读数据),则需要立马使用一个PASSTHROUGH的LocalExchanger将数据打散
举例:
Exchange(HASH_SHUFFLE) - HashJoinProbe(非broadcast)。这是一个典型的shuffle hash join,根据HashJoin规则5,当前HashJoinProbe规划出来的LocalExchanger类型本来应该是SHUFFLE,但是因为exchange算子已经对数据进行了hash分区,所以根据通用规则1,HashJoinProbe前面不需要再规划LocalExchanger。
LocalExchange(SHUFFLE) - HashJoinProbe 1(非broadcast) - HashJoinProbe 2(非broadcast)。根据HashJoin规则5,HashJoinProbe1和HashJoinProbe2都应该规划出来类型为SHUFFLE的LocalExchanger,但是因为HashJoinProbe 1规划出来的LocalExchanger已经让数据满足了分布要求,所以根据通用规则3,所以HashJoinProbe 2前面不需要再规划LocalExchanger。
scan(无数据分布)- Exchange Sink。应用通用规则3,需要拆分成scan(无数据分布)- local exchange sink 和 local exchange source- Exchange Sink两条pipeline
operator定制化规则
HashJoin规则
规则序号 | join类型 | Build side | Probe side | |
1 | BROADCAST | PASS_TO_ONE | PASSTHROUGH | |
2 | NULL_AWARE_LEFT_ANTI_JOIN | NOOP | NOOP | |
3 | BUCKET_SHUFFLE_JOIN | BUCKET_HASH_SHUFFLE | BUCKET_HASH_SHUFFLE | |
4 | COLOCATE_JOIN | BUCKET_HASH_SHUFFLE | BUCKET_HASH_SHUFFLE | |
5 | 其他 | HASH_SHUFFLE | HASH_SHUFFLE |
Agg规则
规则序号 | 类型 | exchange类型 | ||
1 | Colocate agg | BUCKET_HASH_SHUFFLE | ||
2 | STREAMING AGG | NOOP | ||
3 | DISTINCT STREAMING AGG | HASH_SHUFFLE | ||
4 | Blocking AGG | 没有groupby key | PASSTHROUGH | |
5 | 有groupby key | HASH_SHUFFLE |
Analytic规则
规则序号 | 类型 | exchange类型 | ||
1 | 无partition by | PASSTHROUGH | ||
2 | 无排序,有partition by | colocate | BUCKET_HASH_SHUFFLE | |
不是colocate | HASH_SHUFFLE | |||
3 | 其他 | NOOP |
Sort规则
规则序号 | 类型 | exchange类型 | ||
1 | Partial sort(shuffle后还需要merge exchange归并) | PASSTHROUGH | ||
2 | 其他 | NOOP |
Set operation规则
规则序号 | 类型 | exchange类型 | ||
1 | colocate | BUCKET_HASH_SHUFFLE | ||
2 | 其他 | HASH_SHUFFLE |
NL join规则
规则序号 | join类型 | Build side | Probe side | |
2 | NULL_AWARE_LEFT_ANTI_JOIN | NOOP | NOOP | |
5 | 其他 | BROADCAST | ADAPTIVE_PASSTHROUGH |
Assert rows num规则
PASSTHROUGH