본문 바로가기

Data/Data Engineering

[Airflow] 관련 정리

반응형

아래 정보들은 fastcampus의 강좌를 보고 중요한 내용만 정리한것입니다 :)

기존 방식의 문제점(이라 쓰고 Airflow는 이러한 문제점들을 해결해준다)

  • 실패 복구 - 실패한 배치에 대해서 복구가 어렵다는점
  • 모니터링 - 내가 등록한 배치가 잘 돌아가고있는지 확인하기 어려운점
  • 의존성 관리 - 배치간의 의존성이 존재할경우 관리하기가 까다롭다
  • 확장성 - 중앙화 해서 관리하는 도구가 없기때문에 분산된 환경에서 파이프라인들을 관리하기 힘듬
  • 배포 - 새로운 워크 플로우를 배포하기 힘들다

Airflow 장점

  • Python으로 쉬운 프로그래밍 가능 (사람 코딩능력에 다르겠지만...)
  • 분산된 환경에서 확장성이 있음
  • 대시보드 존재
  • 오픈소스이므로 커스터마이징 가능
  • 위에 기존에 존재하던 문제점 해결가능

Airflow 구성

Airflow는 워크플로우를 작성하는데. 여기서의 워크플로우는 DAG(Directed Acyclic Graph)로 이뤄져있음. 또한 다음과같은 요소가 포함되어있음

  • 웹 서버 - 웹 대시보드 UI
  • 스케줄러 - 워크플로우가 언제 실행되는지 관리
  • Metastore - 메타데이터 관리
  • Executor - 테스크(task)가 어떻게 실행되는지 정의
  • Worker - 테스트를 실행하는 프로세스

Airflow 동작방식

  • DAG를 작성하여 Workflow를 만든다. DAG은 Task로 구성
  • Task는 Operator(ex. bashOperator, pythonOperator ...)가 인스턴스화 된 것
  • DAG를 실행시킬때 Scheduler는 DagRun 오브젝트를 만든다
  • DagRun 오브젝트는 Task Instance를 만든다
  • Worker가 Task를 수행 후 DagRun의 상태를 완료로 바꿔놓는다

Airflow 다양한 명령어

몇개는 이미 알고있는거라 스킵하는게 많음

# 특정 dag안에 존재하는 Task 나열
airflow tasks list [dag이름]

# 특정 Dag 트리거 걸기(실행시키기)
airflow dags trigger [dag이름]

Operator

airflow에는 여러 Operator가 존재함

  • Action Operator : 액셜 실행 Operator
  • Transfer Operator : 데이터 이동 Operator
  • Sensor : 조건이 맞을때까지 대기

강의를 보면서 HttpSensor라는걸 알게되었는데, 사용용도는 특정 REST API가 Available하는지 확인하기 위함인것으로 보인다.

다른 Operator에서 데이터 가져오기

xcom_pull이라는 함수를 활용하면 된다

# extract_nft라는 task에서 출력된 데이터를 가져온다
assets = ti.xcom_pull(task_ids=['extract_nft'])

간단한 sqlite3 명령어

# 존재하는 databases 보여주기
.databases

# 존재하는 table 보여주기
.tables

# 특정 table column정보 보여주기
PRAGMA table_info([table_name]);

Backfill

start_date에서 설정한 날짜에서 현재날짜까지 배치를 다시 돌리고싶다면 DAG옵션에 catchup변수를 True로 설정

default_args = {
  'start_date': datetime(2021, 1, 1),
}

with DAG(
  dag_id='nft-pipeline',
  schedule_interval='@daily',
  default_args=default_args,
  tags=['nft'],
  catchup=True) as dag:
  pass
  
  ...

 

반응형