[시리즈] 파이썬에서 여러 작업을 동시에 처리하기

파이썬 동시성 처리: 비동기, 스레딩, 멀티프로세싱 비교

asyncio를 이용한 비동기 처리

이번 포스팅에서는 파이썬의 기본적인 비동기 처리 내장 모듈 asyncio 의 사용 방법, 그리고 이를 통해 살펴볼 수 있는 파이썬에서의 비동기 처리 전략에 대해 알아보도록 하겠다.

1. async

async 는 비동기 함수를 정의하는 키워드이다. 이 키워드를 붙이면, 그 함수는 일반 함수가 아닌 코루틴 함수가 된다.

1
2
async def fetch_data():
    return "데이터"

이 함수는 호출한다고 해서 바로 실행되는 것이 아니라, 코루틴 객체를 반환하게 된다. 이렇게 만들어진 코루틴을 await 으로 결과를 기다리거나, create_task 또는 gather 등으로 실행을 예약해야 실제 동작하게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def fetch_data():
    return "데이터"

async def main():
    result = fetch_data()
    print(f"1 : {result}")
    
    result = await fetch_data()
    print(f"2 : {result}")

asyncio.run(main())
1
2
1 : <coroutine object fetch_data at 0x1018b6400>
2 : 데이터


또한 함수 앞에 async 를 붙였다고 해서 자동으로 비동기 처리가 되는 것은 아니며, 해당 함수 내부에서 동작하는 작업들에 대해서도 비동기적으로 대기할 수 있도록 처리하거나, 비동기적으로 작동할 수 있는 작업들을 사용해야 이벤트 루프가 다른 작업을 실행할 수 있다.

  • 비동기 함수 내에 동기 작업을 사용한 경우
1
2
3
4
5
6
7
8
9
10
11
import asyncio
import time

async def timer_sync():
    start = time.time()
    time.sleep(2)
    print(f"{time.time() - start:3.1f} :: wakeup!")

asyncio.run(timer_sync())

# --> 2.0초 :: wakeup!
  • 비동기 함수 내에 비동기 작업을 사용한 경우
1
2
3
4
5
6
7
8
9
10
11
import asyncio
import time

async def timer_async():
    start = time.time()
    asyncio.sleep(2)
    print(f"{time.time() - start:3.1f}초 :: wakeup!")

asyncio.run(timer_async())

# --> 0.0초 :: wakeup!


2. Coroutine 코루틴

asyncio 비동기 처리의 기본 실행 단위로, 실행을 중간에 멈췄다가, 나중에 멈춘 지점부터 다시 이어서 실행할 수 있는 함수를 의미한다.

파이썬에서는 async def 로 정의된 함수가 바로 코루틴 함수가 되며, 이 함수를 호출하면 바로 실행되는 게 아니라 코루틴 객체가 만들어져 반환된다.

코루틴 객체를 실행시키는 방법은 await, asyncio.run(), create_task() 같은 방식으로 실행해야 실제로 동작하게 된다.

코루틴은 await 지점에서 잠시 멈출 수 있고, 그동안 이벤트 루프는 다른 코루틴을 실행하게 된다. 이러한 특성 덕분에 asyncio 는 하나의 스레드 안에서도 대기시간을 최소화하면서 여러 작업을 효울적으로 처리할 수 있는 것이다.

1
2
3
4
5
코루틴 A 실행
→ await 지점에서 잠시 멈춤
→ 코루틴 B 실행
→ 코루틴 B도 await 지점에서 멈춤
→ 코루틴 A가 다시 이어서 실행됨


3. asyncio를 이용한 비동기 처리의 종류

asyncio 는 단순히 asyncawait 만 사용하는 건 아니다. 작업을 언제 시작할지, 결과를 어떻게 모을지, 먼저 끝나는 작업을 어떻게 처리할지에 따라 다양한 방식을 활용할 수 있다.

방식 용도
await 비동기 작업의 결과를 기다림
create_task() 작업을 실행 예약
gather() 여러 작업을 동시에 실행하고 결과 수집
wait() 완료/대기 작업 상태 관리
as_completed() 끝나는 순서대로 결과 처리
TaskGroup 여러 작업을 구조적으로 관리

asyncio를 이용한 비동기 처리

1. await

가장 기본적인 asyncio 사용 방식으로, 비동기 함수의 결과가 필요할 때 사용한다.

await 은 실행하고자 하는 작업 함수 앞에 키워드로 붙여주며, 이는 이 작업이 끝날 때까지 기다려라. 다만 다른 코루틴 작업 흐름은 중단시키지 마라 라는 동작을 하게 된다.

즉, await 만 주르륵 순서대로 쓴다고 해서 코루틴 작업 교체가 일어나는 것이 아니다.

1
result = await fetch_data()

아래는 await 을 사용한 예시이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time
import asyncio

start = time.time()

def time_log(msg):
    print(f"{time.time()-start:4.1f}초 :: {msg}")

async def run_washing_machine():
    time_log("세탁기 시작")
    await asyncio.sleep(4)
    time_log("세탁기 종료")
    
