python 멀티스레딩, 비동기!

threading

스레드를 사용하여 동시성 프로그래밍을 할 수 있게 해주는 모듈

  • Thread
    스레드를 생성하고 제어할 수 있는 기본 클래스
  • Lock
    스레드 간의 동기화를 위한 잠금 메커니즘을 제공
  • Event
    스레드 간의 신호 전달을 위한 이벤트 메커니즘을 제공
  • Timer
    일정 시간 후에 함수를 실행하도록 타이머를 설정

threading.Thread

Thread 생성 및 실행시 동시에 실행되다보니 어떤 코드가 먼저 수행될지 알 수 없다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading

def print_numbers():
for i in range(1, 6):
print(f"Number: {i}")

def print_letters():
for letter in ['A', 'B', 'C', 'D', 'E']:
print(f"Letter: {letter}")

# 스레드 생성
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# 스레드 시작
thread1.start()
thread2.start()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print("All threads have finished.")
# Number: 1
# Number: 2
# Number: 3
# Number: 4
# Letter: A
# Number: 5
# Letter: B
# Letter: C
# Letter: D
# Letter: E
# All threads have finished.

threading.Lock

동시 접근하는 변수에 대해서는 락 처리를 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading

# 전역변수(동시접근)
lock = threading.Lock()
shared_resource = 0

def increment():
global shared_resource
for _ in range(1000000):
with lock:
shared_resource += 1

def decrement():
global shared_resource
for _ in range(1000000):
with lock:
shared_resource -= 1

# 스레드 생성
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)

# 스레드 시작
thread1.start()
thread2.start()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print(f"Final value of shared_resource: {shared_resource}") # 0

10만번 반복했음에도 lock 을 통해 접근을 제어시켜 0이 출력됨

Event

외부에서 thread 를 제어시킬 수 있는 방법이 있다.

threading.Event 를 생성한 thread 에 등록하고 flag 값을 통해 thread 의 중지를 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import threading
import time


def wait_for_event(e: threading.Event):
print("Thread started and waiting for event to be set")
e.wait() # Event 가 set 되기를 무한 대기
print("Event has been set, resuming thread")


def wait_for_event_timeout(e: threading.Event, t):
while not e.is_set():
print(f"Thread checking for event, will timeout in {t} seconds")
event_is_set = e.wait(t) # t 초만큼 Event 가 set 되기를 대기
if event_is_set:
print("Event has been set, resuming thread")
else:
print("Timeout occurred, continuing to check for event")


if __name__ == '__main__':
# Event 객체 생성
event1 = threading.Event()
event2 = threading.Event()

# 스레드 생성 및 시작
thread1 = threading.Thread(target=wait_for_event, args=(event1,))
thread2 = threading.Thread(target=wait_for_event_timeout, args=(event2, 1))

# 스레드 시작
thread1.start()
thread2.start()

# 잠시 대기 후 이벤트 설정
time.sleep(5)
event1.set()
print("event1 setting complete")

# 잠시 대기 후 이벤트 설정
time.sleep(5)
event2.set()
print("event2 setting complete")

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print("All threads have finished.")
  • is_set()
    flag==True 인지 확인.
  • set()
    flag=True 로 설정.
  • clear()
    flag=False 로 설정.
  • wait(timeout=None)
    flag==True 될 때까지 대기. timeout 이 설정된 경우, 지정된 시간 동안만 대기.

flag 를 통해 wait 중인 thread 코드를 이어서 진행할지 결정할 수 있다.
위와 같이 while not e.is_set() 같은 조건을 통해 thread 종료도 결정할 수 있다.

보통 아래와 같이 threading.Timer 와 같이 사용되는 경우가 많음으로 참고

1
2
3
4
5
# Event 객체 생성
event = threading.Event()

# 5초 후에 이벤트를 설정하는 타이머 생성
timer = threading.Timer(5.0, set_event, [event])

Thread 상속

