Skip to end of metadata
Go to start of metadata


Current state: "Completed"

Discussion thread:


Released: -


The current architecture around the BLOB server and cache components seems rather patched up and has some issues regarding concurrency ([FLINK-6380]), cleanup, API inconsistencies / currently unused API ([FLINK-6329], [FLINK-6008]). These make future integration with FLIP-6 or extensions like offloading oversized RPC messages ([FLINK-6046]) difficult. We therefore propose an improvement on the current architecture as described below which tackles these issues, provides some cleanup, and enables further BLOB server use cases.


Public Interfaces

The proposed changes mainly affect the back-end and are not user-facing.
Currently, we also do not plan any changes to the configuration or the monitoring information, except for:

  • "library-cache-manager.cleanup.interval": replace by to "blob.retention.interval" and reduce its default value (currently ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 1h) to half of the current value so that BLOBs will be deleted at roughly the same expected delay as now (see "BLOB Lifecycle - Staged Cleanup" below)

Proposed Changes

Add Checksum Verifications

Since we already have the checksums in the CONTENT_ADDRESSABLE blobs, we might as well just verify them when reading/copying the files, i.e. upon download or at first use with an existing file that does not require the download.


Support for name-addressable blobs was implemented but never used so far and thus had some shortcoming and issues, especially for cleanup (some presented in [FLINK-6008]).
This is dead code and we don't actually need them and should remove it.

Supported BLOB File Types

We design the BLOB storage with the following use cases for BLOB files in mind:

  • storing jar files, i.e. libraries for use by the user-ClassLoader
  • offloading (large) RPC messages: for (a) large RPC messages that do not fit into the akka.framesize, (b) distributing a single (large) RPC message, e.g. the TaskDeploymentDescriptor, to multiple recipients by leveraging any underlying distributed file system in HA mode, (c) the ability to re-use a (large) cached RPC message during re-deployment after failures.
  • exchange of task manager log files to shown them in the Web-UI

The maximum life span of any of these types is limited by the life span of the job it belongs to. Files will be ref-counted so that shorter life spans are possible, e.g. for the latter two types.

Move BLOB Ref-counting & Cleanup from the LibraryCacheManager to the BlobCache

Ref-counting and cleanup are tasks generic to any BLOB file in the system and should therefore be handled more generically in the BlobCache and BlobServer.

BLOB Store High-Level Components



  • offers file upload and download facilities based on jobId and BlobKey
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read/write access for high availability, using "<path>/<jobId>/<BlobKey>"
  • responsible for cleanup of local and HA storage
  • upload to local store, then to HA (possibly in parallel, but waiting for both to finish before acknowledging)
  • downloads will be served from local storage only
  • on recovery (HA): download needed files from HA to local store, take cleanup responsibility for all other files on the path, i.e. orphaned files, too! (see below)


  • offers transparent local cache of BlobServer files (based on jobId and BlobKey)
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read access only (if available)
  • download either from HA store or BlobServer
  • responsible for cleanup of local storage


  • offers interface to upload and download files
  • communicates with BlobServer


  • bridge between a task's classloader and cached library BLOBs, i.e. jar files, at the BlobCache

BLOB Lifecycle

The BlobCache uses ref-counting of all of its BLOBs and starts deleting unreferenced files after "blob.retention.interval" seconds. The <jobId> subdirectory is also ref-counted along with each of its BLOBs and will be deleted similarly. Note that the BlobCache currently does not have the information that a job entered a final state and thus has to rely on reference counting only for the removal of a job's BLOBs. We may cleanup any remaining files in the <jobId> subdirectory when the TaskManager shuts down.

The BlobServer will use the same ref-counting technique and will delete files from both local and HA storage appropriately. Additionally, it will have a safety-net by using the knowledge of when a job is entering a final state. In that case, the job's BLOB storage directory (local and HA) will be deleted, i.e. "<path>/<jobId>", including all its BLOB files.

Staged Cleanup

