본문 바로가기

Data/Data Engineering

[Airflow] DAG안에서 병렬 실행시 특정 Task가 완료될때까지 기다려야하는 경우

반응형

아래와 같은 DAG이 존재했다고 할때

나는 "꼭" Task 1 -> Task 2가 실행이 완료되고나서, Task 3 -> Task 4가 실행되게끔 만들고싶었다.

사실 위와같이 단순한 DAG에서는 그냥 Task들을 한개로 합쳐서 concurrency를 1로해두면 될것같긴하지만, 실제 작업환경에서는 이러한 구조를 쉽게 바꿀수가 없었다.

 

따라서, 위와같은 상황을 해결해야했는데, 자료를 찾고 찾다보니 Airflow의 ExternalTaskSensor라는 Operator를 발견했다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#module-airflow.sensors.external_task

 

airflow.sensors.external_task — Airflow Documentation

 

airflow.apache.org

사실 이 Sensor는 2개의 DAG사이를 연결하기 위한 Sensor이긴하지만, 하나의 DAG에서도 사용할수가있다. 사용방법은 아래와같다.

with DAG(
    'TEST',
    default_args = args,
    description='TEST',
    schedule_interval='0 0 * * 0',
    start_date=datetime(2022, 8, 11),
    max_active_runs = 1,
    catchup=False) as dag:
	
    start = DummyOperator(
    	task_id = "start")
    end = DummyOperator(
    	task_id = "end", trigger_rule=TriggerRule.ALL_DONE)
    
    task_1 = DummyOperator(
    	task_id = "task_1")
    task_2 = DummyOperator(
    	task_id = "task_2")
    task_3 = DummyOperator(
    	task_id = "task_3")
    task_4 = DummyOperator(
    	task_id = "task_4")
    
    waiting_another_task = ExternalTaskSensor(
    	task_id = f"waiting_task",
        external_dag_id = dag.dag_id,
        external_task_id = "task_2",
        mode = 'reschedule')
        
    start >> [task_1 >> task_2] >> end
    start >> [waiting_another_task >> task_3 >> task_4] >> end

ExternalTaskSensor에는 꼭 필요한 인자값이 있다.

- external_dag_id = 기다려야하는 task가 포함된 dag id. 우리는 현재 사용하려는 dag안의 task를 기다려야하니, 위에서 정의한 dag의 dag_id를 사용한다

- external_task_id = 기다려야하는 dag안에 있는 task id. 우리가 기다리고자하는 task_id를 써넣어주면된다

 

추가로 예제에서는 mode라는 인자값에 대해서도 값을줬는데

- mode = Sensor가 대기하는 mode를 의미한다. 기본값으로는 poke로 되어있는데, 이게 안좋은 이유중 하나가 poke로 설정할경우 계속해서 해당 operator가 running상태로 유지된다. 즉, 만약에 cpu파워가 충분하지않다면 불필요한 리소스를 계속해서 소모하게된다. 따라서 만약에 오래걸리는 작업을 기다려야하는경우에는 꼭 reschedule로 설정해줘야한다.

 

위와같이 DAG을 작성하게되면, task_3 >> task_4는 task_2가 완료되어야지만 돌아가게된다

반응형