threading.Thread 를 상속받아서 멀티스레드 역할을 수행하는 클래스를 정의할 수 있음.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event

def run(self):
print(f"{self.name} is waiting for the event to be set.")
self.event.wait() # 이벤트가 설정될 때까지 대기
for i in range(1, 6):
print(f"{self.name} - Number: {i}")
time.sleep(1)

# Event 객체 생성
event = threading.Event()

# 스레드 생성
thread1 = MyThread("Thread 1", event)
thread2 = MyThread("Thread 2", event)

# 스레드 시작
thread1.start()
thread2.start()

# 잠시 대기 후 이벤트 설정
time.sleep(5)
print("Main thread setting event")
event.set()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print("All threads have finished.")

threading 예제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import datetime

def set_interval(sec):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
stopped = threading.Event()

def loop():
while not stopped.wait(sec):
func(*args, **kwargs)

threading.Thread(target=loop, daemon=True).start()
return stopped
return wrapper
return decorator


@set_interval(2)
def i_wanna_interval(msg: str):
print(datetime.datetime.now().second, msg)


if __name__ == '__main__':
# Interval 함수 시작
stop_event = i_wanna_interval("hello")
threading.Timer(11, stop_event.set).start()
# 47 hello
# 49 hello
# 51 hello
# 53 hello
# 55 hello

generator

python generator 는 반복(iteration)을 생성하는 특별한 유형의 함수.

제너레이터는 이터레이터 처럼 작동하는 객체yield, next 키워드를 사용해 이터레이터 프로토콜 구현체 정의가 가능하다.

yield 키워드를 사용하여 순회할 요소를 반환받는다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import inspect

def simple_generator():
yield 1
yield 2
yield 3

if __name__ == '__main__':
gen = simple_generator()

for value in gen:
print(value, end=",") # 1,2,3,
print(inspect.isgeneratorfunction(simple_generator)) # True
print(inspect.getgeneratorstate(gen)) # GEN_CLOSED
gen.close()
  • inspect.isgeneratorfunction
    generator 함수인지 확인 가능

  • inspect.getgeneratorstate
    generator 현 상태(GEN_CREATED, GEN_SUSPENDED, GEN_CLOSED) 확인 가능.

range 클래스역시 generator 를 사용하는 형식으로 구현되어 있어
백만개의 정수를 모두 만드는 것이 아니라 50000 에 이르렀을 때 종료시킨다.

1
2
3
4
for value in range(10000000):
if value == 50000:
print("Found it")
break

실제 아래와 같은 제너레이터 함수라 할 수 있다.

1
2
3
def generator(my_rage):
for i in range(my_range):
yield i

generator 컴프리헨션

list, dict 와 마찬가지로 컴프리핸션 문법을 사용해 쉽게 generator 생성이 가능하다.

1
2
3
4
5
6
7
8
9
10
11
12
# ( (표현식) for (항목1) in (리스트) if (조건문) )

if __name__ == '__main__':
gen = (i for i in range(0, 10))
for value in gen:
print(value, end=",")
# 0,1,2,3,4,5,6,7,8,9,

gen = (i * 2 for i in range(0, 9) if i % 2 == 0)
for value in gen:
print(value, end=",")
# 0,4,8,12,16,

generator - next

for 문을 사용하지 않고 next 함수를 통해 generator 순회 가능하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def my_generator():
print("hello")
yield 1 # StopIteration 발생
print("world")
yield 2 # StopIteration 발생
print("python")
yield 'a'

if __name__ == '__main__':
g = my_generator()
print(type(g)) # <class 'generator'>
print(next(g)) # hello \n 1
print(next(g)) # world \n 2
print(next(g)) # python \n a

next 를 호출하여 generator 초기화와 동시에 yield 직전까지 실행시킨다.
generator 의 호출 과정은 체인형태로 스택에 쌓인며 중단되 지점에 모든 상태(변수, 명령포인터, 연산스택 등)가 모두 보존된다.

