Develop/DevOps
[Airflow] 성공, 실패시 Slack 전송 작업
재융
2021. 12. 22. 12:16
반응형
[참고] https://moons08.github.io/programming/airflow-slack/
작업중 마주한 문제
새로운 airflow 환경이 필요할때 ~/.bashrc에서 AIRFLOW_HOME을 원하고자하는 폴더 경로를 넣기
export AIRFLOW_HOME=$HOME/[원하는 경로]
아래와 같이 모듈을 Import 시켰으나 오류가 발생
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
아래와 같이 pip install 모듈로 해결
pip install apache-airflow-providers-slack
----------------
Token을 넣어야하는데 어느 토큰을 넣어야하는지 정확하게 몰랐음
- OAuth & Permissions 에 있는 토큰을 넣어줘야함
token관련 내용은 최대한 포함을 안하는게 보안상 좋으니, 이번 포스팅에는 airflow webserver에 토콘정보를 올려놓고 이 정보를 가져오는 방식을 사용
- 먼저 airflow webserver 를 실행하여 아래와 같이 카테고리에 접속 (Admin - Connections)
- 접속한이후 왼쪽에 + 모양을 클릭하여 아래와 같이 slack이라는 Connection Id, 그리고 Slack Webhook Type으로 설정하고, 다른 칸은 채울필요없이 Password에 위에서 설명한 Token정보값을 넣어준다.
- 나는 이번에 dags 폴더 아래에 temputils폴더를 생성하고 alert.py 파일을 아래와 같이 작성
- 그리고 실패시 호출되는 slack_failure_alert 함수
- 성공시 호출되는 slack_success_alert 함수를 작성
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
class SlackAlert:
def __init__(self, channel):
self.slack_channel = channel
# 위에서 설정한 connection Id의 password(token) 정보값 가져오기
self.slack_token = BaseHook.get_connection('slack').password
def slack_failure_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=self.slack_channel,
token=self.slack_token,
text="""
*Result* Failed :alert:
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
)
return alert.execute(context=context)
def slack_success_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_success',
channel=self.slack_channel,
token=self.slack_token,
text="""
*Result* Success :checkered_flag:
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
)
return alert.execute(context=context)
DAG파일 예시
default_args에 성공시, 실패시 호출되는 함수를 끌어온다.
- on_success_callback - airflow dag 성공시 호출
- on_failure_callback - airflow dag 실패시 호출
from airflow import DAG
from bbutils.alert import SlackAlert
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
alert = SlackAlert('#채널명 기입')
default_args = {
'owner': 'jaeyung',
'email': ['jaeyung1001@naver.com'],
'on_success_callback': alert.slack_success_alert,
'on_failure_callback': alert.slack_failure_alert
}
dag = DAG(
dag_id='dag_name',
default_args=default_args,
start_date=days_ago(2),
schedule_interval='* * * * *' # 주기 설정
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='temp',
bash_command='date',
dag=dag)
t1
그렇다면 이제 dag이 성공적으로 실행될경우 다음과같이 설정한 채널에 slack메세지가 전송하게 된다
반응형