Status

State: Completed

Discussion Thread: https://lists.apache.org/thread/tc37cgrx2tojv3zgzokz06f5ypk0y2hg

Motivation

To run an Airflow task, Airflow needs to parse the DAG file twice: once in the `airflow run local` process and once in `airflow run raw` (this is partially mitigated when no `run_as_user` is specified). This doubles the memory needed per task. During peak hours, the CPU can spike for long stretches due to the double parsing, slowing down task start time and even causing DAG parsing to time out. This is especially true when there are large, complex DAGs in the Airflow cluster.

Removing the double parsing saves the memory required to run the task and reduces the task bootstrap time.

What change do you propose to make?

Rely on the deserialized DAG in the `airflow run local` process to perform the dependency check.

In utils/cli.py, add another method: 


from airflow.exceptions import AirflowException
from airflow.models.serialized_dag import SerializedDagModel

def get_serialized_dag(dag_id, task_id):
    # Fetch the serialized DAG from the DB instead of parsing the DAG file.
    dag_model = SerializedDagModel.get(dag_id)
    if dag_model is None:
        raise AirflowException(f"Serialized DAG could not be found: {dag_id}")
    # Proposed method: deserialize only the task matching task_id (see below).
    return dag_model.get_dag_with_task_id(dag_id, task_id)


The proposed `get_dag_with_task_id` takes a dag_id and a task_id and deserializes the JSON data from the serialized_dag table in a streaming way, producing a DAG that contains only the task with the given task_id (this saves a lot of memory).
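To illustrate the streaming idea, here is a minimal sketch. It assumes the serialized-DAG JSON follows the {"dag": {"tasks": [...]}} layout and uses the third-party ijson library for incremental parsing; extract_single_task and the exact field names are illustrative, not the final API:

import io
import ijson  # third-party streaming JSON parser

def extract_single_task(serialized_json: bytes, task_id: str):
    # Iterate over the "dag.tasks" array one element at a time. Tasks that
    # do not match are discarded as soon as they are parsed, so peak memory
    # stays proportional to a single task rather than the whole DAG.
    for task in ijson.items(io.BytesIO(serialized_json), "dag.tasks.item"):
        if task.get("task_id") == task_id:
            return task
    return None

The matching task dict can then be handed to the normal deserialization path, while every other task in the DAG is skipped.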


Callbacks Discussion

This requires moving the callback execution from `airflow run local` back to `airflow run raw`, since callbacks require the Python objects from the parsed DAG.

Since the process owners of `airflow run local` and `airflow run raw` can be different, and callbacks are meant to run as the same user as `airflow run raw`, it is better to run callbacks in the `airflow run raw` process.

Houqp (sorry, I don't know your username in Confluence), see https://github.com/apache/airflow/pull/10917#issuecomment-694567832 and let me know your thoughts. Having the callbacks in `airflow run raw` will make the `airflow run local` process simpler and more lightweight.


For the case where `airflow run raw` is force-killed, we can leverage the CallbackManager mentioned in AIP-43 (DAG Processor separation): the `airflow run local` process creates an entry in the `callbacks` table, and the CallbackManager decides where and how to run the callbacks. Alternatively, we can delegate this to the scheduler, which marks the task as a zombie and creates/runs the callbacks.
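For illustration, the fallback could look roughly like the hypothetical sketch below. The `callbacks` table and CallbackManager come from AIP-43; CallbackEntry and record_failure_callback are illustrative names, not real Airflow APIs:

from dataclasses import dataclass

@dataclass
class CallbackEntry:
    # One row in the proposed `callbacks` table.
    dag_id: str
    task_id: str
    run_id: str
    callback_type: str  # e.g. "on_failure"
    run_as_user: str    # so the callback runs as the impersonated user

def record_failure_callback(ti, session) -> None:
    # Called by `airflow run local` when the raw process dies without
    # reporting a terminal state. The CallbackManager (or the scheduler's
    # zombie handling) later picks up the row and runs the callback.
    session.add(CallbackEntry(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        run_id=ti.run_id,
        callback_type="on_failure",
        run_as_user=ti.run_as_user,
    ))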

Options:

We can use a feature flag to control this behavior, or make it the default behavior.

What defines this AIP as "done"?

The `airflow run --local` process does not parse the DAG file.


11 Comments

  1. Changed it to 45 (smile). AIP-43/44 are under the AIP-1 security "umbrella", and I also moved that one there.

  2. I really like that proposal. It has some backwards-compatibility implications, so IMHO we need to get the opinion of other maintainers on how we should approach it (Kaxil Naik, Kamil Bregula, Ash Berlin-Taylor, Ephraim Anierobi: I think you are the ones who know most about it or were involved recently).

    As I see it, it changes the definition of the "raw" task to include callbacks. I think that makes sense and is rather consistent with the impersonation idea, because callbacks **should** run with the same impersonation as the tasks.
    The fact that they currently do not is IMHO a slight violation of the "impersonation" approach.

    Let me rephrase how I understand what happens now and how we are planning to change it:

    This really only affects the case where default impersonation is enabled, i.e. where all Airflow tasks are executed as a separate user (say "nobody").

    Current state:

    1) LocalTaskJob starts, changes the state of the task to RUNNING, then executes the "raw" Airflow task as the "nobody" user and monitors it. When the task finishes, it changes the state of the task to SUCCESS/FAILURE (depending on the result) and runs the appropriate success/failure callbacks.
    2) The only reason LocalTaskJob needs to parse the task from the file is that it needs to run callbacks. All the other information needed to run the "raw" task is available in the serialized_dag table.
    3) Callbacks are executed as the "airflow" user (inside LocalTaskJob).

    We can save a lot of CPU when impersonation is involved. Currently, when the DAGs are huge, every impersonated task, no matter how small, has to parse the whole DAG twice: once for LocalTaskJob and once for the "raw" task (since the raw task is run via sudo as a different user, a new Python interpreter has to be started and the DAG has to be parsed again).

    Desired state: we want to move callbacks inside the "raw" task, to achieve two things: a) LocalTaskJob does not need to read and parse the DAG file (it only reads the serialized form from the DB); b) callbacks are executed as the "nobody" user (see the sketch after the list below):

    1) LocalTaskJob starts, changes the state of the task to RUNNING, then executes the "raw" Airflow task as the "nobody" user and monitors it. The "raw" task runs the appropriate success/failure callbacks. Then LocalTaskJob changes the state to SUCCESS/FAILURE as appropriate.
    2) LocalTaskJob does not need to parse the DAGs at all; it can just read the serialized form. This is a huge CPU saving when tasks are run with impersonation.
    3) When LocalTaskJob detects that the "raw" task failed "forcefully" (not sure if we can do that easily?), we want LocalTaskJob to write the callback entry and let the DagProcessor execute the callbacks. What is important here: in this case we also want the DagProcessor's parsing/callback process to run as the impersonated user ("nobody"), but I think that is a reasonable expectation and a rather consistent approach. If we set "default" impersonation, it should apply to all 'user code' for consistency: task execution, callbacks, and also DAG parsing.
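    A simplified sketch of the desired control flow (illustrative names, not the actual taskinstance.py code):

    def run_raw_task(ti, task):
        # Runs inside `airflow run raw`, i.e. as the impersonated "nobody"
        # user. It executes the task and its callbacks; the monitoring
        # LocalTaskJob still flips the final SUCCESS/FAILURE state afterwards.
        try:
            task.execute(context={"ti": ti})
            _invoke(task.on_success_callback, ti)  # callback also runs as "nobody"
        except Exception:
            _invoke(task.on_failure_callback, ti)
            raise

    def _invoke(callback, ti):
        if callback is not None:  # callbacks are optional attributes on the task
            callback({"ti": ti})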

    The implication of this change is that a different user will run the callbacks in the impersonation case. Currently all callbacks in the "impersonation" case run as the "airflow" user, whereas after the change they will run as the "nobody" user. It is definitely a breaking change, but I believe we should treat it as a bug fix: I consider it a bug in the current behaviour that callbacks in the impersonation case run as the "airflow" user.

    WDYT everyone?  

    1. Thanks Jarek Potiuk (smile)

      Callbacks in Airflow task execution used to run in the `airflow run --raw` process under the "nobody" user. This was changed in https://github.com/apache/airflow/pull/10917#issuecomment-694567832, so I would love to have Houqp's opinion as well. (Jarek Potiuk, do you know his Confluence ID?)


      In general, we might need another AIP or separate work to unify the callback logic: callback types, where callbacks run, which user they run as, etc., and mark this AIP as blocked by that callback-unification work. WDYT, Jarek Potiuk?



      1. Thanks Jarek Potiuk for the ping (smile)

        Ping Zhang, the design change looks good to me; we should be good as long as the callbacks are invoked from an external process.

        1. QP Hou, by 'external process' do you mean the `airflow run --local` process or the `airflow run --raw` process? Thanks. And we will make sure there is no race condition, so that no callbacks are run twice.

          1. By external process, I meant a process other than the one executing the `._run_raw_task` method: https://github.com/apache/airflow/blob/d353f023ff8856c00b9f054526cb2e40ff0116ae/airflow/models/taskinstance.py#L1606.

  3. This isn't security, is it? I thought this was just performance-related.

  4. Very much so. Moved it up.

  5. Ping Zhang, this AIP is done with that one PR, right? If so, I'll mark it as Completed and in 2.4.