async def run_robot_vacuum():
    time_log("로봇청소기 시작")
    await asyncio.sleep(3)
    time_log("로봇청소기 종료")

async def take_shower():
    time_log("샤워 시작")
    await asyncio.sleep(2)
    time_log("샤워 종료")

async def study():
    time_log("공부 시작")
    await asyncio.sleep(2)
    time_log("공부 종료")

async def main():
    await run_washing_machine()
    await run_robot_vacuum()
    await take_shower()
    await study()
    print(f"\n[최종 종료 시각] {time.time() - start:4.1f}초")

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
 0.0초 :: 세탁기 시작
 4.0초 :: 세탁기 종료
 4.0초 :: 로봇청소기 시작
 7.0초 :: 로봇청소기 종료
 7.0초 :: 샤워 시작
 9.0초 :: 샤워 종료
 9.0초 :: 공부 시작
11.0초 :: 공부 종료

[최종 종료 시각] 11.0초


2. create_task()

코루틴을 즉시 실행 예약할 때 사용한다. create_task() 안에 실행하고자 하는 비동기 함수를 파라미터로 넣어주면 된다.

작업을 먼저 시작해두고, 나중에 결과를 받아올 때 사용할 수 있다.

1
2
3
import asyncio

task = asyncio.create_task(fetch_data())

아래는 create_task() 를 사용한 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import asyncio

start = time.time()

def time_log(msg):
    print(f"{time.time()-start:4.1f}초 :: {msg}")

async def run_washing_machine():
    time_log("세탁기 시작")
    await asyncio.sleep(4)
    time_log("세탁기 종료")
    
async def run_robot_vacuum():
    time_log("로봇청소기 시작")
    await asyncio.sleep(3)
    time_log("로봇청소기 종료")

async def take_shower():
    time_log("샤워 시작")
    await asyncio.sleep(2)
    time_log("샤워 종료")

async def study():
    time_log("공부 시작")
    await asyncio.sleep(2)
    time_log("공부 종료")

async def main():
    # create_task : 이 작업을 일단 백그라운드로 시작하고, 나는 다른 일을 하다가 나중에 결과를 확인하겠다.
    washing_task = asyncio.create_task(run_washing_machine())
    vacuum_task = asyncio.create_task(run_robot_vacuum())
    
    # 내가 직접 해야하는 작업들은 동기 처리
    await take_shower() # await : 이 작업이 끝날때까지 기다려라. 대신 다른 흐름은 중단시키지 마라
    await study()
    
    # 예약해둔 코루틴 작업의 결과물을 받아옴
    await washing_task
    await vacuum_task
    
    print(f"\n[최종 종료 시각] {time.time() - start:4.1f}초")

asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
 0.0초 :: 샤워 시작
 0.0초 :: 세탁기 시작
 0.0초 :: 로봇청소기 시작
 2.0초 :: 샤워 종료
 2.0초 :: 공부 시작
 3.0초 :: 로봇청소기 종료
 4.0초 :: 세탁기 종료
 4.0초 :: 공부 종료
 
[최종 종료 시각]  4.0초
  • 이 코드의 흐름을 파이프로 나타내보면 아래와 같다.
1
2
3
4
5
6
7
main()
 |
 |-- washing_task ───────────4sec────────────────┐
 |                                               │
 |-- vacumm_task ────────────3sec───────────┐    │
 |                                          │    │
 |-- await take_shower ───── await study ───┴────┴── await Task 확인 ── 종료


3. gather

gather 는 여러 비동기 작업을 동시에 실행하고, 모든 결과를 한 번에 모을 때 사용한다. 예를 들어 여러 API에 요청을 동시에 보내고, 결과를 리스트 형태로 받을 수가 있다.

1
2
3
4
5
6
7
import asyncio

results = await asyncio.gather(
    fetch_data("A"),
    fetch_data("B"),
    fetch_data("C"),
)

아래는 gather 를 사용한 비동기 작업이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time

start = time.time()

async def fetch_data(input_char:str):
    await asyncio.sleep(68-ord(input_char))
    print(f"{time.time()-start:4.1f}초 :: {input_char}")
    return input_char.lower()
    
async def main():
    results = await asyncio.gather(
        fetch_data("A"),
        fetch_data("B"),
        fetch_data("C")
    )
    print(f"\nresults :: {results}")

asyncio.run(main())
1
2
3
4
5
 1.0초 :: C
 2.0초 :: B
 3.0초 :: A

results :: ['a', 'b', 'c']

여기서 알 수 있는 점은, 각 작업의 종료 시점에 관계 없이 gather에 들어간 순선대로 results에 추가된다는 점이다.


4. TaskGroup

Python 3.11 부터 사용할 수 있는 방식으로, 여러 비동기 작업을 구조적으로 관리할 때 사용한다.

TaskGroup 은 보통 with 문과 함께 컨텍스트로 사용되며, 해당 컨텍스트 안에서 정의된 비동기 작업은 함께 묶여 한꺼번에 처리된다. 쉽게 말해, 여러 비동기 작업을 묶은 하나의 동기 작업을 만드는 것이다.

create_tasek() 보다 여러 작업의 생명주기를 안전하게 관리하기 좋다는 장점이 있다.

