Data/Data Engineering

[Airflow] 다중 클러스터에서 Airflow Celery Worker 환경 구성하기

재융 2022. 8. 10. 20:43
반응형

이번에 다중 클러스터(즉, 여러대의 컴퓨터)에서 Airflow Celery Worker환경을 세팅해야하는 업무가 생겨서 작업을했다. 과정중에 무수히 많은 시행착오가 있었고, 다음에는 이를 방지하기위해 글을 남깁니다.또한, 다른분들도 이러한 환경을 세팅할때 참고하시면 좋을것같습니다.

 

일단 모든 Airflow는 Docker로 구축하였습니다. 또한 Master Node와 Worker Node를 아래와 같이 구성했습니다.

버전은 다음과 같습니다

  • Airflow - 2.3.3
  • Python - 3.8

Master Node

  • airflow worker
  • airflow scheduler
  • airflow webserver
  • airflow trigger
  • postgresql
  • redis

Worker Node

  • airflow worker

주의해야할점

Master Node와 Worker Node는 서로 통신이 가능해야하며, 특정 포트가 열려있는 상황이여야합니다. 기본적으로 지정되는 포트는 다음과 같습니다

  • airflow webserver - 8080
  • postgresql - 5432
  • redis - 6379

Docker-compose로 Master Node Airflow 설치

위와같이 Master Node에서 필요로하는 요소들이 굉장히 많은데, 이것을 하나하나 설치를 하는건 복잡해서, docker-compose를 통해서 쉽게 설치가 가능하다. 이 방식은 airflow 공식 documentation에도 소개된다.

https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

# airflow 요소들이 담겨져있는 docker-compose.yaml 파일 다운로드
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml'

docker-compose.yaml파일에서 고쳐줘야하는 부분이 있는데, postgresql, redis를 export로 포트를 여는것이 아닌, ports로 포트를 열어줘야한다.

간단하게 export와 ports에 대한 설명을 하자면, export는 도커 컨테이너사이에만 개방하는 포트 개념이고, ports는 외부접근에도 개방을 해주는 개념이다.

 

아래와같이 수정해준다.

service:
  postgres:
    image: postgres:13
    ports:
      - 5432:5432
    ...
  redis:
    image: redis:latest
    ports:
      - 6379:6379
    ...

추가로 docker-compose.yaml파일이 있는 디렉토리에서 폴더를 생성해준다

mkdir dags
mkdir logs
mkdir plugins

이렇게 수정해주면 Master Node는 세팅이 끝나게된다. 다음 명령어로 Master Node에 Airflow를 실행시켜준다.

docker-compose up airflow-init

docker-compose up

docker ps
``` 아래와 같이 docker container가 작동하는것을 확인할수있음
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                    PORTS                              NAMES
247ebe6cf87a   apache/airflow:2.3.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)    8080/tcp                           compose_airflow-worker_1
ed9b09fc84b1   apache/airflow:2.3.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)    8080/tcp                           compose_airflow-scheduler_1
7cb1fb603a98   apache/airflow:2.3.3   "/usr/bin/dumb-init …"   3 minutes ago    Up 3 minutes (healthy)    0.0.0.0:8080->8080/tcp             compose_airflow-webserver_1
74f3bbe506eb   postgres:13            "docker-entrypoint.s…"   18 minutes ago   Up 17 minutes (healthy)   5432/tcp                           compose_postgres_1
0bd6576d23cb   redis:latest           "docker-entrypoint.s…"   10 hours ago     Up 17 minutes (healthy)   0.0.0.0:6379->6379/tcp             compose_redis_1
```

 

Worker Node Airflow 세팅

이제 Worker Node를 설치하려는 클러스터(컴퓨터)로 넘어와서 Dockerfile을 아래와같이 생성해준다

FROM ubuntu:latest

# install airflow
ENV AIRFLOW_HOME="/root/airflow"
ENV AIRFLOW__WORKER__NAME="worker_node"

ENV AIRFLOW__MASTER__IP="HOST_IP"
ENV AIRFLOW__POSTGRESQL__HOST=${AIRFLOW__MASTER__IP}:5432
ENV AIRFLOW__REDIS__HOST=${AIRFLOW__MASTER__IP}:6379
ENV AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CELERY__BROKER_URL="redis://:@${AIRFLOW__REDIS__HOST}/0"
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER="${AIRFLOW_HOME}/log"
ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY="${AIRFLOW_HOME}/log/scheduler"
ENV AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION="${AIRFLOW_HOME}/log/dag_processor_manager/dag_processor_manager.log"
ENV AIRFLOW__CORE__HOSTNAME_CALLABLE="airflow.utils.net.get_host_ip_address"
ENV AIRFLOW__CORE__FERNET_KEY="FERNET_KEY_INPUT"
ENV AIRFLOW__WEBSERVER__SECRET_KEY="HOST_SECRET_KEY"
ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION="true"
ENV AIRFLOW__CORE__LOAD_EXAMPLES="false"
ENV AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth"
ENV _PIP_ADDITIONAL_REQUIREMENTS="${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-google}"

RUN pip install apache-airflow[celery]==2.3.3
RUN pip install psycopg2-binary
RUN pip install Redis
RUN airflow db init
RUN mkdir ./airflow/dags
RUN mkdir ./airflow/plugins


RUN echo ${SDA_TOKEN} > /etc/moreh/token

여기서 주의해줘야하는 환경변수는 아래와 같다

  • AIRFLOW__MASTER__IP - Master Node의 IP주소를 입력해준다
  • AIRFLOW__CORE__HOSTNAME__CALLABLE - Master Node의 IP주소를 가져오게 하는건데 바로 IP주소를 입력하는것이 아닌 위와같은 방식으로 입력해야 인식이 된다. default는 socket.getfqdn으로 현재 컴퓨터의 Hostname을 입력하게된다. 해당 값이 세팅을 안할경우 Master Node Webserver에서 WorkerNode의 Log를 볼수가 없다
  • AIRFLOW__WEBSERVER__SECRET_KEY - 해당 값은 Master Node의 webserver container로 접속하여 airflow config list | grep secret 명령어로 확인할수가있다
  • AIRFLOW__LOGGING_BASE_LOG_FOLDER - 이거는 왜인지는 모르겠으나, Master 에서 Worker의 로그를 가져올때 logs폴더를 보는것이 아닌 log폴더를 보고있다. 그래서 위와같이 수정했고 그밑에 LOG_LOCATION, LOG_DIRECTORY 두개도 동일하게 logs에서 log로 바꿔줬다

이렇게 세팅된 Dockerfile을 실행하고 localhost:5555 (Flower)로 접속하면 아래와같이 Worker Node와 Master Node가 연결이 된것을 볼수가있다(model-vm00 Master, model-vm01 Worker)

Celery Worker환경에서 DAG실행시 주의해야할점

Airflow의 Dag을 실행할때 Celery worker환경에서 주의해야할점은, Master, Worker둘다 같은 이름, 같은 dag파일이 존재해야한다. 하나라도 틀리거나 존재하지않으면 DAG이 실행이 안된다는점을 알아둬야한다.

반응형