Up to Flink 1.3, at the LibraryCacheManager, we run a periodic cleanup task every hour (see ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL or the "library-cache-manager.cleanup.interval" configuration parameter) which deletes any unreferenced jar files. If a job's task fails right before the cleanup starts, a following recovery could thus not access the cached file anymore. We'd like to change this with a "staged cleanup".

In a staged cleanup, a BLOB file is only deleted at the second time the (periodic) cleanup task encounters this BLOB as being unreferenced, e.g. by having two cleanup lists: one for the actual cleanup, i.e. "delete now", one for staging files, i.e. "delete next time". Each time the cleanup task runs, the actual cleanup list (and files) will be deleted and the staging list becomes the actual list. This is cheaper than having a time-to-live for each unreferenced jar and good enough for the cleanup task at hand. A (currently) unreferenced BLOB will thus stay around at least "blob.retention.interval" seconds and at most twice this amount. As long as it stays, it may be re-used by future (recovery) tasks and does not need to be downloaded again.

Note: since the <jobId> directory is also ref-counted and it is generally cheaper (especially in HA file systems) to delete whole directories rather than each single file, we will adapt the final delete process by sweeping over the list of files/directories to delete first and issuing a directory-delete only in case it is found.


  • all blobs are ref-counted, starting from first BLOB retrieval/delivery
  • the job-specific BLOB sub-directory is also ref-counted with each job-related file
  • if reference = 0 the BLOB enters the staged cleanup (see above)
  • if a task succeeds, fails or is cancelled, all its BLOBs' references are counted down appropriately (if possible)
  • all blobs should be deleted when the BlobCache shuts down, i.e. the TaskManager exits

Note that several tasks running on the same TaskManager may use BLOB files of the same job!


All unused BLOB files stored at the BlobServer should also be periodically cleaned up and not just when the BlobServer shuts down (as of Flink 1.3).

  • all blobs are ref-counted, starting from the initial upload
  • the job-specific BLOB sub-directory ("<path>/<jobId>") is not ref-counted (it may be, but this is not necessary here)
  • two types of BLOB lifecycle guarantees: HA (retain for recovery) and non-HA (re-creatable files - not necessary for recovery)
  • if a job fails, all non-HA files' refCounts are reset to 0; all HA files' refCounts remain and will not be increased again on recovery
  • if a job enters a final state, i.e. finished or cancelled, the job-specific BLOB subdirectory ("<path>/<jobId>") and all its BLOBs are deleted immediately and are removed from ref-counting (despite their actual ref-count!)
  • if reference = 0 the BLOB enters the staged cleanup (see above)
  • all blobs should be deleted when the BlobServer exits

Note that jar files are mostly deleted along with the job-specific BLOB directory while short-lived BLOBs like RPC messages or logs are deleted mostly based on ref-counting. The job-directory delete acts as a safety-net for them in case the ref-counting is wrong.

BlobCache Download

When a file for a given jobId and BlobKey is requested, the BlobCache will first try to serve it from its local store (after a successful checksum verification). If it is not available there or the checksum does not match, the file will be copied from the HA store to the local store if available. If this does not work or is not available, a fallback direct download from the BlobServer to the local store via a connection established and managed by the BlobClient is being used. During the transfer, these files will be put into a temporary directory and only submitted to the job-specific path when completely transferred and checksum-verified. This may invoke multiple (concurrent) downloads for the same file but ensures that while serving BLOBs, no incomplete file is being used. We may prevent such multiple downloads by the BlobCache as an optimisation.

BlobServer Upload

While user jars are being uploaded, the corresponding job is not submitted yet and we cannot bind the jar files to a non-existing job's lifecycle. Also, we cannot upload each file at once and use a ref-counter of 0 each or some files may already be deleted when the job is started. Instead, we will upload all jar files together and only after receiving the last one, we will put all of them into the staging list of the staged cleanup. The job then needs to be submitted within "blob.retention.interval" seconds or we cannot make any guarantees that the jar files still exist. This ensures a proper cleanup during client aborts/crashes between the upload and the job submission.

