본문 바로가기

Data/Data Engineering

[Airflow] 인자값 전송 및 Task raise exception

반응형

PythonOperator에서 함수값 넘겨주기

간략한 코드 예시

from airflow.operators.python import PythonOperator

def temp_function(**context):
	print(context['params']['target_date'] # dict형식으로 왼쪽과 같은 방식으로 변수를 불러읽습니다

target_date = '2022-03-10'

temp_function_job = PythonOperator(
    task_id='temp_function_job',
    python_callable=temp_function,
    params={
        'target_date': target_date,
        },
    provide_context=True,
    dag=dag
)

#################
# Airflow쪽 출력
# >>> 2022-03-10

Task에서 강제로 exception발생시키기

이번에 작업하면서 알게되었는데, 특정 문제가 발생할때 기존에 python코드에서 작성했던것처럼, try...except를 발생시키려고 했지만, 아무리 exception을 발생시켜도 airflow에서는 정상동작한것으로 인식이된다.

 

생각해보니, 결국에는 Python코드만 보면 정상적으로 try...except문을 발생시킨것이므로, airflow상에서는 당연히 정상적인 실행이라고 인식이 되는것같았다.

 

따라서, 아래와같은 코드로 task안에서 강제로 exception을 발생시켰다.

from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowException # exception발생 필요 라이브러리

def temp_function_job(**context):
	raise AirflowException("Exception Happen!") # try...except 구문이 필요없다

temp_function_job = PythonOperator(
    task_id='temp_function_job',
    python_callable=temp_function,
    params={
        'target_date': target_date,
        },
    provide_context=True,
    dag=dag
)

이제 정상적으로 강제 exception을 발생시키는것을 확인할수가 있었다.

 

반응형