generator - send

next 메서드를 통해 yield 를 실행하면 반환값이 없지만
send 메서드를 통해 yield 를 실행하면 제너레이터에 값을 전달헤 반환값이 생긴다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def calculator():
total = 0
while True:
x = yield total
if x is None:
break
total += x


if __name__ == "__main__":
calc_gen = calculator()
# 제너레이터를 초기화
print(next(calc_gen)) # 0

# 값을 보내고 총합 출력
print(calc_gen.send(10)) # 10,
print(calc_gen.send(20)) # 30
print(calc_gen.send(-5)) # 25

# 제너레이터 종료
calc_gen.close()

첫번째 send 를 만나게 되면 값의 전달과 동시에 while 을 한번 돌고 yield 를 만나게 되면서 total 반환한다.

주의할 점은 sendgenerator 의 초기화 후 GEN_SUSPENDED 상태에서 호출 가능하다는 것.
최조 한번은 send(None) 을 호출하거나 next() 를 호출해서 GEN_SUSPENDED 상태로 만들어두어야 한다.

await, async

파이썬의 비동기 함수는 이벤트 루프 형태로 실행되며 추가 스레드를 생성하지 않는다.

async 키워드로 비동기 함수를 정의할 수 있으며 await 키워드로 async 실행완료 대기하는데 사용한다.
await 는 비동기 함수 안에서만 사용할 수 있으며 async 함수를 호출하거나 awaitable 객체 결과를 기다리는 데 사용된다.

Awaitable 클래스 종류는 아래와 같다.

1
2
3
             ┌──Coroutine
Awaitable◄───┤
└──Future◄────Task

await, async 를 통해 쉽게 코루틴 형태의 코드를 작성할 수 있다.

asyncio

비동기 프로그래밍을 위한 Python 표준 라이브러리

run, gather, sleep

  • asyncio.run()
    주어진 코루틴을 실행하고 완료될 때까지 대기,
    async 함수가 아님으로 await 키워드를 같이 사용할 필요가 없다.
  • asyncio.gather()
    여러 코루틴을 동시에 실행하고, 모든 코루틴이 완료될 때까지 대기
  • asyncio.sleep()
    지정된 시간 동안 비동기적으로 대기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")

async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")

async def main():
# 두 비동기 함수를 동시에 실행
await asyncio.gather(task1(), task2())


if __name__ == '__main__':
asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished

Future, get_running_loop

  • asyncio.Future()
    미래에 값을 설정할 수 있는 객체를 생성. 보통 콜백 또는 이벤트와 함께 사용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
async def set_future_value(future):
await asyncio.sleep(1)
future.set_result("Hello, World!")


async def main():
future: Future = asyncio.Future()
await asyncio.gather(set_future_value(future))
result = await future
print(result) # Hello, World!

if __name__ == '__main__':
asyncio.run(main())

set_exception 을 통해 예외를 발생시킬 수 있다.

1
future.set_exception(Exception('Something went wrong'))
  • asyncio.get_running_loop()
    현재 실행 중인 이벤트 루프를 반환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def set_future_value(future):
await asyncio.sleep(1)
future.set_result("Hello, World!")

async def main():
loop: AbstractEventLoop = asyncio.get_running_loop()
future: Future = loop.create_future()

await asyncio.gather(set_future_value(future))

result = await future
print(result)

if __name__ == '__main__':
asyncio.run(main())

asyncio.Future() 보단 현재 이벤트 루프에서 Future 를 생성하는 loop.create_future() 사용을 권장한다.

사실 Future 는 내부에서 비동기 프로세스를 정의하고 싶을 때 사용하는 듯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

async def main():
loop = asyncio.get_running_loop()

# Future 객체 생성
future = loop.create_future()

# Future 를 사용한 내부 비동기 프로세스 정의
async def set_future_result():
await asyncio.sleep(2)
future.set_result("Future result")
# Future 모니터링
async def monitor_future():
while not future.done():
print("Waiting for future to complete...")
await asyncio.sleep(0.5)
print("Future completed!")

