Motivation

The current HistoryServer job history retention mechanism in Flink is limited to a single quantity-based policy, controlled by the configuration parameter historyserver.archive.retained-jobs.

This setting specifies the maximum number of jobs retained per archive directory. Although functional, it does not cover common operational requirements such as time-based retention (e.g., retain only jobs finished within the last X days) or combined policies (e.g., retain jobs no older than 7 days AND no more than 100 jobs).

Operational scenarios often require more flexibility to:

  • Control storage growth in environments with frequent job completions.

  • Comply with data retention regulations that mandate deletion after a certain time.

  • Implement hybrid policies combining time-to-live (TTL) and quantity constraints.

This FLIP proposes composite and configurable job history retention policies in HistoryServer.

Public Interfaces


This FLIP introduces one new configuration parameter.


historyserver.archive.retained-ttl
  • Type: Duration
  • Default: None
  • Description: Job archive files are retained only while they are within the TTL (time-to-live) period.

When this option is enabled, a job archive is removed either when its TTL has expired, or when it falls outside the newest N archives, where N is the limit set by 'historyserver.archive.retained-jobs' (if configured).


Multiple HistoryServer instances already run independently of each other today and may be configured inconsistently. This is an existing problem.

The description of the new option should therefore clarify how to enable it when multiple HistoryServer instances exist, for example:

If multiple HistoryServer instances use the same historyserver.archive.fs.dir directory as their refresh directory, enable and configure this feature in only one of them, to avoid errors caused by several instances cleaning up the same remote files at the same time. Alternatively, keep the value of this option consistent across all of them.

Proposed Changes

Retention Strategy Abstraction

Introduce a new interface that decides whether a target archive file should be retained.

JobRetainedStrategy
import org.apache.flink.core.fs.FileStatus;

/** Strategy interface that decides whether a job archive file should be retained. */
public interface JobRetainedStrategy {

    /**
     * Judges whether the file should be retained.
     *
     * @param file the target archive file to judge.
     * @param fileOrderedIndex the position of the target file in the ordered list of archives.
     * @return whether the file should be retained.
     */
    boolean shouldRetain(FileStatus file, int fileOrderedIndex);
}

Each retention policy implements this interface to decide whether a given job archive should be retained.

Provided Retention Strategies

a. TimeToLiveJobRetainedStrategy

  • Uses the TTL threshold configured by historyserver.archive.retained-ttl.

  • Retains jobs whose modification time is within the TTL duration.
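A minimal sketch of the TTL check follows. It assumes the strategy compares each archive's modification time (epoch milliseconds) with the current time, read from an injected java.time.Clock so that the check can be tested. ArchiveFile is a hypothetical stand-in for Flink's FileStatus, and the inclusive boundary (an archive exactly TTL old is kept) is a choice of this sketch, not something the proposal specifies.

```java
import java.time.Clock;
import java.time.Duration;

/** Hypothetical stand-in for Flink's FileStatus, reduced to what the sketch needs. */
final class ArchiveFile {
    final String path;
    final long modificationTime; // epoch milliseconds, like FileStatus#getModificationTime

    ArchiveFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Same contract as the proposed JobRetainedStrategy, with ArchiveFile in place of FileStatus. */
interface JobRetainedStrategy {
    boolean shouldRetain(ArchiveFile file, int fileOrderedIndex);
}

/** Retains an archive while its age does not exceed the configured TTL. */
final class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
    private final Duration ttl;
    private final Clock clock; // injected so that "now" can be fixed in tests

    TimeToLiveJobRetainedStrategy(Duration ttl, Clock clock) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        this.ttl = ttl;
        this.clock = clock;
    }

    @Override
    public boolean shouldRetain(ArchiveFile file, int fileOrderedIndex) {
        // The position in the list is irrelevant here; only the archive's age counts.
        long ageMillis = clock.millis() - file.modificationTime;
        return ageMillis <= ttl.toMillis();
    }
}
```

Injecting the clock keeps the strategy deterministic: the same archive always gets the same answer for a given "now", which matters when one refresh cycle evaluates many files.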

b. QuantityJobRetainedStrategy

  • Uses the quantity threshold configured by historyserver.archive.retained-jobs. This strategy refactors the existing retention logic behind that option.

  • Retains jobs if their index in the ordered archive list is within the threshold.
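A minimal sketch of the quantity check, assuming the archive list is sorted newest first so that index 0 is the most recent archive, and that -1 keeps the documented meaning of historyserver.archive.retained-jobs (no limit). ArchiveFile is again a hypothetical stand-in for FileStatus, and IllegalArgumentException stands in for Flink's IllegalConfigurationException to keep the block dependency-free.

```java
/** Hypothetical stand-in for Flink's FileStatus, reduced to what the sketch needs. */
final class ArchiveFile {
    final String path;
    final long modificationTime; // epoch milliseconds

    ArchiveFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Same contract as the proposed JobRetainedStrategy, with ArchiveFile in place of FileStatus. */
interface JobRetainedStrategy {
    boolean shouldRetain(ArchiveFile file, int fileOrderedIndex);
}

/** Retains the first maxRetainedJobs archives of a newest-first list; -1 disables the limit. */
final class QuantityJobRetainedStrategy implements JobRetainedStrategy {
    private final int maxRetainedJobs;

    QuantityJobRetainedStrategy(int maxRetainedJobs) {
        // Same contract as historyserver.archive.retained-jobs: -1 = unlimited, 0 or < -1 is invalid.
        if (maxRetainedJobs == 0 || maxRetainedJobs < -1) {
            throw new IllegalArgumentException("Invalid retained-jobs value: " + maxRetainedJobs);
        }
        this.maxRetainedJobs = maxRetainedJobs;
    }

    @Override
    public boolean shouldRetain(ArchiveFile file, int fileOrderedIndex) {
        // fileOrderedIndex is 0-based, so indices 0 .. maxRetainedJobs - 1 are kept.
        return maxRetainedJobs == -1 || fileOrderedIndex < maxRetainedJobs;
    }
}
```

Because the decision depends only on the index, the strategy is only correct if every caller passes indices from the same newest-first ordering.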

c. CompositeJobRetainedStrategy

  • Combines multiple strategies with a logical AND: a job archive is retained only if all strategies return true. This strategy is used when historyserver.archive.retained-ttl and historyserver.archive.retained-jobs are both configured.
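The AND composition can be sketched as a strategy that delegates to a list of other strategies. Holding the delegates in a list is an assumption of this sketch; in the proposal the delegates would be the TTL and quantity strategies, and ArchiveFile is a hypothetical stand-in for FileStatus.

```java
import java.util.List;

/** Hypothetical stand-in for Flink's FileStatus, reduced to what the sketch needs. */
final class ArchiveFile {
    final String path;
    final long modificationTime; // epoch milliseconds

    ArchiveFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Same contract as the proposed JobRetainedStrategy, with ArchiveFile in place of FileStatus. */
interface JobRetainedStrategy {
    boolean shouldRetain(ArchiveFile file, int fileOrderedIndex);
}

/** Retains an archive only if every delegate strategy retains it (logical AND). */
final class CompositeJobRetainedStrategy implements JobRetainedStrategy {
    private final List<JobRetainedStrategy> delegates;

    CompositeJobRetainedStrategy(List<JobRetainedStrategy> delegates) {
        this.delegates = List.copyOf(delegates); // defensive, immutable copy
    }

    @Override
    public boolean shouldRetain(ArchiveFile file, int fileOrderedIndex) {
        // allMatch stops at the first delegate that rejects the file;
        // with no delegates it returns true, so everything is retained.
        return delegates.stream().allMatch(s -> s.shouldRetain(file, fileOrderedIndex));
    }
}
```

With TTL and quantity delegates, an archive survives only if it is both recent enough and among the newest N, which matches the "no older than 7 days AND no more than 100 jobs" example from the motivation.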

When to check whether target files exceed the retention thresholds

As in the POC [1], we can reuse the existing process: HistoryServer#start periodically invokes HistoryServerArchiveFetcher#fetchArchives at the interval configured by 'historyserver.archive.fs.refresh-interval', and each invocation checks whether the target files should still be retained.
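One refresh cycle for a single archive directory could apply a strategy roughly as follows. RetentionPass and selectArchivesToDelete are hypothetical names, not taken from the POC; the sketch assumes archives are ordered newest first by modification time, and it only decides which archives to drop, leaving the actual removal of remote and locally cached files to the fetcher.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for Flink's FileStatus, reduced to what the sketch needs. */
final class ArchiveFile {
    final String path;
    final long modificationTime; // epoch milliseconds

    ArchiveFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Same contract as the proposed JobRetainedStrategy, with ArchiveFile in place of FileStatus. */
interface JobRetainedStrategy {
    boolean shouldRetain(ArchiveFile file, int fileOrderedIndex);
}

/** Hypothetical helper for one refresh cycle of one archive directory. */
final class RetentionPass {
    private RetentionPass() {}

    /** Returns the archives that the strategy rejects in this cycle. */
    static List<ArchiveFile> selectArchivesToDelete(
            List<ArchiveFile> archives, JobRetainedStrategy strategy) {
        List<ArchiveFile> newestFirst = new ArrayList<>(archives);
        // Newest first, so fileOrderedIndex 0 is the most recently modified archive.
        newestFirst.sort(
                Comparator.comparingLong((ArchiveFile a) -> a.modificationTime).reversed());
        List<ArchiveFile> toDelete = new ArrayList<>();
        for (int i = 0; i < newestFirst.size(); i++) {
            ArchiveFile archive = newestFirst.get(i);
            if (!strategy.shouldRetain(archive, i)) {
                toDelete.add(archive);
            }
        }
        return toDelete;
    }
}
```

Keeping the selection a pure function of the listing and the strategy makes it easy to unit-test separately from file-system access and the refresh scheduling.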

Compatibility, Deprecation, and Migration Plan

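  • N/A: historyserver.archive.retained-ttl has no default value, so existing behavior is unchanged unless the option is set.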

Test Plan

Add the corresponding test cases.

The POC draft

POC[1]

Rejected Alternatives


Public Interfaces


This alternative would introduce two new configuration parameters that replace and extend the original setting historyserver.archive.retained-jobs.


historyserver.archive.retained-jobs
  • Type: Integer
  • Default: -1
  • Description: Maximum number of jobs to retain per archive directory. -1 means unlimited; 0 or values less than -1 throw an IllegalConfigurationException.
  • Note: This option would be deprecated by this FLIP.

historyserver.archive.retained-jobs.mode
  • Type: Enum<JobArchivedRetainedMode>
  • Default: None
  • Description: Retention strategy mode. Possible values:
    • None: retain all jobs.
    • Ttl: retain based on time-to-live.
    • Quantity: retain based on job count.
    • TtlAndQuantity: retain only if both the TTL and the quantity conditions are met.
    • TtlOrQuantity: retain if either condition is met, that is, if the file is within the maximum retained count or within the TTL period. A file is deleted only when both conditions are unsatisfied.

historyserver.archive.retained-jobs.thresholds
  • Type: Map<String,String>
  • Default: { "ttl" : null, "quantity" : "-1" }
  • Description: Threshold values for the TTL and quantity strategies. ttl accepts duration strings (e.g., "7d", "24h"); quantity accepts integers.
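For illustration, the thresholds map of this alternative could be decoded into typed values as below. The parser accepts only an integer followed by ms, s, min, h, or d; it is a simplified, hypothetical stand-in for Flink's own duration parsing, Thresholds is a hypothetical class name, and range checks (for example, rejecting quantity = 0) are left out.

```java
import java.time.Duration;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical typed view of historyserver.archive.retained-jobs.thresholds. */
final class Thresholds {
    // An integer followed by one of the supported units, e.g. "7d", "24h", "30 min".
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|min|s|h|d)");

    final Duration ttl; // null when "ttl" is absent or null (TTL disabled)
    final int quantity; // -1 when "quantity" is absent or null (no limit)

    private Thresholds(Duration ttl, int quantity) {
        this.ttl = ttl;
        this.quantity = quantity;
    }

    static Thresholds parse(Map<String, String> raw) {
        // Map#get returns null both for missing keys and for keys mapped to null.
        String ttlText = raw.get("ttl");
        String quantityText = raw.get("quantity");
        Duration ttl = ttlText == null ? null : parseDuration(ttlText.trim());
        int quantity = quantityText == null ? -1 : Integer.parseInt(quantityText.trim());
        return new Thresholds(ttl, quantity);
    }

    static Duration parseDuration(String text) {
        Matcher m = DURATION.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms": return Duration.ofMillis(value);
            case "s": return Duration.ofSeconds(value);
            case "min": return Duration.ofMinutes(value);
            case "h": return Duration.ofHours(value);
            default: return Duration.ofDays(value); // "d", the only remaining unit
        }
    }
}
```

A string-to-string map pushes all type checking to runtime, which is one practical cost of this design compared with a dedicated, typed option per threshold.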

Proposed Changes

Retention Strategy Abstraction

Introduce a new interface that decides whether a target archive file should be retained.
JobArchivesRetainedStrategy 

import org.apache.flink.core.fs.FileStatus;

/** Strategy interface that decides whether a job archive file should be retained. */
public interface JobArchivesRetainedStrategy {

    /**
     * Judges whether the file should be retained.
     *
     * @param file the target archive file to judge.
     * @param fileOrderedIndex the position of the target file in the ordered list of archives.
     * @return whether the file should be retained.
     */
    boolean shouldRetain(FileStatus file, int fileOrderedIndex);
}

Each retention policy implements this interface to decide whether a given job archive should be retained.

Provided Retention Strategies

a. TimeToLiveBasedJobRetainedStrategy

  • Uses the ttl threshold from historyserver.archive.retained-jobs.thresholds.

  • Retains jobs whose modification time is within the TTL duration.

b. QuantityBasedJobRetainedStrategy

  • Uses the quantity threshold from historyserver.archive.retained-jobs.thresholds.

  • Retains jobs if their index in the ordered archive list is within the threshold.

c. CompositeBasedJobRetainedStrategy

  • Combines multiple strategies with a logical operator:

    • OR: Retain if any strategy returns true.

    • AND: Retain only if all strategies return true.

  • Allows flexible combinations of the TTL and quantity strategies.

Retained Mode Configuration

The historyserver.archive.retained-jobs.mode option determines which composite strategy is created:

  • None: no strategies; always retain.
  • Ttl: TTL strategy only.
  • Quantity: quantity strategy only.
  • TtlAndQuantity: composite with AND.
  • TtlOrQuantity: composite with OR.
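The mode list above maps directly onto a factory. RetainedStrategyFactory is a hypothetical name, the enum constants are upper-cased versions of the documented values, and the composites are inlined as lambdas rather than a CompositeBasedJobRetainedStrategy class to keep the sketch self-contained. ArchiveFile is a hypothetical stand-in for FileStatus.

```java
/** Hypothetical stand-in for Flink's FileStatus, reduced to what the sketch needs. */
final class ArchiveFile {
    final String path;
    final long modificationTime; // epoch milliseconds

    ArchiveFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Same contract as JobArchivesRetainedStrategy, with ArchiveFile in place of FileStatus. */
interface JobRetainedStrategy {
    boolean shouldRetain(ArchiveFile file, int fileOrderedIndex);
}

/** Upper-cased counterparts of None, Ttl, Quantity, TtlAndQuantity, TtlOrQuantity. */
enum JobArchivedRetainedMode { NONE, TTL, QUANTITY, TTL_AND_QUANTITY, TTL_OR_QUANTITY }

/** Hypothetical factory: picks or combines already-built TTL and quantity strategies. */
final class RetainedStrategyFactory {
    private RetainedStrategyFactory() {}

    static JobRetainedStrategy create(
            JobArchivedRetainedMode mode, JobRetainedStrategy ttl, JobRetainedStrategy quantity) {
        switch (mode) {
            case NONE:
                return (file, index) -> true; // no strategies, always retain
            case TTL:
                return ttl;
            case QUANTITY:
                return quantity;
            case TTL_AND_QUANTITY:
                return (file, index) -> ttl.shouldRetain(file, index) && quantity.shouldRetain(file, index);
            case TTL_OR_QUANTITY:
                return (file, index) -> ttl.shouldRetain(file, index) || quantity.shouldRetain(file, index);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Under TtlOrQuantity an archive older than the TTL is still kept while it remains among the newest archives, so this mode deletes fewer files than TtlAndQuantity for the same thresholds.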


Compatibility, Deprecation, and Migration Plan

  • If historyserver.archive.retained-jobs.mode is not set, the old behavior (historyserver.archive.retained-jobs) is used.

  • If set, the new composite strategy logic takes effect.

  • Invalid thresholds (e.g., quantity = 0 or < -1, or ttl < 0ms) will trigger IllegalConfigurationException.


References

[1] https://github.com/apache/flink/pull/26902
