Airflow Executor
1. Executor의 개념
- 직역하면 실행기
- 작업(Task) 인스턴스가 실행되는 메커니즘, 즉 작업을 어떻게 실행할지 결정하는 핵심 구성요소
- Executor는 플러그인 방식으로 설계되어 있어, 교체가 가능함
2. Executor의 종류
(1) 실행 위치에 따른 구분
- Task를 스케줄러가 위치한 머신에서 실행하느냐, 외부 워커나 클러스터에서 실행하느냐로 구분할 수 있다.
- 위치에 따라 Local과 Remote 로 나눌 수 있다.
| Executor 구분 | 설명 | 예시 |
|---|---|---|
| Local Executor | 스케줄러 노드에서 프로세스를 생성해서 Task를 실행 단일 머신 기반 운영에 적합(즉, 규모가 크지 않은 경우) |
Local Executor |
| Remote Executor | 워커 풀잉나 외부 실행 환경에서 Task를 실행하는 경우 Celery 워커로 분산 실행하는 등의 여러 대 운영인 경우 |
Celery Executor BatchExecutor EdgeExecutor KubernetesEecutor |
(2) 세부 Executor 종류
| Executor | 설명 | 병렬처리 |
|---|---|---|
| SequentialExecutor | - 가장 단순한 실행 방식의 Executor - 한 번에 하나의 Task만을 실행 SQLite 기반 환경에서 사용 가능하므로 별도 DB 필요 없음 테스트, 개발, 개인용에 적합 |
불가능 |
| LocalExecutor | - 하나의 서버에서 멀티프로세싱으로 병렬 실행 가능 - MySQL 또는 PostgreSQL 필요 - 중소 규모 환경에 적합 - 비교적 설정이 간단하며 병렬 처리가 가능 |
가능 |
| CeleryExecutor | - 여러 워커 노드를 두고 분산처리를 하는 Executor - Celery + Redis/RabbitMQ 필요 - 대규모 환경에서 많이 사용 - 확장성이 좋으나 운영 복잡도가 높다는 단점 |
가능 |
| KubernetesExecutor | - Task마다 독립된 K8s Pod을 생성해 작업을 처리 - 클라우드/컨테이너 환겨엥 최적 - 리소스 격리 효과가 강력함 - 유연성이 좋으나, 초기 설정 난이도가 높음 |
가능 |
| CeleryKubernetesExecutor | - 일부는 Celery, 일부는 K8s로 처리하는 혼합형 - 유연하지만 구조가 매우 복잡 |
가능 |
3. Executor 설정하기
- 설정파일(airflow.cfg)에서
[core]섹션의executor부분에서 설정한다.
1
2
3
# airflow.cfg
[core]
executor = KubernetesExecutor
- docker compose 를 사용하는 경우에는
AIRFLOW__CORE__EXECUTOR옵션을 바꿔주면 된다.
(그러면 airflow.cfg에 적용된다.)
1
2
3
4
5
...(전략)
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: ${AIRFLOW_CORE_EXECUTOR:-LocalExecutor}
...(후략)
4. 여러 Executor를 동시에 사용하기
- 2.10.0 버전부터 multi-executor 구성 적용이 가능해짐
- 이를 통해 각각의 Executor가 가지는 장점을 활용하고, 단점은 보완하는 체계 구축이 가능
- 설정파일에서 아래와 가팅 여러 Executor를 쉼표로 구분하여 작성
1
2
3
# airflow.cfg
[core]
executor = LocalExecutor,CeleryExecutor
- 이후 특정 작업에 대해 특정 Executor를 지정하고 싶다면 아래와 같이 활요
1
2
3
4
5
6
7
8
9
10
11
BashOperator(
task_id="hello_world",
executor="LocalExecutor",
bash_command="echo 'hello world!'",
)
# 또는
@task(executor="LocalExecutor")
def hello_world():
print("hello world!")
- 또한 DAG에 대한 기본 Executor를 설정할 수도 있음
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def hello_world():
print("hello world!")
def hello_world_again():
print("hello world again!")
with DAG(
dag_id="hello_worlds",
default_args={"executor": "LocalExecutor"}, # Applies to all tasks in the Dag
) as dag:
# All tasks will use the executor from default args automatically
hw = hello_world()
hw_again = hello_world_again()
Comments