본문 바로가기

Data/MLOps

[Airflow] 성공, 실패시 Slack 전송 작업

반응형

[참고] https://moons08.github.io/programming/airflow-slack/

작업중 마주한 문제

새로운 airflow 환경이 필요할때 ~/.bashrc에서 AIRFLOW_HOME을 원하고자하는 폴더 경로를 넣기

export AIRFLOW_HOME=$HOME/[원하는 경로]

 

아래와 같이 모듈을 Import 시켰으나 오류가 발생

from airflow.providers.slack.operators.slack import SlackAPIPostOperator

아래와 같이 pip install 모듈로 해결

pip install apache-airflow-providers-slack

----------------

Token을 넣어야하는데 어느 토큰을 넣어야하는지 정확하게 몰랐음

- OAuth & Permissions 에 있는 토큰을 넣어줘야함

token관련 내용은 최대한 포함을 안하는게 보안상 좋으니, 이번 포스팅에는 airflow webserver에 토콘정보를 올려놓고 이 정보를 가져오는 방식을 사용

 

  • 먼저 airflow webserver 를 실행하여 아래와 같이 카테고리에 접속 (Admin - Connections)

  • 접속한이후 왼쪽에 + 모양을 클릭하여 아래와 같이 slack이라는 Connection Id, 그리고 Slack Webhook Type으로 설정하고, 다른 칸은 채울필요없이 Password에 위에서 설명한 Token정보값을 넣어준다.

  • 나는 이번에 dags 폴더 아래에 temputils폴더를 생성하고 alert.py 파일을 아래와 같이 작성
    • 그리고 실패시 호출되는 slack_failure_alert 함수
    • 성공시 호출되는 slack_success_alert 함수를 작성
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack import SlackAPIPostOperator

class SlackAlert:
    def __init__(self, channel):
        self.slack_channel = channel
        # 위에서 설정한 connection Id의 password(token) 정보값 가져오기
        self.slack_token = BaseHook.get_connection('slack').password
        
    def slack_failure_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_failed',
            channel=self.slack_channel,
            token=self.slack_token,
            text="""
                *Result* Failed :alert:
                *Task*: {task}  
                *Dag*: {dag}
                *Execution Time*: {exec_date}  
                *Log Url*: {log_url}
                """.format(
                    task=context.get('task_instance').task_id,
                    dag=context.get('task_instance').dag_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url,
                    )
                  )
        return alert.execute(context=context)

    def slack_success_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_success',
            channel=self.slack_channel,
            token=self.slack_token,
            text="""
                *Result* Success :checkered_flag:
                *Task*: {task}
                *Dag*: {dag}
                *Execution Time*: {exec_date}
                *Log Url*: {log_url}
                """.format(
                    task=context.get('task_instance').task_id,
                    dag=context.get('task_instance').dag_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url,
                    )
                  )
        return alert.execute(context=context)

DAG파일 예시

default_args에 성공시, 실패시 호출되는 함수를 끌어온다.

  • on_success_callback - airflow dag 성공시 호출
  • on_failure_callback - airflow dag 실패시 호출
from airflow import DAG
from bbutils.alert import SlackAlert
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

alert = SlackAlert('#채널명 기입')

default_args = {
    'owner': 'jaeyung',
    'email': ['jaeyung1001@naver.com'],
    'on_success_callback': alert.slack_success_alert,
    'on_failure_callback': alert.slack_failure_alert
}

dag = DAG(
        dag_id='dag_name',
        default_args=default_args,
        start_date=days_ago(2),
        schedule_interval='* * * * *' # 주기 설정
        )

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='temp',
    bash_command='date',
    dag=dag)

t1

 

그렇다면 이제 dag이 성공적으로 실행될경우 다음과같이 설정한 채널에 slack메세지가 전송하게 된다

반응형