반응형
[참고] https://moons08.github.io/programming/airflow-slack/
작업중 마주한 문제
새로운 airflow 환경이 필요할때 ~/.bashrc에서 AIRFLOW_HOME을 원하고자하는 폴더 경로를 넣기
export AIRFLOW_HOME=$HOME/[원하는 경로]
아래와 같이 모듈을 Import 시켰으나 오류가 발생
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
아래와 같이 pip install 모듈로 해결
pip install apache-airflow-providers-slack
----------------
Token을 넣어야하는데 어느 토큰을 넣어야하는지 정확하게 몰랐음
- OAuth & Permissions 에 있는 토큰을 넣어줘야함
token관련 내용은 최대한 포함을 안하는게 보안상 좋으니, 이번 포스팅에는 airflow webserver에 토콘정보를 올려놓고 이 정보를 가져오는 방식을 사용
- 먼저 airflow webserver 를 실행하여 아래와 같이 카테고리에 접속 (Admin - Connections)
- 접속한이후 왼쪽에 + 모양을 클릭하여 아래와 같이 slack이라는 Connection Id, 그리고 Slack Webhook Type으로 설정하고, 다른 칸은 채울필요없이 Password에 위에서 설명한 Token정보값을 넣어준다.
- 나는 이번에 dags 폴더 아래에 temputils폴더를 생성하고 alert.py 파일을 아래와 같이 작성
- 그리고 실패시 호출되는 slack_failure_alert 함수
- 성공시 호출되는 slack_success_alert 함수를 작성
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
class SlackAlert:
def __init__(self, channel):
self.slack_channel = channel
# 위에서 설정한 connection Id의 password(token) 정보값 가져오기
self.slack_token = BaseHook.get_connection('slack').password
def slack_failure_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=self.slack_channel,
token=self.slack_token,
text="""
*Result* Failed :alert:
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
)
return alert.execute(context=context)
def slack_success_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_success',
channel=self.slack_channel,
token=self.slack_token,
text="""
*Result* Success :checkered_flag:
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
)
return alert.execute(context=context)
DAG파일 예시
default_args에 성공시, 실패시 호출되는 함수를 끌어온다.
- on_success_callback - airflow dag 성공시 호출
- on_failure_callback - airflow dag 실패시 호출
from airflow import DAG
from bbutils.alert import SlackAlert
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
alert = SlackAlert('#채널명 기입')
default_args = {
'owner': 'jaeyung',
'email': ['jaeyung1001@naver.com'],
'on_success_callback': alert.slack_success_alert,
'on_failure_callback': alert.slack_failure_alert
}
dag = DAG(
dag_id='dag_name',
default_args=default_args,
start_date=days_ago(2),
schedule_interval='* * * * *' # 주기 설정
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='temp',
bash_command='date',
dag=dag)
t1
그렇다면 이제 dag이 성공적으로 실행될경우 다음과같이 설정한 채널에 slack메세지가 전송하게 된다
반응형
'Develop > DevOps' 카테고리의 다른 글
[MLOps] CKA 자격증 합격 후기 (2022.01) (0) | 2022.01.08 |
---|---|
[MLOps] FastCampus 강의 정리 - 개념, DVC, MLFlow (0) | 2022.01.07 |
[MLOps] CKA Mock Exam 틀린거 정리 (0) | 2021.12.16 |
[MLOps] 시험 전 도움이 되는 정보, 명령어 정리 (0) | 2021.12.16 |
[MLOps] Kubernetes CKA자격증 공부 - Troubleshooting (0) | 2021.12.13 |