monitor_task = asyncio.create_task(monitor_future())
result_task = asyncio.create_task(set_future_result())

# 모든 태스크가 완료될 때까지 기다림
await asyncio.gather(monitor_task, result_task)
print(f"Result: {future.result()}")

if __name__ == '__main__':
asyncio.run(main())
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Future completed!
# Result: Future result

get_event_loop

  • asyncio.get_event_loop()
    현재 실행 중인 이벤트 루프를 반환, 만약 존재하지 않는다면 새 이벤트 루프를 생성

아래와 같이 일반 함수안에서도 이벤트 루프(async)를 생성하고 실행시킬 수 있다.(의미는 없지만)

1
2
3
4
5
6
7
8
9
10
11
12
def synchronous_function():
loop = asyncio.get_event_loop()
if loop.is_running():
print("Event loop is already running.")
else:
print("Event loop is not running. Creating and running a new loop.")
loop.run_until_complete(asyncio.sleep(1))


if __name__ == '__main__':
synchronous_function()
# Event loop is not running. Creating and running a new loop.

create_task, wait, wait_for

  • asyncio.create_task()
    코루틴 함수를 관리를 위한 Task 변수를 생성 및 실행
  • asyncio.wait()
    Task 변수의 실행 및 대기, default return_when=ALL_COMPLETED 시 종료됨
  • asyncio.wait_for
    timeout 을 설정하고 해당 시간안에 완료될 수 있도록 대기.

TaskFuture 상속받은 클래스이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
async def task1():
await asyncio.sleep(1)
return 'Task 1 complete'


async def task2():
await asyncio.sleep(2)
return 'Task 2 complete'


async def task3():
await asyncio.sleep(3)
return 'Task 3 complete'


async def long_task():
print("Task started")
await asyncio.sleep(5) # 5초 동안 작업을 수행
print("Task finished")
return "Result of the task"


async def main():
# create_task를 사용하여 Task 생성 및 실행
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())

# FIRST_COMPLETED(첫 번째 태스크가 완료될 때까지 기다림), ALL_COMPLETED(default), FIRST_EXCEPTION
done, pending = await asyncio.wait([t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
# done, pending 은 set[Future] 형태의 객체

# 완료된 태스크 결과 출력
for task in done:
print(task.result())

# 미완 태스크 결과 출력
for task in pending:
print(task)

try:
# long_task가 3초 내에 완료되도록 기다림
result = await asyncio.wait_for(long_task(), timeout=3)
print(f"Task completed: {result}")
except asyncio.TimeoutError:
print("Task timed out!")



if __name__ == '__main__':
asyncio.run(main())
# Task 2 complete
# Pending tasks:
# <Task pending name='Task-2' coro=<task1() running at /Users/kouzie/test.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b2e0>()]>>
# <Task pending name='Task-4' coro=<task3() running at /Users/kouzie/test.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b340>()]>>
# Task started
# Task timed out!

Event, Queue

  • asyncio.Event()
    비동기 이벤트 객체를 생성. 공유된 이벤트 객체를 통해 비동기적으로 코드를 수행한다.
    threding 의 Event 함수와 비슷.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def waiter(event):
print("Waiting for event...")
await event.wait()
print("Event received!")


async def setter(event):
await asyncio.sleep(2)
event.set()


async def main():
event: Event = asyncio.Event()
await asyncio.gather(waiter(event), setter(event))

if __name__ == '__main__':
asyncio.run(main())
  • asyncio.Queue()
    비동기적으로 동작하는 큐를 생성. 생산자-소비자 패턴을 구현할때 좋음.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def producer(queue):
for i in range(5):
await queue.put(i)
await asyncio.sleep(1)

async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed: {item}")
queue.task_done()

async def main():
queue: Queue[int] = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))

if __name__ == '__main__':
asyncio.run(main())