Airflow Executor

1. Executor의 개념

  • 직역하면 실행기
  • 작업(Task) 인스턴스가 실행되는 메커니즘, 즉 작업을 어떻게 실행할지 결정하는 핵심 구성요소
  • Executor는 플러그인 방식으로 설계되어 있어, 교체가 가능함

2. Executor의 종류

(1) 실행 위치에 따른 구분

  • Task를 스케줄러가 위치한 머신에서 실행하느냐, 외부 워커나 클러스터에서 실행하느냐로 구분할 수 있다.
  • 위치에 따라 Local과 Remote 로 나눌 수 있다.
Executor 구분 설명 예시
Local Executor 스케줄러 노드에서 프로세스를 생성해서 Task를 실행
단일 머신 기반 운영에 적합(즉, 규모가 크지 않은 경우)
Local Executor
Remote Executor 워커 풀잉나 외부 실행 환경에서 Task를 실행하는 경우
Celery 워커로 분산 실행하는 등의 여러 대 운영인 경우
Celery Executor
BatchExecutor
EdgeExecutor
KubernetesEecutor

(2) 세부 Executor 종류

Executor 설명 병렬처리
SequentialExecutor - 가장 단순한 실행 방식의 Executor
- 한 번에 하나의 Task만을 실행
SQLite 기반 환경에서 사용 가능하므로 별도 DB 필요 없음
테스트, 개발, 개인용에 적합
불가능
LocalExecutor - 하나의 서버에서 멀티프로세싱으로 병렬 실행 가능
- MySQL 또는 PostgreSQL 필요
- 중소 규모 환경에 적합
- 비교적 설정이 간단하며 병렬 처리가 가능
가능
CeleryExecutor - 여러 워커 노드를 두고 분산처리를 하는 Executor
- Celery + Redis/RabbitMQ 필요
- 대규모 환경에서 많이 사용
- 확장성이 좋으나 운영 복잡도가 높다는 단점
가능
KubernetesExecutor - Task마다 독립된 K8s Pod을 생성해 작업을 처리
- 클라우드/컨테이너 환겨엥 최적
- 리소스 격리 효과가 강력함
- 유연성이 좋으나, 초기 설정 난이도가 높음
가능
CeleryKubernetesExecutor - 일부는 Celery, 일부는 K8s로 처리하는 혼합형
- 유연하지만 구조가 매우 복잡
가능

3. Executor 설정하기

  • 설정파일(airflow.cfg)에서 [core] 섹션의 executor 부분에서 설정한다.
1
2
3
# airflow.cfg
[core]
executor = KubernetesExecutor
  • docker compose 를 사용하는 경우에는 AIRFLOW__CORE__EXECUTOR 옵션을 바꿔주면 된다.
    (그러면 airflow.cfg에 적용된다.)
1
2
3
4
5
...(전략)
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: ${AIRFLOW_CORE_EXECUTOR:-LocalExecutor}
...(후략)

4. 여러 Executor를 동시에 사용하기

  • 2.10.0 버전부터 multi-executor 구성 적용이 가능해짐
  • 이를 통해 각각의 Executor가 가지는 장점을 활용하고, 단점은 보완하는 체계 구축이 가능
  • 설정파일에서 아래와 가팅 여러 Executor를 쉼표로 구분하여 작성
1
2
3
# airflow.cfg
[core]
executor = LocalExecutor,CeleryExecutor
  • 이후 특정 작업에 대해 특정 Executor를 지정하고 싶다면 아래와 같이 활요
1
2
3
4
5
6
7
8
9
10
11
BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)

# 또는

@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")
  • 또한 DAG에 대한 기본 Executor를 설정할 수도 있음
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def hello_world():
    print("hello world!")


def hello_world_again():
    print("hello world again!")


with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "LocalExecutor"},  # Applies to all tasks in the Dag
) as dag:
    # All tasks will use the executor from default args automatically
    hw = hello_world()
    hw_again = hello_world_again()

Reference

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#executor-types

Comments