반응형
PythonOperator에서 함수값 넘겨주기
간략한 코드 예시
from airflow.operators.python import PythonOperator
def temp_function(**context):
print(context['params']['target_date'] # dict형식으로 왼쪽과 같은 방식으로 변수를 불러읽습니다
target_date = '2022-03-10'
temp_function_job = PythonOperator(
task_id='temp_function_job',
python_callable=temp_function,
params={
'target_date': target_date,
},
provide_context=True,
dag=dag
)
#################
# Airflow쪽 출력
# >>> 2022-03-10
Task에서 강제로 exception발생시키기
이번에 작업하면서 알게되었는데, 특정 문제가 발생할때 기존에 python코드에서 작성했던것처럼, try...except를 발생시키려고 했지만, 아무리 exception을 발생시켜도 airflow에서는 정상동작한것으로 인식이된다.
생각해보니, 결국에는 Python코드만 보면 정상적으로 try...except문을 발생시킨것이므로, airflow상에서는 당연히 정상적인 실행이라고 인식이 되는것같았다.
따라서, 아래와같은 코드로 task안에서 강제로 exception을 발생시켰다.
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowException # exception발생 필요 라이브러리
def temp_function_job(**context):
raise AirflowException("Exception Happen!") # try...except 구문이 필요없다
temp_function_job = PythonOperator(
task_id='temp_function_job',
python_callable=temp_function,
params={
'target_date': target_date,
},
provide_context=True,
dag=dag
)
이제 정상적으로 강제 exception을 발생시키는것을 확인할수가 있었다.
반응형
'Data > Data Engineering' 카테고리의 다른 글
[Terraform] Error: Error acquiring the state lock 문제 해결 (0) | 2022.03.30 |
---|---|
[Airflow] MWAA - Datadog send_metrics 적용 (0) | 2022.03.16 |
[Flink] Flink 정보 정리 (0) | 2022.02.19 |
[Kafka] Kafka 정리 (0) | 2022.02.05 |
[Airflow] 관련 정리 (0) | 2022.02.02 |