Airflow란 무엇인가?
Apache Airflow는 데이터 파이프라인(workflows)을 관리하고 자동화하기 위한 오픈소스 플랫폼입니다. 데이터 파이프라인을 시각적으로 설계하고, 스케줄링 및 모니터링할 수 있는 강력한 도구로, 데이터 엔지니어들이 작업의 흐름을 효율적으로 관리할 수 있도록 돕습니다.
Airflow는 파이썬 코드로 워크플로우를 정의하고, DAG(Directed Acyclic Graph) 형태로 작업의 흐름을 설정하여 각 단계의 의존성을 체계적으로 관리합니다.
왜 Apache Airflow가 필요한가?
데이터 파이프라인의 자동화는 현대 데이터 엔지니어링에서 필수적입니다. 특히 대규모 데이터의 ETL(추출, 변환, 로드) 작업을 반복적으로 수행하거나, 여러 작업 간의 복잡한 의존 관계를 다룰 때 Airflow는 다음과 같은 장점을 제공합니다.
- 효율적인 스케줄링: Airflow는 정해진 주기에 따라 작업을 자동으로 실행할 수 있어 반복적인 작업을 쉽게 관리할 수 있습니다.
- 모니터링과 알림 기능: 대시보드를 통해 실시간 모니터링이 가능하며, 오류 발생 시 알림을 통해 빠르게 대응할 수 있습니다.
- 유연한 의존성 설정: DAG를 통해 작업 간의 의존성을 정의하여 특정 작업이 끝난 후 다음 작업이 실행되도록 제어할 수 있습니다.
- 확장성과 유연성: 다양한 플러그인을 통해 확장이 가능하며, 복잡한 파이프라인도 Airflow로 손쉽게 관리할 수 있습니다.
Airflow는 어떻게 사용되는가? (주요 사용 방법)
Airflow는 주로 다음과 같은 단계로 데이터 파이프라인을 관리합니다.
- DAG 정의: DAG(Directed Acyclic Graph)를 설정하여 파이프라인을 정의합니다. 파이썬 코드로 작성되며, DAG 안에 여러 개의 Task를 추가할 수 있습니다.
- Task 설정: Task는 DAG 내에서 실행되는 개별 작업입니다. Operator를 사용하여 다양한 작업을 수행하며, Task 간의 의존성을 정의해 순서에 따라 작업이 진행됩니다.
- 스케줄링 설정: 특정 시점 혹은 주기로 DAG가 실행되도록 스케줄을 설정합니다. 예를 들어, 매일 새벽 1시에 데이터를 처리하도록 예약할 수 있습니다.
- 실행 및 모니터링: Airflow의 대시보드에서 DAG와 각 Task의 실행 상태를 모니터링하며, 오류가 발생한 Task는 재실행하거나 설정을 변경해 작업을 재개할 수 있습니다.
Airflow 시작하기 - 간단한 예제 코드
아래는 Airflow의 기본적인 예제 코드입니다. 이 예제에서는 두 개의 Task가 순차적으로 실행되며, 첫 번째 Task는 "Hello"를 출력하고, 두 번째 Task는 "World"를 출력합니다.
# hello_world_dag.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
# 함수 정의
def print_hello():
print("Hello")
def print_world():
print("World")
# DAG 정의
with DAG(
dag_id="hello_world_dag",
start_date=datetime(2023, 11, 1),
schedule_interval="@daily", # 매일 실행
catchup=False,
) as dag:
# Task 정의
task_hello = PythonOperator(
task_id="print_hello",
python_callable=print_hello
)
task_world = PythonOperator(
task_id="print_world",
python_callable=print_world
)
# Task 의존성 설정
task_hello >> task_world
예제 코드 설명
- DAG 정의: dag_id로 DAG의 이름을 지정하고, start_date와 schedule_interval을 설정합니다. 이 예제에서는 매일 한 번씩 실행되도록 설정하였습니다.
- Task 정의: PythonOperator를 사용하여 각 Task를 정의하고, 각각 print_hello와 print_world 함수를 호출하도록 설정했습니다.
- 의존성 설정: task_hello >> task_world와 같이 DAG의 흐름을 설정하여 task_hello가 완료된 후 task_world가 실행되도록 했습니다.
결론
- 설치 및 환경 설정: pip install apache-airflow 명령어로 로컬 환경에 설치하거나, Docker를 통해 Airflow를 실행할 수 있습니다.
- DAG 작성: 위와 같은 파이썬 스크립트를 작성하여 DAG와 Task를 정의합니다.
- 스케줄링 및 실행: 작성한 DAG를 Airflow의 웹 UI에서 활성화하고, 설정한 주기에 맞게 실행 상태를 확인합니다.
- 모니터링 및 오류 대응: 대시보드에서 작업 흐름을 확인하고, 알림 설정을 통해 오류 발생 시 즉시 대응할 수 있습니다.