Docker Operator
1. 소개
- Airflow에서 Docker 컨테이너를 하나의 Task 실행 단위로 다룰 수 있게 해주는 Operator
- 이를 사용하면 특정 작업을 별도의 컨테이너 환경에서 실행할 수 있다.
- 이 덕분에 호스트나 Airflow 환경과 분리된 상태로 작업을 처리할 수 있다.
- Python 패키지 충돌, 시스템 라이브러리 차이, 실행 환경 문제를 줄일 수 있다.
- 특히나 데이터 전처리, 모델 훈련 등 독립된 실행 환경이 필요한 작업에 안성맞춤!
- 다만, Airflow가 Docker 위에서 동작하는 경우에는 추가 설정이 필요하다.
2. 환경 준비
- Docker 위에서 Airflow 가 동작하는 경우에 필요한 환경 준비 내용
- 본 포스팅에서는 1번(Host의 Docker Engine을 이용) 방법을 적용했다.
(1) Host의 docker engine을 이용
- 첫 번째 방법은 Host의 docker engine을 이용하는 방법이다.
- 이를 위해 Airflow Container는 Host의 docker 소켓에 접근이 가능해야 한다.
- 이를 위해 Host의 docker.sock를 마운트 해준다.
1
2
3
4
5
6
7
8
9
10
x-airflow-common:
&airflow-common
...(중략)
volumes:
- ${AIRFLOW_PROJ_DIR:-./airflow}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-./airflow}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-./airflow}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-./airflow}/plugins:/opt/airflow/plugins
- /var/run/docker.sock:/var/run/docker.sock # 추가
...(후략)
(2) DinD 이용
- 또는 DinD(Docker-in-Docker)를 이용할 수도 있다.
- 하지만 이는 권한 설정이 까다롭고 보안상 위험할 수 있어 권장하지 않는다.
- 대신 KubernetesPodOperator 사용을 권장하는 추세이다.
3. 단일 DockerOperator Task 예제
airflow.providers.docker.operators.docker의DockerOperator를 사용한다.- 아래는 도커를 실행시킨 뒤 “hello from container”를 출력하는 예제 코드이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
with DAG(
dag_id="run_process_via_host_docker",
start_date=datetime(2026,1,1),
schedule=None,
catchup=False
) as dag:
run_job = DockerOperator(
task_id="run_job",
image="python:3.11-slim",
command="python -c \"print('hello from container')\"",
docker_url="unix://var/run/docker.sock",
network_mode="bridge",
auto_remove="success",
)
- 위와 같이 만든 py 파일을 DAG 디렉터리에 넣고, Airflow에서 DAG가 인식되면 Run

- 실행 결과

4. 소스코드나 데이터셋을 Mount 하기
(1) 개요
- DockerOperator를 사용하는 가장 핵심적인 이유는 “실행환경의 격리” 라고 생각한다.
- 그리고 이러한 격리가 필요한 경우는, 보통 크고 복잡한 작업인 경우들이 대부분이라고 생각한다.
- 따라서 여러 소스코드나 데이터셋을 Task에서 처리할 일이 많을 것이다.
- 이 때 필요한 것이 바로 Mount 기능이다.
(2) 방법
- Airflow 워커가 접근 가능한 경로에 코드와 데이터셋을 둔다.
- 그 경로를 새로 띄워지는 컨테이너 안에 마운트한다.
- 먼저, 컨테이너에서 실행시킬 파이썬 파일 하나를 만들어봤다.
- 이 파이썬 코드는 20초동안 sleep 하다가 일어나 텍스트 파일에 적힌 자신의 이름을 외친다.
1
2
3
4
5
6
7
8
9
# sleeping.py
import time
with open("/workspace/src/data/name.txt", "r", encoding="utf-8") as f:
name = f.readline()
print("Go to Sleep...")
time.sleep(20)
print(f"Wake Up!!!! my name is {name}!!")
- 그리고 이를 불러와 컨테이너에서 실행시키는 Task 를 포함하는 DAG도 만들었다.
- 명심할 것은, source 즉 원천 데이터가 호스트에서 접근 가능한 경로로 작성되어야 한다는 것
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# docker_operator_mount_example.py
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from datetime import datetime
with DAG(
dag_id="docker_mount",
start_date=datetime(2026,1,1),
schedule=None,
catchup=False
) as dag:
sleeping = DockerOperator(
task_id="sleeping",
image="python:3.11-slim",
command="python /workspace/src/scripts/sleeping.py",
docker_url="unix://var/run/docker.sock",
mounts=[
Mount(
source="/opt/airflow/src/scripts", # 원천데이터 : 호스트에서 접근 가능한 경로여야 함 (Airflow 도커 안쪽이 아님!)
target="/workspace/src/scripts", # 타겟 : 워커 컨테이너 내에 마운트할 경로
type="bind", # 마운트 유형
),
Mount(
source="/opt/airflow/src/data",
target="/workspace/src/data",
type="bind",
read_only=True # 마운트 읽기전용 여부
),
],
network_mode="bridge",
working_dir="/workspace",
auto_remove="success",
)
- 파일의 위치를 그려보면 다음과 같다.(호스트 기준)
1
2
3
4
5
6
7
8
9
10
11
12
13
opt
├─airflow
│ ├─config
│ ├─dags
│ │ └─docker_operator_mount_example.py
│ ├─logs
│ ├─plugins
│ ├─src
│ │ ├─scripts
│ │ │ └─sleeping.py
│ │ └─data
│ │ └─name.txt
...
(3) 실행 결과
- DAG가 정상적으로 잡히면, 트리거로 실행한다.
- sleep 도중에 호스트에서
docker ps명령어를 실행하면 아래와 같이 워커 컨테이너가 출력되는 것도 볼 수 있다.
1
2
3
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED ...
a12bc98c1221 python:3.11-slim "python /workspace/s…" 8 seconds ago ...
- 실행 결과 로그를 보면, 정상적으로 자신의 이름을 외치는 것을 볼 수 있다.

5. 다중 DockerOperator Task 예제
- DockerOperator 는 각 Task마다 각각의 컨테이너를 띄운다.
- 따라서 여러 개의 DockerOperator Task가 있다면, 해당 DAG에서는 여러 컨테이너들이 띄워졌다 종료된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# docker_operator_multi_task_examply.py
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
with DAG(
dag_id="multi_task_docker_operator",
start_date=datetime(2026,1,1),
schedule=None,
catchup=False
) as dag:
first_job = DockerOperator(
task_id="first_job",
image="python:3.11-slim",
command="python -c \"print('hello from container')\"",
docker_url="unix://var/run/docker.sock",
network_mode="bridge",
auto_remove="success",
)
second_job = DockerOperator(
task_id="second_job",
image="python:3.11-slim",
command="python -c \"print('2nd job is here!')\"",
docker_url="unix://var/run/docker.sock",
network_mode="bridge",
auto_remove="success",
)
- 실행 결과


Comments