Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposal adds support for Conditions in the FlinkDeployment CR status. The Flink Operator currently does not report Conditions in the status of the FlinkDeployment CR, so tools and other controllers cannot gather summary information about a FlinkDeployment.

For example, the OpenShift UI currently shows a blank Status field for a FlinkDeployment because conditions are missing.

According to the Kubernetes API conventions (https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties), it is good practice to include Conditions in the status of a CR: they provide more detailed information about the observed status of the resource and allow tools and other controllers to gather summary information about it.

With the proposed change, the Status field will show the status of the single Flink job running in the cluster in application mode, and the status of the runtime in session mode, thereby providing a summary status of the FlinkDeployment.

Public Interfaces


N/A (not a Java change)

Proposed Changes

A single Condition named “Running” is used in each mode:

  • Application Mode: Condition type “Running”, with status “True” or “False”, reflecting the job state.
  • Session Mode: Condition type “Running”, with status “True” or “False”, reflecting the JobManager state.

 Application Mode 

In an application mode FlinkDeployment, the status conditions are populated from the status of the job running in the cluster. The job state transition diagram below shows the condition associated with each job state.

Application mode uses a single condition type, “Running”. Its status is set to “True” only for the job state “RUNNING”; for every other Flink job state it is set to “False”.
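
The rule above can be sketched as a small mapping from job state to condition status. This is only an illustration: the class and method names are hypothetical, and the enum simply lists the states of Flink's JobStatus rather than reusing the real class.

```java
// Hypothetical sketch; not part of the operator code base.
final class ApplicationRunningCondition {

    /** Job states, listed here to mirror Flink's JobStatus enum (assumption: copied by hand). */
    enum JobState {
        INITIALIZING, CREATED, RUNNING, FAILING, FAILED, CANCELLING,
        CANCELED, FINISHED, RESTARTING, SUSPENDED, RECONCILING
    }

    /** Returns the status of the "Running" condition for the given job state. */
    static String statusFor(JobState state) {
        // Only RUNNING maps to "True"; every other state maps to "False".
        return state == JobState.RUNNING ? "True" : "False";
    }

    public static void main(String[] args) {
        for (JobState state : JobState.values()) {
            System.out.println(state + " -> Running=" + statusFor(state));
        }
    }
}
```

Keeping the mapping in one place means that a new job state defaults to “False” unless it is explicitly treated as running.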



As shown in the diagram above, when the job moves from one state to another, the FlinkDeployment in application mode still has only one condition, which corresponds to the current state of the Flink job.

For example, once a FlinkDeployment has been applied in application mode and the job has reached the running state (RECONCILING > CREATED > RUNNING), the CR will have the following conditions:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample
spec:
  ...
status:
  clusterInfo: {}
  conditions:
    - lastTransitionTime: '2024-08-14T06:14:13Z'
      message: The job is currently reconciling and waits for task execution report to recover state
      reason: Job is currently reconciling
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:14:29Z'
      message: Job is created
      reason: Job is newly created, no task has started to run
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:15:17Z'
      message: Job is running
      reason: Job is running
      status: 'True'
      type: Running
  ...
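
A tool or controller that consumes this status could derive a summary from the conditions list. The following is a minimal sketch of one way to do that, assuming the consumer treats the “Running” entry with the latest lastTransitionTime as current. The Condition class is a simplified stand-in rather than the fabric8 model class, and all names are hypothetical.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical consumer-side sketch; not part of the operator code base.
final class ConditionReaderSketch {

    /** Simplified stand-in for a status condition (assumption: only the fields needed here). */
    static final class Condition {
        final String type;
        final String status;
        final String lastTransitionTime;

        Condition(String type, String status, String lastTransitionTime) {
            this.type = type;
            this.status = status;
            this.lastTransitionTime = lastTransitionTime;
        }
    }

    /** Returns the condition of the given type with the latest transition time, if any. */
    static Optional<Condition> latest(List<Condition> conditions, String type) {
        return conditions.stream()
                .filter(c -> type.equals(c.type))
                .max(Comparator.comparing((Condition c) -> Instant.parse(c.lastTransitionTime)));
    }

    /** True only if the latest "Running" condition has status "True". */
    static boolean isRunning(List<Condition> conditions) {
        return latest(conditions, "Running").map(c -> "True".equals(c.status)).orElse(false);
    }

    public static void main(String[] args) {
        List<Condition> conditions = List.of(
                new Condition("Running", "False", "2024-08-14T06:14:13Z"),
                new Condition("Running", "False", "2024-08-14T06:14:29Z"),
                new Condition("Running", "True", "2024-08-14T06:15:17Z"));
        System.out.println("running = " + isRunning(conditions));
    }
}
```

The lookup behaves the same whether the list holds a single entry or several, and an empty list is reported as not running.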


In the above scenario, because the condition status is “True”, the OpenShift UI will show the Status field as below.


If the job is canceled at any point, the CR will have the following status:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample
spec:
  ...
status:
  clusterInfo: {}
  conditions:
    - lastTransitionTime: '2024-08-14T06:14:13Z'
      message: The job is currently reconciling and waits for task execution report to recover state
      reason: Job is currently reconciling
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:14:29Z'
      message: Job is created
      reason: Job is newly created, no task has started to run
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:15:17Z'
      message: Job is running
      reason: Job is running
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:33:04Z'
      message: Job has been canceled
      reason: Job has been canceled
      status: 'False'
      type: Running
  ...

    

For a successfully completed job, the CR will have the following status:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample
spec:
  ...
status:
  clusterInfo: {}
  conditions:
    - type: Running
      status: "False"
      reason: Job's tasks have successfully finished
      message: "Job successfully finished"
      lastTransitionTime:
  ...


Session Mode 

In a session mode FlinkDeployment, the status conditions are populated from the status of the JobManager. The JobManager state transition diagram below shows the condition associated with each state.

Session mode also uses a single condition type, “Running”. Its status is set to “True” only for the JobManager state “READY”; for every other state it is set to “False”.

For example, the CR will have the following conditions when the JobManager is in the READY state:
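
The session mode rule can be sketched in the same way. Only the READY state is named in this proposal; the other state names below are assumptions chosen to match the messages used in the examples of this section, and the class and method names are hypothetical.

```java
// Hypothetical sketch; not part of the operator code base.
final class SessionRunningCondition {

    /** JobManager deployment states; all names except READY are assumptions. */
    enum JobManagerState {
        MISSING("JobManager deployment not found"),
        DEPLOYING("JobManager process is starting up"),
        DEPLOYED_NOT_READY("JobManager is running but not ready yet to receive REST API calls"),
        READY("JobManager is running and ready to receive REST API call"),
        ERROR("JobManager deployment failed");

        final String message;

        JobManagerState(String message) {
            this.message = message;
        }
    }

    /** Returns the status of the "Running" condition for the given JobManager state. */
    static String statusFor(JobManagerState state) {
        // Only READY maps to "True"; every other state maps to "False".
        return state == JobManagerState.READY ? "True" : "False";
    }

    public static void main(String[] args) {
        for (JobManagerState state : JobManagerState.values()) {
            System.out.println(state + " -> Running=" + statusFor(state) + " (" + state.message + ")");
        }
    }
}
```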

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample
spec:
  ...
status:
  clusterInfo: {}
  conditions:
    - lastTransitionTime: '2024-08-14T06:02:45Z'
      message: 'JobManager deployment not found '
      reason: 'JobManager deployment not found '
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:05:05Z'
      message: JobManager process is starting up
      reason: JobManager process is starting up
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:05:15Z'
      message: JobManager is running but not ready yet to receive REST API calls
      reason: JobManager is running but not ready yet to receive REST API calls
      status: 'False'
      type: Running
    - lastTransitionTime: '2024-08-14T06:13:56Z'
      message: JobManager is running and ready to receive REST API call
      reason: JobManager is running and ready to receive REST API call
      status: 'True'
      type: Running
  ...


Because the status is “True”, the OpenShift UI will show the Status field populated as below.

If the JobManager fails at any point and the REST service is not available, the CR will have the following condition:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sample
spec:
  ...
status:
  clusterInfo: {}
  conditions:
    - type: Running
      status: "False"
      reason: JobManager deployment failed
      message: Deployment in terminal error, requires spec change for reconciliation to continue
      lastTransitionTime:
  ...


To provide status conditions in the FlinkDeployment, a new utility class, ConditionUtils, will build the various io.fabric8.kubernetes.api.model.Condition objects, as shown below.


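import io.fabric8.kubernetes.api.model.Condition;
import io.fabric8.kubernetes.api.model.ConditionBuilder;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Builds the "Running" conditions reported in the FlinkDeployment status. */
public class ConditionUtils {

    /** Condition for a running job (application mode) or a ready JobManager (session mode). */
    public static Condition runningTrue(final String message, final String reason) {
        return crCondition("Running", "True", message, reason);
    }

    /** Condition for every other state. */
    public static Condition runningFalse(final String message, final String reason) {
        return crCondition("Running", "False", message, reason);
    }

    public static Condition crCondition(
            final String type, final String status, final String message, final String reason) {
        return new ConditionBuilder()
                .withType(type)
                .withStatus(status)
                .withMessage(message)
                .withReason(reason)
                // Instant is always in UTC, so the trailing 'Z' is accurate
                // regardless of the JVM default time zone.
                .withLastTransitionTime(
                        Instant.now().truncatedTo(ChronoUnit.SECONDS).toString())
                .build();
    }
}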
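
The lastTransitionTime values in the examples end in Z, which denotes UTC. A formatter pattern that quotes 'Z' as a literal, such as yyyy-MM-dd'T'HH:mm:ss'Z', prints that letter whatever time zone the formatter uses, so the zone has to be pinned to UTC for the value to be accurate. The sketch below illustrates the difference using only the JDK; the class and method names are hypothetical.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical sketch; not part of the operator code base.
final class TransitionTimeSketch {

    /** Formats an instant as a UTC timestamp with second precision. */
    static String utcTimestamp(Instant instant) {
        // Instant.toString() emits ISO-8601 in UTC with a trailing 'Z'.
        return instant.truncatedTo(ChronoUnit.SECONDS).toString();
    }

    /** Formats with a literal 'Z' in the given zone, to show how the suffix can mislead. */
    static String literalZTimestamp(Date date, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT);
        format.setTimeZone(zone);
        return format.format(date);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2024-08-14T06:14:13Z");
        Date date = Date.from(instant);
        System.out.println(utcTimestamp(instant));
        System.out.println(literalZTimestamp(date, TimeZone.getTimeZone("UTC")));
        // Same instant, but formatted as local time in a UTC+05:30 zone with a 'Z' suffix.
        System.out.println(literalZTimestamp(date, TimeZone.getTimeZone("Asia/Kolkata")));
    }
}
```

With the zone set to UTC both approaches agree; with any other zone the literal-'Z' pattern produces a local time that is mislabeled as UTC.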



In the FlinkDeploymentStatus class, declare the Condition object used to populate the condition in the CR. Depending on the job state (application mode) or the JobManager deployment status (session mode), build the corresponding Condition, as described in the diagrams above, by calling the builder methods of the ConditionUtils class.


/** Condition of the CR. */
private Condition condition = new Condition();






Compatibility, Deprecation, and Migration Plan

  • The changes proposed in this document are backward compatible.

Test Plan

Existing unit and integration tests will be extended to cover the proposed changes.

Rejected Alternatives

  • Using multiple conditions was considered.