Files are first uploaded to the local store and then transferred to the HA store. The latter may be optimised to run in parallel but we may only acknowledge the upload once both are written (if HA is configured).

BlobServer Download

Similarly to the BlobCache, we will first try to serve a file from local store (checksum-verified) and, if it does not exist, create a local copy from the HA store (if available) - see above.

BlobServer Recovery (HA)

During recovery, the JobManager (or the Dispatcher for FLIP-6) will:

  • fetch all jobs to recover
  • download their BLOBs lazily and increase reference counts appropriately (at the JobManager only after successful job submission)
  • put any other, i.e. orphaned, file in the configured storage path into staged cleanup

Use Cases Details

Jar files

User-code jar files are uploaded before submitting a job by the job submission client. After successfully uploading all jars, the job is submitted and the JobManager/Dispatcher will increase the reference count at the BlobServer by 1. It will be decreased when the job enters a final state in which case the <jobId> directory will be deleted anyway. The BlobCache only needs to reference-count the jars in its local store, no further interaction is needed.

RPC Messages

An RPC message may be off-loaded into the BlobServer during job submission or at any time in a job's lifecycle. It may be sent to multiple receivers. In contrast to jar files, we expect messages to be re-creatable, i.e. in case of a recovery, we do not necessarily need these BLOBs to be available and effectively only use the HA store for leveraging its distributed file system. We therefore use the BlobServers's non-HA lifecycle guarantee for these.

For a message within a job's lifecycle, we want to be able to delete (temporary) ones when all receivers have successfully downloaded the message. Therefore, during message upload, an offloading-capable RpcService will set the reference counter to the number of receivers. Upon successful download and deserialisation of the message on the receiver's side, it will not only reduce the BlobCache's refCount but also acknowledge towards an RpcService at the BlobServer that the message has been received. This will reduce the BlobServer's refCount and eventually lead to the message being deleted. If the job fails, all reference counts are re-set to 0 and these files are thus subject to staged cleanup.

Special handling is required for an off-loaded job submission message: If we set the refCount to 1 immediately, we would not have a safety net if the job was never submitted. Therefore, we will use the same technique as for the jar files and upload with an initial refCount of 0 so that if the job submission RPC message itself (pointing to the BLOB) arrived within "blob.retention.interval" seconds we could guarantee that the BLOB still exists.

Log Files

Log files are currently only used by the Web-UI to show TaskManager logs. They are downloaded upon request and served afterwards. Each download should decrease the previous log's refCount by 1 and increase the new log's refCount by 1. Logs have non-HA lifecycle guarantees and may even be deleted immediately instead of putting them into staged cleanup.

As an optimisation, instead of transmitting the same log parts over and over again, we may support uploading log file partitions, i.e. byte xxxx-yyyy, as BLOBs and use them in the WebUI. This is something agnostic to the BLOB store however and is supported by the architecture above.

Compatibility, Deprecation, and Migration Plan

The impact on users will be minimal except the (currently undocumented) configuration parameter that we'd like to change. We will keep the old one and mark it deprecated. Other than that, the user may notice a properly working cleanup with this implementation.

Test Plan

We will adapt the current unit tests to any new API to verify that the behaviour of the BLOB storage components did not change. Also, proper unit tests for the different BLOB types will be added. These additions should already cover more than before and since this is close to the back-end, further (integration) tests will indirectly verify that the changes work as expected.

Rejected Alternatives

Flat file system hierarchy

If instead of using "<path>/<jobId>/<BlobKey>", we would use "<path>/<BlobKey>" we may easily share common BLOBs among jobs but, on the BlobServer, will loose a safety net when the job enters a final state. In that case, if the ref-counting is wrong, we could not delete all BLOBs for the job or even decrement the references appropriately.

BLOB-Sharing among different Jobs

The usecase of several jobs sharing the same jar file(s) and an optimisation using only a single manifestation of this file may be covered at a later point in time since we do want to keep it simple this time and develop a proper cleanup story instead which is more important.

  • No labels