아래와 같은 DAG이 존재했다고 할때
나는 "꼭" Task 1 -> Task 2가 실행이 완료되고나서, Task 3 -> Task 4가 실행되게끔 만들고싶었다.
사실 위와같이 단순한 DAG에서는 그냥 Task들을 한개로 합쳐서 concurrency를 1로해두면 될것같긴하지만, 실제 작업환경에서는 이러한 구조를 쉽게 바꿀수가 없었다.
따라서, 위와같은 상황을 해결해야했는데, 자료를 찾고 찾다보니 Airflow의 ExternalTaskSensor라는 Operator를 발견했다.
사실 이 Sensor는 2개의 DAG사이를 연결하기 위한 Sensor이긴하지만, 하나의 DAG에서도 사용할수가있다. 사용방법은 아래와같다.
with DAG(
'TEST',
default_args = args,
description='TEST',
schedule_interval='0 0 * * 0',
start_date=datetime(2022, 8, 11),
max_active_runs = 1,
catchup=False) as dag:
start = DummyOperator(
task_id = "start")
end = DummyOperator(
task_id = "end", trigger_rule=TriggerRule.ALL_DONE)
task_1 = DummyOperator(
task_id = "task_1")
task_2 = DummyOperator(
task_id = "task_2")
task_3 = DummyOperator(
task_id = "task_3")
task_4 = DummyOperator(
task_id = "task_4")
waiting_another_task = ExternalTaskSensor(
task_id = f"waiting_task",
external_dag_id = dag.dag_id,
external_task_id = "task_2",
mode = 'reschedule')
start >> [task_1 >> task_2] >> end
start >> [waiting_another_task >> task_3 >> task_4] >> end
ExternalTaskSensor에는 꼭 필요한 인자값이 있다.
- external_dag_id = 기다려야하는 task가 포함된 dag id. 우리는 현재 사용하려는 dag안의 task를 기다려야하니, 위에서 정의한 dag의 dag_id를 사용한다
- external_task_id = 기다려야하는 dag안에 있는 task id. 우리가 기다리고자하는 task_id를 써넣어주면된다
추가로 예제에서는 mode라는 인자값에 대해서도 값을줬는데
- mode = Sensor가 대기하는 mode를 의미한다. 기본값으로는 poke로 되어있는데, 이게 안좋은 이유중 하나가 poke로 설정할경우 계속해서 해당 operator가 running상태로 유지된다. 즉, 만약에 cpu파워가 충분하지않다면 불필요한 리소스를 계속해서 소모하게된다. 따라서 만약에 오래걸리는 작업을 기다려야하는경우에는 꼭 reschedule로 설정해줘야한다.
위와같이 DAG을 작성하게되면, task_3 >> task_4는 task_2가 완료되어야지만 돌아가게된다
'Data > Data Engineering' 카테고리의 다른 글
[Docker+Airflow] Airflow로 인해 너무 커져버린 Docker Container용량 (0) | 2022.08.20 |
---|---|
[Docker] Docker 기본 파일시스템 경로 변경 (0) | 2022.08.18 |
[Airflow] 다중 클러스터에서 Airflow Celery Worker 환경 구성하기 (11) | 2022.08.10 |
[Docker] 간단한 명령어 정리 (0) | 2022.07.07 |
[Terraform] EKS삭제 시 "aws_auth" 에러 해결방법 (0) | 2022.05.31 |