TaskGroup 안에서 만들어진 코루틴의 결과(반환값)는 작업.result() 메서드를 통해 받아올 수 있다.

1
2
3
4
5
6
7
import asyncio

async with asyncio.TaskGroup() as tg:
    task_a = tg.create_task(fetch_data("A"))
    task_b = tg.create_task(fetch_data("B"))

print(task_a.result())

아래는 TaskGroup 을 이용한 비동기 작업 처리 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

start = time.time()

async def fetch_data(input_char:str):
    await asyncio.sleep(68-ord(input_char))
    print(f"{time.time()-start:4.1f}초 :: {input_char}")
    return input_char.lower()
    
async def main():
    
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_data("A"))
        task_b = tg.create_task(fetch_data("B"))
        
        c_result = await fetch_data("C")
    
    print("out of tg block")
    print(task_a.result(), task_b.result(), c_result)

asyncio.run(main())
1
2
3
4
5
 1.0초 :: C
 2.0초 :: B
 3.0초 :: A
out of tg block
a b c


5. wait

여러 작업 중 일부가 끝났는지 확인하거나, 완료된 작업과 아직 진행 중인 작업을 나누어 다룰 때 사용한다.

  • 파라미터 : (1) 작업 목록 (2) 몇초 후에 반환할지 (3) 언제 반환할지
  • 반환값 : 두 개로, (1) 완료된 Task 집합 (2) 아직 끝나지 않은 Task 집합 이다.
1
2
3
4
5
6
import asyncio

done, pending = await asyncio.wait(tasks, timeout=3)

# done    → 완료된 Task 집합
# pending → 아직 끝나지 않은 Task 집합

아래에서 wait 을 사용한 두 가지 방법에 대한 예시 코드를 볼 수 있다.

(1) wait(timeout=n)

n 초 후에 wait 함수를 실행시키는 방법이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time

start = time.time()

async def fetch_data(data:str, duration:int):
    await asyncio.sleep(duration)
    print(f"{time.time()-start:4.1f}초 :: {data}")
    return data.lower()
    
async def main():
    tasks = [
        asyncio.create_task(fetch_data("A", 3)),
        asyncio.create_task(fetch_data("B", 2)),
        asyncio.create_task(fetch_data("C", 1))
    ]
    
    done, pending = await asyncio.wait(tasks, timeout=3)

    print("\n완료된 작업 : ", [task.result() for task in done])
    print("\n안 끝난 작업 : ", [task for task in pending])

asyncio.run(main())
1
2
3
4
5
1.0초 :: C
2.0초 :: B
완료된 작업 :  ['c', 'b']
안 끝난 작업 :  [<Task pending name='Task-2'...]
3.0초 :: A


(2) wait(return_when=’조건’)

특정 조건에 따라 wait 함수로부터 결과를 반환받는 방법이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

start = time.time()

async def fetch_data(data:str, duration:int):
    await asyncio.sleep(duration)
    print(f"{time.time()-start:4.1f}초 :: {data}")
    return data.lower()
    
async def main():
    tasks = [
        asyncio.create_task(fetch_data("A", 3)),
        asyncio.create_task(fetch_data("B", 2)),
        asyncio.create_task(fetch_data("C", 1))
    ]
    
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_COMPLETED)

    print("\n완료된 작업 : ", [task.result() for task in done])
    print("\n안 끝난 작업 : ", [task for task in pending])

asyncio.run(main())
1
2
3
4
1.0초 :: C
완료된 작업 :  ['c']
안 끝난 작업 : [<Task pending name='Task-2' ...,
             <Task pending name='Task-3' ...]

적용할 수 있는 주요 조건은 아래와 같다.

옵션 의미 비고
asyncio.ALL_COMPLETED 모든 작업이 끝날 때까지 기다림 기본값
asyncio.FIRST_COMPLETED 하나라도 끝나면 반환  
asyncio.FIRST_EXCEPTION 예외가 발생한 작업이 생기면 반환  


6. as_completed

여러 작업을 실행한 뒤, 끝나는 순서대로 하나씩 결과를 처리할 때 사용한다.

1
2
3
4
5
import asyncio

for task in asyncio.as_completed(tasks):
    result = await task
    print(result)

아래는 이를 사용한 예시 코드이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

start = time.time()

async def fetch_data(data:str, duration:int):
    await asyncio.sleep(duration)
    print(f"{time.time()-start:4.1f}초 :: {data}")
    return data.lower()
    
async def main():
    tasks = [
        asyncio.create_task(fetch_data("A", 3)),
        asyncio.create_task(fetch_data("B", 2)),
        asyncio.create_task(fetch_data("C", 1))
    ]
    
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f"처리 완료 : {result}\n")
    
asyncio.run(main())
1
2
3
4
5
6
7
8
1.0초 :: C
처리 완료 : c

2.0초 :: B
처리 완료 : b

3.0초 :: A
처리 완료 : a

[시리즈] 파이썬에서 여러 작업을 동시에 처리하기

파이썬 동시성 처리: 비동기, 스레딩, 멀티프로세싱 비교

Comments