Status


State

Abandoned in favour of the new https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-5+Remote+DAG+Fetcher

Discussion Thread


JIRA

AIRFLOW-2221

Created


Motivation

Currently Airflow requires DAG files to be present on a file system that is accessible to the scheduler, webserver, and workers. Given that more and more people are running Airflow in a distributed setup to achieve higher scalability, it becomes more and more difficult to guarantee a file system that is accessible to and synchronized amongst all services. Allowing Airflow to fetch DAG files from a remote source outside the local file system grants much greater flexibility, eases implementation, and standardizes ways to sync remote sources of DAGs with Airflow.


Proposed Solutions

Option 1: DAG Repository (short term)

DAGs are persisted in remote filesystem-like storage, and Airflow needs to know where to find them. A DAG repository is introduced to record the remote root directory of DAG files. Prior to DAG loading, Airflow would download files from the remote DAG repository and cache them under the local filesystem directory $AIRFLOW_HOME/dags.

DAG Repository

We will create a file, remote_repositories.json, to record the root directory of DAGs on the remote storage system. Multiple repositories are supported.

Format

{
	"dag_repositories": {
		"repo1": {
			"url": "s3://my-bucket/dags",
			"conn_id": "blahblah"
		},
		"repo2": {
			"url": "git://repo_name/dags",
			"conn_id": "blahblah2"
		}
	}
}
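
For illustration, here is a minimal sketch of how such a file could be read. The load_dag_repositories helper and the assumption that the file lives directly under $AIRFLOW_HOME are illustrative only and not part of the proposal.

import json
import os

def load_dag_repositories():
	"""Read remote_repositories.json from $AIRFLOW_HOME (hypothetical helper)."""
	airflow_home = os.path.expanduser(os.environ.get("AIRFLOW_HOME", "~/airflow"))
	with open(os.path.join(airflow_home, "remote_repositories.json")) as f:
		config = json.load(f)
	# e.g. {"repo1": {"url": "s3://my-bucket/dags", "conn_id": "blahblah"}, ...}
	return config["dag_repositories"]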

DagFetcher

The following is the DagFetcher interface; we will implement different fetchers for different storage systems, e.g. GitDagFetcher and S3DagFetcher. Given a remote_repositories.json configuration like the one above, the DagFetcher would download the files under s3://my-bucket/dags to $AIRFLOW_HOME/dags/<repo_id>/ (here, $AIRFLOW_HOME/dags/repo1/).

class BaseDagFetcher():
	def fetch(self, repo_id, url, conn_id, file_path=None):
		"""
		Download files from remote storage to the local directory under
		$AIRFLOW_HOME/dags/<repo_id>.
		"""
		raise NotImplementedError()


Proposed changes

DagBag

We should ensure that we are loading the latest cached copy of the DAGs, so we should fetch DAGs from the remote repository before we load the DagBag.

Scheduler

Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method to fetch DAGs from the remote repositories first.
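
A minimal sketch of the intended ordering, assuming the hypothetical load_dag_repositories helper and the BaseDagFetcher interface sketched earlier; scan_local_dag_dir stands in for the existing directory scan.

def refresh_dag_dir(fetchers, repositories, scan_local_dag_dir):
	"""
	Sync every configured remote repository into the local DAG folder first,
	then rescan the local directory as the manager already does today.
	"""
	for repo_id, repo in repositories.items():
		# fetchers maps repo_id -> a BaseDagFetcher implementation
		fetchers[repo_id].fetch(repo_id, repo["url"], repo["conn_id"])
	return scan_local_dag_dir()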

Versioning:

The DAG URI also serves as the DAG version.

    • We load the DAG from the same URI throughout the entire DAG run, even if the DAG manifest has since been changed to point to a new DAG URI. We will add a URI attribute to the DagRun model to persist the URI used for each DAG run (see the sketch after this list).
    • Users are free to define their own URI naming convention.
    • The same version/URI should not be re-used.
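
A minimal sketch of what persisting the URI on DagRun could look like, using a standalone SQLAlchemy model; the column name dag_uri is an assumption, and this is not the actual Airflow model.

import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class DagRun(Base):
	"""Simplified stand-in for Airflow's DagRun model."""
	__tablename__ = "dag_run"

	id = Column(Integer, primary_key=True)
	dag_id = Column(String(250))
	execution_date = Column(DateTime, default=datetime.datetime.utcnow)
	# URI (and therefore version) of the DAG file this run was created from;
	# the scheduler would set it once when the DagRun is created and keep
	# loading the DAG from this URI for the lifetime of the run.
	dag_uri = Column(String(2000))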

Caching:

Moving DAGs to a remote location will introduce network overhead, so we should cache DAGs and avoid unnecessary fetches. We should only re-fetch a DAG when the URI in the DAG manifest has changed (or we find that the last_update_time of the DAG file has changed).

Cache Location

In order to avoid making a remote fetch every time a DAG needs to be run, it is best to keep a local cache of DAG files for each individual Airflow service to use.

  • Users will specify a cache location. Each service (scheduler, webserver, and worker) will cache the latest DAG files in that location.
  • We will store DAGs in the cache location using the following structure: cached_dags_path/<DAG_ID>/<DAG_URI> (see the sketch after this list).
  • Users should not modify anything under cached_dags_path/
  • We also save the last_modified_date of the remote files.
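
A minimal sketch of the cache layout and the last_modified_date bookkeeping described above; the helper names, the sidecar .meta.json file, and the URI-to-path encoding are assumptions made for illustration.

import json
import os
import re

def get_cache_path(cached_dags_path, dag_id, uri):
	# Encode the URI into a single safe path segment so that
	# "s3://bucket/dags/foo.py" does not create nested directories.
	safe_uri = re.sub(r"[^A-Za-z0-9._-]", "_", uri)
	return os.path.join(cached_dags_path, dag_id, safe_uri)

def record_last_modified(dag_cache_path, last_modified_date):
	# Store the remote file's last_modified_date next to the cached copy
	# so a later check can compare it against the current remote value.
	with open(dag_cache_path + ".meta.json", "w") as f:
		json.dump({"last_modified_date": str(last_modified_date)}, f)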


Proposed changes

DagBag

The main part of the code base that we will need to change is DagBag. In collect_dags, we will go through each entry defined in the DAG manifest and download the DAG files if the cache is invalid. In download_dag_file_and_add_to_cache, we will use different fetching implementations for different kinds of URIs, e.g. s3 or git.

class DagBag():
	def collect_dags(self):
		for entry in self.get_dag_manifest_entries():
			if self.is_local_uri(entry.uri):  # hypothetical helper: the URI already points at the local filesystem
				self.process_file(entry.uri, only_if_updated=True)
				continue

			# the DAG is stored remotely
			dag_cache_path = self.get_cache_path(entry.dag_id, entry.uri)
			if self.cached_dag_file_is_latest(entry.dag_id, entry.uri, dag_cache_path):
				# we already have the latest cached copy of the DAG
				continue
			self.download_dag_file_and_add_to_cache(entry.dag_id, entry.uri, dag_cache_path)
			self.process_file(dag_cache_path)

	def cached_dag_file_is_latest(self, dag_id, uri, dag_cache_path):
		"""
		Check whether the DAG file on remote storage has changed since the last download.
		"""

	def download_dag_file_and_add_to_cache(self, dag_id, uri, dag_cache_path):
		"""
		Download DAG files from the remote location to the local dag_cache_path.
		"""
		self.dag_fetcher.fetch_and_cache_dag(dag_id, uri, dag_cache_path)

	def get_cache_path(self, dag_id, uri):
		return self.cached_dags_path + "/" + dag_id + "/" + uri


class DagFetcher():
	def fetch_and_cache_dag(self, dag_id, uri, dag_cache_path, conn_id=None):
		"""
		Download the DAG file from remote storage to the local file system under
		dag_cache_path and record the last_modified_date in a file on the local filesystem.
		"""
		raise NotImplementedError()

	def get_last_modified_date(self, dag_id, uri, conn_id=None):
		"""
		Return when the DAG was last modified on the remote system.
		"""
		raise NotImplementedError()
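
As an illustration of a second backend, here is a rough GitDagFetcher sketch built on the git CLI; using the remote HEAD commit as the "last modified" marker is a design choice made for this sketch, not something the proposal specifies.

import os
import subprocess

class GitDagFetcher(DagFetcher):
	def fetch_and_cache_dag(self, dag_id, uri, dag_cache_path, conn_id=None):
		"""
		Clone the repository on the first fetch, otherwise pull the latest changes.
		"""
		if os.path.isdir(os.path.join(dag_cache_path, ".git")):
			subprocess.run(["git", "-C", dag_cache_path, "pull", "--ff-only"], check=True)
		else:
			os.makedirs(dag_cache_path, exist_ok=True)
			subprocess.run(["git", "clone", uri, dag_cache_path], check=True)

	def get_last_modified_date(self, dag_id, uri, conn_id=None):
		"""
		Use the remote HEAD commit hash as the change marker for the repository.
		"""
		out = subprocess.run(["git", "ls-remote", uri, "HEAD"],
							 check=True, capture_output=True, text=True)
		return out.stdout.split()[0] if out.stdout else None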

Scheduler

Currently the scheduler checks which DAGs are on disk by calling list_py_file_paths; this will need to be changed to instead look at the manifest and load the files from the local filesystem cache. The Airflow scheduler will also need to persist the URI into the DagRun table when it creates a new DagRun.

DagRun versioning

If we implement versioning of DAGs, it will require a number of changes to the current scheduler. The biggest issue comes from how the scheduler currently propagates the DAG object to its various function calls for task scheduling. As it stands, the scheduler loads the DAG objects found on the filesystem, and these are passed along to the resulting functions. In order to implement versions, we would need to associate a specific DAG version/URI with each DagRun; when previous DagRuns are fetched, we will need to check whether they were for an earlier DAG version/URI and fetch that version/URI if necessary.

8 Comments

  1. Thanks for capturing this, will give it a fuller read later.

    One question I have is how this interacts with "libraries" in dags - for instance some people have:

    from lib.utils import my_factory
    
    dag = my_factory(....)

    (i.e. where the DAG depends upon other files in the DAGs folder - so this would have dags/lib/utils.py?)

    1. That's a great question. I'm not entirely sure what the best way to handle that is; some options could be:

      1. zip all dependencies with each dag
      2. Have some way of specifying remote dependency files that need to be downloaded
      3. External libraries would need to be managed via pip
  2. Ian Davison, are you still working on this AIP?

  3. Chao-Han Tsai, I don't think the original author works on it. Feel free to grab it and initiate the discussion.

    1. Thanks Tao Feng. Looks like I don't have permission to edit this page. Do you know how I can gain access?

  4. Chao-Han Tsai, I added permissions for you to the wiki space; let me know if you can't access it.