背景
在RSS的使用过程中,存在下面情况:
作业客户端推数据非常快速,给集群Worker带来很高的负载,导致worker 因为磁盘写入速度有限,处理数据不及时,数据堆积在内存,触发worker memory tracker的暂停接收数据行为,这个worker的所有客户端推送数据回应慢,会影响到这个客户端向所有worker 推数据的速度。
在这种情况下就造成下面问题:
- worker 长期处于高内存使用状态,不断触发暂停读取写入数据,导致推送数据超时失败,需要重新revive(不会全部重写分区),增加作业和集群的开销。
- 一个worker 暂停数据接收状态,会影响一片作业和这些作业向对应worker推送数据的速度,导致资源利用率地下,也就是说一个大作业打满一个worker可以导致一片worker性能得不到充分利用
导致上述问题原因主要是三个方面:
- 客户端的限流逻辑是针对当前这个map task向外推数据的所有请求,并没有对不同的target worker进行隔离,当向一个worker 推送数据出问题, 向其他正常worker 推数据的速率也收到影响
- Worker 端限制流量根据内存的使用率来限制, 当总的数据推送流量 > 磁盘写入流量,内存保持高水位,不断触发暂停读取数据的行为,这个暂停时针对所有的client, 而不是个别流量高的client,这就导致一个作业推送流量大,会影响其他这个worker上的作业,扩大影响。
- 客户端推送数据当前的限制方式为限制一个ShuffleClient同时推送数据的并行数量,但是同一个作业并行的ShuffleClient 数量不可控,同一个worke承接多少shuffle的分区的个数不可控,所以并没有针对Worker 接收数据流量整体的一个控制和将这个状况反馈到客户端。
设计目标
- 控制worker 接收的总的推送数据的流量,避免出现MemoryTracker触发暂停接收所有客户端数据的行为
- 控制某个用户在worker上的流量,避免单个用户流量大挤压其他用户的推送速率
- 客户端推送要采取慢启动逻辑,慢慢逼近worker的接收数据能力上线
- 新的ShuffleClient 要能对旧的Worker兼容
实现上述设计目标需要完成下述内容:
- worker服务端实现方案限制总的流量
- 推送数据流量统计,包括整体和用户维度
- 建立数据推送反馈机制,客户端能根据worker端处理完推送数据的反馈调整发送速率
分析
Worker
在之前的压力测试过程中,当worker 端收到量非常多的push data的请求时,在开始接受初期,因为数据可以堆积在disk buffer中,快速接收大量的数据,后因为内存压力上升,部分worker 开触发pause data push, 反馈到作业的每个client上,shuffle client 推出的数据没被处理返回不及时,受到maxReqsInFlight的限制,client 端推数据能力下降,推数据的速率直接受到机器上磁盘写入数据跟的影响(磁盘写入越快,disk buffer释放内存越快,memory tracker能允许接入的数据越多),接收push data请求一直处于低水平,期间disk buffer用量一直较高。
在worker端需要让worker接收数据的流量保持在和磁盘写入一样的水平能达到一个比较合理的状态(目前的硬件速度跟不上软件处理速度),但是如果某个租户的作业流量非常高,会导致其他在这个worker上写数据的用户作业请求得不到及时的处理。在worker 端需要实现下面目标:
- 避免一开始流量直接达到顶峰触发worker 整体层面的trim pause push data,这样会因为某个大作业影响其他的作业的性能, 这里需要客户端启动时采用慢启动的方式启动并缓步逼近流量阈值(阈值取决于机器磁盘的写入性能)。
- 统计总的流量和每个用户的流量,给单个用户的流量做限制,避免单个用户抢占流量资源,影响其他用户。
- 当worker 流量超出阈值时,选这性的(需要细化规则)通知影响大的client端进入拥塞控制阶段,降低发送的速度。
主要目标是: 避免因为某一个用户作业的流量影响了其他用户的作业,推送速度直接影响到shuffle 性能。
ShuffleClient
在之前的代码逻辑中,每一个发送客户端有一个maxReqsInFlight 限制,这个参数限制这个client向所有worker 发送数据, 这会导致两个问题:
- 如果client 向某一个worker 发送数据受阻,想起他worker 发送的消息返回后, inFlight的份额可能会被向阻塞的worker 发送的消息堆满,导致client 没法向其他正常的worker 发送消息。
- pushGrain data不会把消息放入push data queue, 如果遇到阻塞会更加放大影响。
基于上述问题,在这次设计中考虑去除总的maxReqsInFlight限制,client和每个target worker 增加一个限制,避免一个worker的问题影响向其他worker 推送数据。
推送速度测试
测试一个worker 一个map task 运行时,不开始replicate情况下,向worker 推送数据的速度
在这个文档中可以基本得出结论:
在ShuffleClient 一次只能发送一个request的情况下,推送数据的速度大概在4m/s,我们一台worker的性能瓶颈在磁盘写入, 磁盘写入单块ssd盘的速度约为300m/s, 一个worker 会同时接收到几百上千个同一个作业的shuffle client推送数据的请求, 所以单纯的限制客户端的并行推送的速度也无法满足需求,因为需要:
- Worker 端做接收数据的窗口限制,同时限制每个用户的窗口,且可以动态调整
- Client 端限制发送数据的速度,并可以根据反馈动态调整
设计思路
Push Data 传输流程
- ShuffleClient 产生一个pushTask 向worker A 推送数据
- 当向WorkerA推送数据的Sender Buffer 存在空余,将PushData 请求推送到NettyBuffer,随后发送到SocketBuffer, 如果Sender Buffer无空余,进入等待
- PushData 请求由socket 发送到worker端
- Worker 端PushData请求由socket 接收并接收进Netty Buffer内存空间
- PushData handler接收Shuffle Client channel对应的PushData请求
- 并向当前ShuffleClient 对应User A的DiskBuffer Pool 申请Buffer
- 当User A对应DiskBuffer pool 用完时, 向总的DiskBuffer poll 申请buffer
- 如果没有空余的Buffer, 返回读失败,并反馈当前的服务端繁忙。
- 如果客户端收到success状态,客户端缓慢增加Sender Buffer的size,增大客户端流量
- 客户端收到worker端繁忙,进如拥塞控制,降低客户端sender buffer值减少流量并进入重发阶段
Worker Side 多租户限流实现
在现有的逻辑中,Worker端有一个 DiskBuffer queue, 当Worker 处理完 Push Data的请求返回Success 后,数据被放入Disk Buffer的一个Buffer里,当Buffer 满,Buffer 会被生成一个FlushTask, 推给Flusher的task queue中,等待写入数据,数据写入完成后,将buffer 返回给DiskBuffer queue。
TODO: 较小worker的disk buffer 数值, 测试是否会影响到整个的写入速率 yifan.xia@shopee.com
(在验证完disk buffer的大小可以影响写入速度后)
在当前的设计中,将DiskBuffer 拆分成Disk Buffer Queue和用户的Disk Buffer Queue
- 已用户A为例, 当用户的请求进来时,会先向A 用户的Disk Buffer pool 申请buffer, 如果申请成功,则数据直接写入,当buffer 写满User A的写数据拿到的buffer 写满,调用flush() API生成一个Flush Task,进入Flusher处理队列中, 由Flusher将Buffer 写入磁盘,并将buffer 返回给用户A的DiskBuffer Pool
- 当用户A的请求进来时,没法申请到Buffer,这时候去判断A 用户队列的流量是否应该增加, 如果允许增加用户A的流量,总的DiskBuffer 返回一个新的Buffer 给用户A的DiskBuffer queue,数据写入buffer
- 如果总的DiskBuffer pool 也没有新的buffer 返回, 则去判断其他的队列是否存在buffer 数过多,需要降低流量,将buffer 返回给总的DiskBuffer pool
- 如果判断到用户A的流量应该降低,将用户A 的Buffer 返回给总的Buffer Pool
Push data --Take Buffer success
- 已用户A为例, 当用户的请求进来时,会先向A 用户的Disk Buffer pool 申请buffer, 如果申请成功,则数据直接写入
- 当User A的写数据拿到的buffer 写满,调用flush() API生成一个Flush Task,进入Flusher处理队列中, 由Flusher将Buffer 写入磁盘,并将buffer 返回给用户A的DiskBuffer Pool
- 当用户A的请求进来时,没法申请到Buffer,这时候会去判断A 用户队列的流量是否应该增加, 如果允许增加,如果总的DiskBuffer 返回一个新的Buffer 给用户A的DiskBuffer queue,数据写入buffer, 返回Success。
Push data --Take Buffer Failed
- 已用户A为例, 当用户的请求进来时,会先向A 用户的Disk Buffer pool 申请buffer, 如果申请成功,则数据直接写入
- 当User A的写数据拿到的buffer 写满,调用flush() API生成一个Flush Task,进入Flusher处理队列中, 由Flusher将Buffer 写入磁盘,并将buffer 返回给用户A的DiskBuffer Pool
- 当用户A的请求进来时,没法申请到Buffer,这时候回去判断A 用户队列的流量是否应该增加, 如果不允许增加,Push Data 请求返回失败并在客户端重试的应答。
- 当用户A的请求进来时,没法申请到Buffer,这时候回去判断A 用户队列的流量是否应该增加,如果总的DiskBuffer pool 也没有新的buffer 返回, push Data的请求拿不到Buffer,Push Data 请求返回失败并在客户端重试的应答
Client Side Buffer Pool限制客户端发送速度机制
在上一节, client side的buffer pool可以复用当前的旧逻辑即maxReqsInFlight来实现,
下面的逻辑都是针对一个shuffle client 和一个worker 之间推送数据。
- inFlightReqsNum:shuffle client 向一个worker 发送并还没有接收到返回的数据请求个数。
- maxRequestsInFlight:允许的Shuffle client 向一个worker 发送还没接收到返回的最大数据请求个数。
- Shuffle Client 发送数据时会判断当前已经发送并且没有返回的请求个数 inFlightReqsNums是否小于当前允许的这个client 和worker 之前传输数据并行度 maxReqsInFlight
- 如果正在发送的请求数小于最大限制值,则直接发送数据,inFlightReqsNums += 1,当对应Worker 返回response后,inFlightReqsNums -= 1
- 如果正在发送的请求书大于最大限制值,则判断向当前worker发送数据的请求中是否有失败的情况,如果是,抛出exception, 如果不是等待设置的等待间隔时间,进如步骤1的判断
PushData反馈
当前代码push data返回有以下状态:
- Failed: Worker端返回失败报错, 包括
- PUSH_DATA_FAIL_PARTITION_NOT_FOUND
- PUSH_DATA_FAIL_MAIN
- PUSH_DATA_FAIL_SLAVE
- Success: Worker 端返回推送数据成功,成功状态中包含三种,但是均为推送成功
- None (普通成功的返回)
- SOFT_SPLIT
- HARD_SPLIT
在这次的设计中,当数据写入Buffer 失败需要返回重试和限流信号,这次设计中,Push Data的返回状态如下
- Failed: Worker端返回失败报错, 包括
- PUSH_DATA_FAIL_PARTITION_NOT_FOUND
- PUSH_DATA_FAIL_MAIN
- PUSH_DATA_FAIL_SLAVE
- PUSH_DATA_FAIL_MASTER_NO_BUFFER
- PUSH_DATA_FAIL_SLAVE_NO_BUFFER
- Success: Worker 端返回推送数据成功,成功状态中包含三种,但是均为推送成功
- None (普通成功的返回)
- SOFT_SPLIT
- HARD_SPLIT
当Shuffle Client在向worker A推送数据失败并返回失败类型为
- PUSH_DATA_FAIL_MASTER_NO_BUFFER
- PUSH_DATA_FAIL_SLAVE_NO_BUFFER
进入对worker A发送数据拥塞控制
Worker 返回拥塞状态的rule:
FLAG | 什么时候返回 |
PUSH_DATA_FAIL_MASTER_NO_BUFFER | 推送master 数据时,master partition 所在worker 无法提供当前用户的disk buffer返回 |
PUSH_DATA_FAIL_SLAVE_NO_BUFFER | enable replicate 时,推送master 数据时,master partition 所在worker 向slave partition worker推送数据时,无法提供当前用户的disk buffer返回 |
客户端发送重试
当前Shuffle Client 处理Push Data request的response 有以下几种装
在这次的设计中,增加了worker 端无法满足disk bffer的返回状态,当Shuffle Client 接收到这两种状态时, shuffle client 做一下行为:
- 当前的PushData request 作为失败
- Shuffle client进入阻塞状态,即降低对这个worker 发送速度maxReqsInflight的值
- 不用revive直接重新进入 limitMaxReqsInFlight 状态判断并重新发送
实现细节
流量统计节点
流量统计方法
客户端慢启动和拥塞控制
在当前的逻辑中,每一个shuffle map task 的shuffle writer 有一个对象ShuffleClient, 当前的ShuffleClient有参数 spark.rss.push.data.maxReqsInFlight 来控制一个map task中ShuffleClient能同时推数据的并行度。这个限制时限制的客户端维度,处理的瓶颈在Worker端,这个限制无法限制某个worker接收到的push data的流量。
并且当前代码中方案的一个弊端时在最开始阶段,客户端推送的量就达到最大,所以在本次设计中给客户端增加慢启动和拥塞控制过程, 这个过程针对的时每一个worker。
概念:
- RTT(Round Trip Time): 在RSS中相当于一个推送数据请求到请求返回的时间
- currentMaxReqsInFlight: 表示shuffle client 像某一个worker 推送数据的最大请求并行度
- reqsInFlightBlockThreshold: 预设置的一个阈值,当shuffle client向某个worker 推送数据的currentMaxReqsInFlight 值达到 reqsInFlightBlockThreshold时,进入拥塞避免阶段,减缓currentMaxReqsInFlight 的增加速度。
- 慢启动:慢启动也叫做指数增长期。慢启动是指每次RSS push 数据请求接收窗口收到确认时都会增长。增加的大小就是已确认段的数目。这种情况一直保持到要么没有收到一些段,要么窗口大小到达预先定义的阈值。
- 拥塞避免:当流量以指数增长达到阈值的时候,降低增长速率,以线性方式逼近性能顶峰。
- 拥塞控制:当服务端返回拥塞信号时,降低客户端的发送速率。
慢启动 + 拥塞控制 + 快速重传参考 TCP Reno 版本
- 设置发送进入阻塞的最大阈值, reqsInFlightBlockThreshold (per worker)
- 初始设置发送阈值 currentMaxReqsInFlight = 1 (per worker)
- 当push data response 接收到 FAILED 状态, 不调整 currentMaxReqsInFlight
- 当push data response 返SUCCESS & FREE 状态, currentMaxReqsInFlight += 1,直到达到阻塞阈值 reqsInFlightBlockThreshold, 将不再增加。(这里相当于每个RTT 周期 currentMaxReqsInFlight = currentMaxReqsInFlight * 2,慢启动阶段 ), 如果currentMaxReqsInFlight 超出最大阻塞阈值reqsInFlightBlockThreshold, 采用每个RTT +1 递增(拥塞避免)
- 当push data response返回SUCCESS&CONGESTION, maxReqsInFlight 设置为currentMaxReqsInFlight 的一半,并且让为currentMaxReqsInFlight 从maxReqsInFlight开始进入拥塞控制
- 在进入快速启动阶段后,因reqsInFlightBlockThreshold = currentMaxReqsInFlight, 这个阶段进入拥塞控制阶段, 采用每个RTT +1的策略避免拥塞, 直到再次进入拥塞控制阶段
Worker端用户流量配额的动态调整*:
流量统计方式 [Checking which method is better]
quota.window.num * quota.window.size.seconds。
Kafka:
org.apache.kafka.common.metrics.stats.Rate 计算速率
Codahale
SlidingTimeWindowReservoir (效果不好)
自己开发:
目标:
- 统计准确及时
- 不要影响性能
开发计划
附录
HDFS
DataNode写入限流
DataNode上,通过dfs.datanode.data.write.bandwidthPerSec等参数限制了节点的最大数据写速度,没有提供多租户用户隔离选型。
NN FairCallQueue
NameNode上,对接收到的RPC请求按一定策略调度执行(org.apache.hadoop.ipc.FairCallQueue, org.apache.hadoop.ipc.DecayRpcScheduler,WeightedRoundRobinMultiplexer),公平原则,不会因为异常用户阻塞正常用户的请求。同时还支持backoff.enable,可以在RPC堆积的时候指示客户端退让,等待一定时间后再发请求。
同时, 还可以在NameNode上对路径配置Quota限制文件数和文件占用的存储空间大小,相关限制会在NameNode上执行创建文件,创建目录,申请block等rpc操作是被校验。
Router上,通过dfs.federation.router.client.thread-size限制每个后台NameNode的线程池的queue大小,拒绝多余的请求,也没有多租户相关限制。
总之,HDFS的多租户管理主要在NameNode上,可以限制存储的数据文件数和数据量,同时通过NameNode的Fair Call Queue实现对请求的公平处理。由于DataNode的所有操作都要先有一个相关的NameNode操作,所以理论上也能在整体上保证DataNode的操作公平。
Kafka
可以通过设置Topic的log.retention.bytes限制保存的数据量大小,partition级别。
可以根据user, client id维度的组合,设置组合在单机上的最大数据生产和消费速度,不管组合在集群不同机器上是否访问均匀。
kafka.server.ClientRequestQuotaManager#maybeRecordAndGetThrottleTimeMs()