[시리즈] 파이썬에서 여러 작업을 동시에 처리하기
파이썬 동시성 처리: 비동기, 스레딩, 멀티프로세싱 비교
- [Python] 파이썬에서 여러 작업을 동시에 처리하는 방법
- [Python] 동기와 비동기의 개념, 그리과 파이썬에서의 예시
- ▶ [Python] asyncio를 이용한 비동기 처리
asyncio를 이용한 비동기 처리
이번 포스팅에서는 파이썬의 기본적인 비동기 처리 내장 모듈 asyncio 의 사용 방법, 그리고 이를 통해 살펴볼 수 있는 파이썬에서의 비동기 처리 전략에 대해 알아보도록 하겠다.
1. async
async 는 비동기 함수를 정의하는 키워드이다. 이 키워드를 붙이면, 그 함수는 일반 함수가 아닌 코루틴 함수가 된다.
1
2
async def fetch_data():
return "데이터"
이 함수는 호출한다고 해서 바로 실행되는 것이 아니라, 코루틴 객체를 반환하게 된다. 이렇게 만들어진 코루틴을 await 으로 결과를 기다리거나, create_task 또는 gather 등으로 실행을 예약해야 실제 동작하게 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def fetch_data():
return "데이터"
async def main():
result = fetch_data()
print(f"1 : {result}")
result = await fetch_data()
print(f"2 : {result}")
asyncio.run(main())
1
2
1 : <coroutine object fetch_data at 0x1018b6400>
2 : 데이터
또한 함수 앞에 async 를 붙였다고 해서 자동으로 비동기 처리가 되는 것은 아니며, 해당 함수 내부에서 동작하는 작업들에 대해서도 비동기적으로 대기할 수 있도록 처리하거나, 비동기적으로 작동할 수 있는 작업들을 사용해야 이벤트 루프가 다른 작업을 실행할 수 있다.
- 비동기 함수 내에 동기 작업을 사용한 경우
1
2
3
4
5
6
7
8
9
10
11
import asyncio
import time
async def timer_sync():
start = time.time()
time.sleep(2)
print(f"{time.time() - start:3.1f} :: wakeup!")
asyncio.run(timer_sync())
# --> 2.0초 :: wakeup!
- 비동기 함수 내에 비동기 작업을 사용한 경우
1
2
3
4
5
6
7
8
9
10
11
import asyncio
import time
async def timer_async():
start = time.time()
asyncio.sleep(2)
print(f"{time.time() - start:3.1f}초 :: wakeup!")
asyncio.run(timer_async())
# --> 0.0초 :: wakeup!
2. Coroutine 코루틴
asyncio 비동기 처리의 기본 실행 단위로, 실행을 중간에 멈췄다가, 나중에 멈춘 지점부터 다시 이어서 실행할 수 있는 함수를 의미한다.
파이썬에서는 async def 로 정의된 함수가 바로 코루틴 함수가 되며, 이 함수를 호출하면 바로 실행되는 게 아니라 코루틴 객체가 만들어져 반환된다.
코루틴 객체를 실행시키는 방법은 await, asyncio.run(), create_task() 같은 방식으로 실행해야 실제로 동작하게 된다.
코루틴은 await 지점에서 잠시 멈출 수 있고, 그동안 이벤트 루프는 다른 코루틴을 실행하게 된다. 이러한 특성 덕분에 asyncio 는 하나의 스레드 안에서도 대기시간을 최소화하면서 여러 작업을 효울적으로 처리할 수 있는 것이다.
1
2
3
4
5
코루틴 A 실행
→ await 지점에서 잠시 멈춤
→ 코루틴 B 실행
→ 코루틴 B도 await 지점에서 멈춤
→ 코루틴 A가 다시 이어서 실행됨
3. asyncio를 이용한 비동기 처리의 종류
asyncio 는 단순히 async 와 await 만 사용하는 건 아니다. 작업을 언제 시작할지, 결과를 어떻게 모을지, 먼저 끝나는 작업을 어떻게 처리할지에 따라 다양한 방식을 활용할 수 있다.
| 방식 | 용도 |
|---|---|
await |
비동기 작업의 결과를 기다림 |
create_task() |
작업을 실행 예약 |
gather() |
여러 작업을 동시에 실행하고 결과 수집 |
wait() |
완료/대기 작업 상태 관리 |
as_completed() |
끝나는 순서대로 결과 처리 |
TaskGroup |
여러 작업을 구조적으로 관리 |
asyncio를 이용한 비동기 처리
1. await
가장 기본적인 asyncio 사용 방식으로, 비동기 함수의 결과가 필요할 때 사용한다.
await 은 실행하고자 하는 작업 함수 앞에 키워드로 붙여주며, 이는 이 작업이 끝날 때까지 기다려라. 다만 다른 코루틴 작업 흐름은 중단시키지 마라 라는 동작을 하게 된다.
즉, await 만 주르륵 순서대로 쓴다고 해서 코루틴 작업 교체가 일어나는 것이 아니다.
1
result = await fetch_data()
아래는 await 을 사용한 예시이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time
import asyncio
start = time.time()
def time_log(msg):
print(f"{time.time()-start:4.1f}초 :: {msg}")
async def run_washing_machine():
time_log("세탁기 시작")
await asyncio.sleep(4)
time_log("세탁기 종료")
async def run_robot_vacuum():
time_log("로봇청소기 시작")
await asyncio.sleep(3)
time_log("로봇청소기 종료")
async def take_shower():
time_log("샤워 시작")
await asyncio.sleep(2)
time_log("샤워 종료")
async def study():
time_log("공부 시작")
await asyncio.sleep(2)
time_log("공부 종료")
async def main():
await run_washing_machine()
await run_robot_vacuum()
await take_shower()
await study()
print(f"\n[최종 종료 시각] {time.time() - start:4.1f}초")
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
0.0초 :: 세탁기 시작
4.0초 :: 세탁기 종료
4.0초 :: 로봇청소기 시작
7.0초 :: 로봇청소기 종료
7.0초 :: 샤워 시작
9.0초 :: 샤워 종료
9.0초 :: 공부 시작
11.0초 :: 공부 종료
[최종 종료 시각] 11.0초
2. create_task()
코루틴을 즉시 실행 예약할 때 사용한다. create_task() 안에 실행하고자 하는 비동기 함수를 파라미터로 넣어주면 된다.
작업을 먼저 시작해두고, 나중에 결과를 받아올 때 사용할 수 있다.
1
2
3
import asyncio
task = asyncio.create_task(fetch_data())
아래는 create_task() 를 사용한 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import asyncio
start = time.time()
def time_log(msg):
print(f"{time.time()-start:4.1f}초 :: {msg}")
async def run_washing_machine():
time_log("세탁기 시작")
await asyncio.sleep(4)
time_log("세탁기 종료")
async def run_robot_vacuum():
time_log("로봇청소기 시작")
await asyncio.sleep(3)
time_log("로봇청소기 종료")
async def take_shower():
time_log("샤워 시작")
await asyncio.sleep(2)
time_log("샤워 종료")
async def study():
time_log("공부 시작")
await asyncio.sleep(2)
time_log("공부 종료")
async def main():
# create_task : 이 작업을 일단 백그라운드로 시작하고, 나는 다른 일을 하다가 나중에 결과를 확인하겠다.
washing_task = asyncio.create_task(run_washing_machine())
vacuum_task = asyncio.create_task(run_robot_vacuum())
# 내가 직접 해야하는 작업들은 동기 처리
await take_shower() # await : 이 작업이 끝날때까지 기다려라. 대신 다른 흐름은 중단시키지 마라
await study()
# 예약해둔 코루틴 작업의 결과물을 받아옴
await washing_task
await vacuum_task
print(f"\n[최종 종료 시각] {time.time() - start:4.1f}초")
asyncio.run(main())
1
2
3
4
5
6
7
8
9
10
0.0초 :: 샤워 시작
0.0초 :: 세탁기 시작
0.0초 :: 로봇청소기 시작
2.0초 :: 샤워 종료
2.0초 :: 공부 시작
3.0초 :: 로봇청소기 종료
4.0초 :: 세탁기 종료
4.0초 :: 공부 종료
[최종 종료 시각] 4.0초
- 이 코드의 흐름을 파이프로 나타내보면 아래와 같다.
1
2
3
4
5
6
7
main()
|
|-- washing_task ───────────4sec────────────────┐
| │
|-- vacumm_task ────────────3sec───────────┐ │
| │ │
|-- await take_shower ───── await study ───┴────┴── await Task 확인 ── 종료
3. gather
gather 는 여러 비동기 작업을 동시에 실행하고, 모든 결과를 한 번에 모을 때 사용한다. 예를 들어 여러 API에 요청을 동시에 보내고, 결과를 리스트 형태로 받을 수가 있다.
1
2
3
4
5
6
7
import asyncio
results = await asyncio.gather(
fetch_data("A"),
fetch_data("B"),
fetch_data("C"),
)
아래는 gather 를 사용한 비동기 작업이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time
start = time.time()
async def fetch_data(input_char:str):
await asyncio.sleep(68-ord(input_char))
print(f"{time.time()-start:4.1f}초 :: {input_char}")
return input_char.lower()
async def main():
results = await asyncio.gather(
fetch_data("A"),
fetch_data("B"),
fetch_data("C")
)
print(f"\nresults :: {results}")
asyncio.run(main())
1
2
3
4
5
1.0초 :: C
2.0초 :: B
3.0초 :: A
results :: ['a', 'b', 'c']
여기서 알 수 있는 점은, 각 작업의 종료 시점에 관계 없이 gather에 들어간 순선대로 results에 추가된다는 점이다.
4. TaskGroup
Python 3.11 부터 사용할 수 있는 방식으로, 여러 비동기 작업을 구조적으로 관리할 때 사용한다.
TaskGroup 은 보통 with 문과 함께 컨텍스트로 사용되며, 해당 컨텍스트 안에서 정의된 비동기 작업은 함께 묶여 한꺼번에 처리된다. 쉽게 말해, 여러 비동기 작업을 묶은 하나의 동기 작업을 만드는 것이다.
create_tasek() 보다 여러 작업의 생명주기를 안전하게 관리하기 좋다는 장점이 있다.
TaskGroup 안에서 만들어진 코루틴의 결과(반환값)는 작업.result() 메서드를 통해 받아올 수 있다.
1
2
3
4
5
6
7
import asyncio
async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(fetch_data("A"))
task_b = tg.create_task(fetch_data("B"))
print(task_a.result())
아래는 TaskGroup 을 이용한 비동기 작업 처리 코드이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time
start = time.time()
async def fetch_data(input_char:str):
await asyncio.sleep(68-ord(input_char))
print(f"{time.time()-start:4.1f}초 :: {input_char}")
return input_char.lower()
async def main():
async with asyncio.TaskGroup() as tg:
task_a = tg.create_task(fetch_data("A"))
task_b = tg.create_task(fetch_data("B"))
c_result = await fetch_data("C")
print("out of tg block")
print(task_a.result(), task_b.result(), c_result)
asyncio.run(main())
1
2
3
4
5
1.0초 :: C
2.0초 :: B
3.0초 :: A
out of tg block
a b c
5. wait
여러 작업 중 일부가 끝났는지 확인하거나, 완료된 작업과 아직 진행 중인 작업을 나누어 다룰 때 사용한다.
- 파라미터 : (1) 작업 목록 (2) 몇초 후에 반환할지 (3) 언제 반환할지
- 반환값 : 두 개로, (1) 완료된 Task 집합 (2) 아직 끝나지 않은 Task 집합 이다.
1
2
3
4
5
6
import asyncio
done, pending = await asyncio.wait(tasks, timeout=3)
# done → 완료된 Task 집합
# pending → 아직 끝나지 않은 Task 집합
아래에서 wait 을 사용한 두 가지 방법에 대한 예시 코드를 볼 수 있다.
(1) wait(timeout=n)
n 초 후에 wait 함수를 실행시키는 방법이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time
start = time.time()
async def fetch_data(data:str, duration:int):
await asyncio.sleep(duration)
print(f"{time.time()-start:4.1f}초 :: {data}")
return data.lower()
async def main():
tasks = [
asyncio.create_task(fetch_data("A", 3)),
asyncio.create_task(fetch_data("B", 2)),
asyncio.create_task(fetch_data("C", 1))
]
done, pending = await asyncio.wait(tasks, timeout=3)
print("\n완료된 작업 : ", [task.result() for task in done])
print("\n안 끝난 작업 : ", [task for task in pending])
asyncio.run(main())
1
2
3
4
5
1.0초 :: C
2.0초 :: B
완료된 작업 : ['c', 'b']
안 끝난 작업 : [<Task pending name='Task-2'...]
3.0초 :: A
(2) wait(return_when=’조건’)
특정 조건에 따라 wait 함수로부터 결과를 반환받는 방법이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time
start = time.time()
async def fetch_data(data:str, duration:int):
await asyncio.sleep(duration)
print(f"{time.time()-start:4.1f}초 :: {data}")
return data.lower()
async def main():
tasks = [
asyncio.create_task(fetch_data("A", 3)),
asyncio.create_task(fetch_data("B", 2)),
asyncio.create_task(fetch_data("C", 1))
]
done, pending = await asyncio.wait(tasks,
return_when=asyncio.FIRST_COMPLETED)
print("\n완료된 작업 : ", [task.result() for task in done])
print("\n안 끝난 작업 : ", [task for task in pending])
asyncio.run(main())
1
2
3
4
1.0초 :: C
완료된 작업 : ['c']
안 끝난 작업 : [<Task pending name='Task-2' ...,
<Task pending name='Task-3' ...]
적용할 수 있는 주요 조건은 아래와 같다.
| 옵션 | 의미 | 비고 |
|---|---|---|
asyncio.ALL_COMPLETED |
모든 작업이 끝날 때까지 기다림 | 기본값 |
asyncio.FIRST_COMPLETED |
하나라도 끝나면 반환 | |
asyncio.FIRST_EXCEPTION |
예외가 발생한 작업이 생기면 반환 |
6. as_completed
여러 작업을 실행한 뒤, 끝나는 순서대로 하나씩 결과를 처리할 때 사용한다.
1
2
3
4
5
import asyncio
for task in asyncio.as_completed(tasks):
result = await task
print(result)
아래는 이를 사용한 예시 코드이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time
start = time.time()
async def fetch_data(data:str, duration:int):
await asyncio.sleep(duration)
print(f"{time.time()-start:4.1f}초 :: {data}")
return data.lower()
async def main():
tasks = [
asyncio.create_task(fetch_data("A", 3)),
asyncio.create_task(fetch_data("B", 2)),
asyncio.create_task(fetch_data("C", 1))
]
for task in asyncio.as_completed(tasks):
result = await task
print(f"처리 완료 : {result}\n")
asyncio.run(main())
1
2
3
4
5
6
7
8
1.0초 :: C
처리 완료 : c
2.0초 :: B
처리 완료 : b
3.0초 :: A
처리 완료 : a
[시리즈] 파이썬에서 여러 작업을 동시에 처리하기
파이썬 동시성 처리: 비동기, 스레딩, 멀티프로세싱 비교
- [Python] 파이썬에서 여러 작업을 동시에 처리하는 방법
- [Python] 동기와 비동기의 개념, 그리과 파이썬에서의 예시
- ▶ [Python] asyncio를 이용한 비동기 처리
Comments