본문 바로가기
카테고리 없음

Apache Airflow란? 데이터 파이프라인 자동화부터 기본 예제까지

by JustJunsu 2024. 11. 14.

Airflow란 무엇인가?

Apache Airflow는 데이터 파이프라인(workflows)을 관리하고 자동화하기 위한 오픈소스 플랫폼입니다. 데이터 파이프라인을 시각적으로 설계하고, 스케줄링 및 모니터링할 수 있는 강력한 도구로, 데이터 엔지니어들이 작업의 흐름을 효율적으로 관리할 수 있도록 돕습니다.
Airflow는 파이썬 코드로 워크플로우를 정의하고, DAG(Directed Acyclic Graph) 형태로 작업의 흐름을 설정하여 각 단계의 의존성을 체계적으로 관리합니다.

왜 Apache Airflow가 필요한가?

데이터 파이프라인의 자동화는 현대 데이터 엔지니어링에서 필수적입니다. 특히 대규모 데이터의 ETL(추출, 변환, 로드) 작업을 반복적으로 수행하거나, 여러 작업 간의 복잡한 의존 관계를 다룰 때 Airflow는 다음과 같은 장점을 제공합니다.

  1. 효율적인 스케줄링: Airflow는 정해진 주기에 따라 작업을 자동으로 실행할 수 있어 반복적인 작업을 쉽게 관리할 수 있습니다.
  2. 모니터링과 알림 기능: 대시보드를 통해 실시간 모니터링이 가능하며, 오류 발생 시 알림을 통해 빠르게 대응할 수 있습니다.
  3. 유연한 의존성 설정: DAG를 통해 작업 간의 의존성을 정의하여 특정 작업이 끝난 후 다음 작업이 실행되도록 제어할 수 있습니다.
  4. 확장성과 유연성: 다양한 플러그인을 통해 확장이 가능하며, 복잡한 파이프라인도 Airflow로 손쉽게 관리할 수 있습니다.

Airflow는 어떻게 사용되는가? (주요 사용 방법)

Airflow는 주로 다음과 같은 단계로 데이터 파이프라인을 관리합니다.

  1. DAG 정의: DAG(Directed Acyclic Graph)를 설정하여 파이프라인을 정의합니다. 파이썬 코드로 작성되며, DAG 안에 여러 개의 Task를 추가할 수 있습니다.
  2. Task 설정: Task는 DAG 내에서 실행되는 개별 작업입니다. Operator를 사용하여 다양한 작업을 수행하며, Task 간의 의존성을 정의해 순서에 따라 작업이 진행됩니다.
  3. 스케줄링 설정: 특정 시점 혹은 주기로 DAG가 실행되도록 스케줄을 설정합니다. 예를 들어, 매일 새벽 1시에 데이터를 처리하도록 예약할 수 있습니다.
  4. 실행 및 모니터링: Airflow의 대시보드에서 DAG와 각 Task의 실행 상태를 모니터링하며, 오류가 발생한 Task는 재실행하거나 설정을 변경해 작업을 재개할 수 있습니다.

Airflow 시작하기 - 간단한 예제 코드

아래는 Airflow의 기본적인 예제 코드입니다. 이 예제에서는 두 개의 Task가 순차적으로 실행되며, 첫 번째 Task는 "Hello"를 출력하고, 두 번째 Task는 "World"를 출력합니다.

# hello_world_dag.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# 함수 정의
def print_hello():
    print("Hello")

def print_world():
    print("World")

# DAG 정의
with DAG(
    dag_id="hello_world_dag",
    start_date=datetime(2023, 11, 1),
    schedule_interval="@daily",  # 매일 실행
    catchup=False,
) as dag:

    # Task 정의
    task_hello = PythonOperator(
        task_id="print_hello",
        python_callable=print_hello
    )

    task_world = PythonOperator(
        task_id="print_world",
        python_callable=print_world
    )

    # Task 의존성 설정
    task_hello >> task_world

 

예제 코드 설명

  • DAG 정의: dag_id로 DAG의 이름을 지정하고, start_date와 schedule_interval을 설정합니다. 이 예제에서는 매일 한 번씩 실행되도록 설정하였습니다.
  • Task 정의: PythonOperator를 사용하여 각 Task를 정의하고, 각각 print_hello와 print_world 함수를 호출하도록 설정했습니다.
  • 의존성 설정: task_hello >> task_world와 같이 DAG의 흐름을 설정하여 task_hello가 완료된 후 task_world가 실행되도록 했습니다.

결론

  1. 설치 및 환경 설정: pip install apache-airflow 명령어로 로컬 환경에 설치하거나, Docker를 통해 Airflow를 실행할 수 있습니다.
  2. DAG 작성: 위와 같은 파이썬 스크립트를 작성하여 DAG와 Task를 정의합니다.
  3. 스케줄링 및 실행: 작성한 DAG를 Airflow의 웹 UI에서 활성화하고, 설정한 주기에 맞게 실행 상태를 확인합니다.
  4. 모니터링 및 오류 대응: 대시보드에서 작업 흐름을 확인하고, 알림 설정을 통해 오류 발생 시 즉시 대응할 수 있습니다.

Reference