Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
A Task that blocks in requestMemory usually holds the checkpointLock. Under severe back pressure, the Task cannot obtain enough memory segments in a short time, so the checkpoint takes a long time to complete. In earlier Unaligned Checkpoint JIRAs, the community added recordWriter.isAvailable() to alleviate the problem of tasks blocking in requestMemory because of insufficient memory segments while processing data. This check effectively reduces the probability of a Task being blocked in requestMemory.
As mentioned by FLINK-14396:
As long as there is at least one available buffer in LocalBufferPool, the RecordWriter is available for network output in most cases. The check therefore only covers the scenario where a single buffer is enough to process a single record. Under severe back pressure, if processing a single record requires multiple output buffers, the Task may still block in requestMemory, and the checkpoint cannot complete quickly. For example:
- Big record which might span multiple buffers
- Flatmap-like operators which might emit multiple records in every process
- Broadcast watermark which might request multiple buffers at a time
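As a rough illustration of the first case, the number of buffers a single record needs can be estimated with ceiling division. This is only a sketch: the class and method names are hypothetical, and it ignores serialization overhead (such as length prefixes) and buffers that are already partially filled.

```java
/** Hypothetical helper, not part of Flink: estimates how many buffers one record spans. */
class BufferSpanSketch {

    /** Ceiling division: buffers needed for a serialized record of the given size. */
    static int buffersNeeded(int recordSizeBytes, int bufferSizeBytes) {
        if (recordSizeBytes <= 0 || bufferSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        // Use long arithmetic so the intermediate sum cannot overflow.
        return (int) (((long) recordSizeBytes + bufferSizeBytes - 1) / bufferSizeBytes);
    }
}
```

Assuming 32 KiB buffers, a 100 KiB record gives buffersNeeded(100 * 1024, 32 * 1024) == 4, so one available buffer is not enough to guarantee that the Task will not block.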
In this FLIP, we propose to add the overdraft buffer in order to reduce the probability of Task being blocked in requestMemory when multiple output buffers are required to process a single record.
Overdraft buffer mechanism: when LocalBufferPool#requestMemory is called and the LocalBufferPool has no available memory segments, the LocalBufferPool allows the Task to overdraw some MemorySegments, and the LocalBufferPool then becomes unavailable. It does not become available again until all the overdraft buffers have been consumed by downstream tasks and recycled by the LocalBufferPool.
We will split this JIRA into 3 tasks.
The first task: Ignore max buffers per channel when allocating buffers
The LocalBufferPool is unavailable when maxBuffersPerChannel is reached for a channel or when availableMemorySegments.isEmpty.
If we request a memory segment from the LocalBufferPool and maxBuffersPerChannel has already been reached for the target channel, we ignore that limit and keep allocating buffers as long as availableMemorySegments is not empty.
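The first task can be sketched as follows. This is a simplified, hypothetical model rather than Flink's actual LocalBufferPool: the class name, the byte[] stand-in for MemorySegment, and the per-channel counter are assumptions made for illustration. The point is that availability still honours the per-channel limit, while allocation does not.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of the first task; not Flink's actual LocalBufferPool. */
class ChannelLimitSketch {
    private static final int SEGMENT_SIZE = 32; // tiny placeholder size for the sketch

    private final Deque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int[] buffersInUsePerChannel;
    private final int maxBuffersPerChannel;

    ChannelLimitSketch(int poolSize, int numChannels, int maxBuffersPerChannel) {
        for (int i = 0; i < poolSize; i++) {
            availableMemorySegments.add(new byte[SEGMENT_SIZE]);
        }
        this.buffersInUsePerChannel = new int[numChannels];
        this.maxBuffersPerChannel = maxBuffersPerChannel;
    }

    /** Availability still respects the per-channel limit. */
    boolean isAvailable() {
        if (availableMemorySegments.isEmpty()) {
            return false;
        }
        for (int inUse : buffersInUsePerChannel) {
            if (inUse >= maxBuffersPerChannel) {
                return false;
            }
        }
        return true;
    }

    /** Allocation ignores the per-channel limit; null means the caller would have to block. */
    byte[] requestMemorySegment(int channel) {
        byte[] segment = availableMemorySegments.poll();
        if (segment != null) {
            buffersInUsePerChannel[channel]++;
        }
        return segment;
    }

    /** Returns a segment that was in use by the given channel. */
    void recycle(int channel, byte[] segment) {
        buffersInUsePerChannel[channel]--;
        availableMemorySegments.add(segment);
    }
}
```

In this model, a pool of three segments with maxBuffersPerChannel set to 1 reports itself unavailable after the first request on channel 0, yet two further requests on the same channel still succeed because segments remain in availableMemorySegments.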
The second task: Add the overdraft buffer
Add a new configuration option, taskmanager.network.memory.max-overdraft-buffers-per-gate, with a default value of 5.
The LocalBufferPool is unavailable when maxBuffersPerChannel is reached for a channel, or availableMemorySegments.isEmpty, or numberOfRequestedOverdraftMemorySegments > 0.
When a memory segment is requested, we first try to allocate it from availableMemorySegments. If availableMemorySegments.isEmpty and numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate, we request an overdraft buffer from NetworkBufferPool.
When a memory segment is returned, we return the overdraft buffers first if numberOfRequestedOverdraftMemorySegments > 0.
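The request and return rules of the second task can be sketched as follows. This is a minimal model under stated assumptions, not the proposed implementation: the class name is hypothetical, byte[] stands in for MemorySegment, a plain counter stands in for NetworkBufferPool, the per-channel limit is left out for brevity, and returning overdraft segments to the global pool is one possible reading of "return the overdraft first".

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of the overdraft mechanism; not Flink's LocalBufferPool. */
class OverdraftPoolSketch {
    private static final int SEGMENT_SIZE = 32; // tiny placeholder size for the sketch

    private final Deque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int maxOverdraftBuffersPerGate;
    private int numberOfRequestedOverdraftMemorySegments;
    private int globalPoolFreeSegments; // assumption: counter standing in for NetworkBufferPool

    OverdraftPoolSketch(int localSegments, int maxOverdraftBuffersPerGate, int globalPoolFreeSegments) {
        for (int i = 0; i < localSegments; i++) {
            availableMemorySegments.add(new byte[SEGMENT_SIZE]);
        }
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
        this.globalPoolFreeSegments = globalPoolFreeSegments;
    }

    /** Unavailable while local segments are exhausted or any overdraft is outstanding. */
    boolean isAvailable() {
        return !availableMemorySegments.isEmpty() && numberOfRequestedOverdraftMemorySegments == 0;
    }

    /** Tries local segments first, then overdraft; null means the caller would have to block. */
    byte[] requestMemorySegment() {
        byte[] segment = availableMemorySegments.poll();
        if (segment != null) {
            return segment;
        }
        if (numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate
                && globalPoolFreeSegments > 0) {
            globalPoolFreeSegments--;
            numberOfRequestedOverdraftMemorySegments++;
            return new byte[SEGMENT_SIZE];
        }
        return null;
    }

    /** Pays back outstanding overdraft first; only afterwards refills the local pool. */
    void recycle(byte[] segment) {
        if (numberOfRequestedOverdraftMemorySegments > 0) {
            numberOfRequestedOverdraftMemorySegments--;
            globalPoolFreeSegments++;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    int getNumberOfRequestedOverdraftMemorySegments() {
        return numberOfRequestedOverdraftMemorySegments;
    }
}
```

In this model the pool stays unavailable after the overdraft has been paid back until at least one local segment has also been recycled, which matches the availability condition listed above.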
The third task: Compatibility with LegacySource
LegacySource does not have checkAvailable, so by default it would use all the overdraft buffers, which is not what we want.
So we'll set overdraft=0 for the SourceStreamTask.
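One way to picture the third task is a small helper that picks the overdraft limit per task type. The class, method, and parameter names below are hypothetical and only illustrate the rule; they are not taken from Flink.

```java
/** Hypothetical helper, not Flink code: picks the overdraft limit for a task. */
class OverdraftLimitSketch {

    /** Legacy source tasks get no overdraft; all other tasks use the configured value. */
    static int effectiveMaxOverdraftBuffers(
            boolean isLegacySourceTask, int configuredMaxOverdraftBuffersPerGate) {
        return isLegacySourceTask ? 0 : configuredMaxOverdraftBuffersPerGate;
    }
}
```

With the proposed default of 5, a legacy source task would get 0 overdraft buffers and every other task would get 5.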
Compatibility, Deprecation, and Migration Plan
Test Plan
- Test requesting an overdraft buffer when overdraft buffers are sufficient
- Test requesting an overdraft buffer when overdraft buffers are insufficient
- Checkpoint duration benchmark with the overdraft buffer enabled
Rejected Alternatives
After discussion, we decided to use an overdraft buffer instead of a reserve buffer. For details, please refer to the mailing list.