python 멀티스레딩, 비동기!
threading
스레드를 사용하여 동시성 프로그래밍을 할 수 있게 해주는 모듈
- Thread
스레드를 생성하고 제어할 수 있는 기본 클래스 - Lock
스레드 간의 동기화를 위한 잠금 메커니즘을 제공 - Event
스레드 간의 신호 전달을 위한 이벤트 메커니즘을 제공 - Timer
일정 시간 후에 함수를 실행하도록 타이머를 설정
threading.Thread
Thread
생성 및 실행시 동시에 실행되다보니 어떤 코드가 먼저 수행될지 알 수 없다.
import threading
def print_numbers():
for i in range(1, 6):
print(f"Number: {i}")
def print_letters():
for letter in ['A', 'B', 'C', 'D', 'E']:
print(f"Letter: {letter}")
# 스레드 생성
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)
# 스레드 시작
thread1.start()
thread2.start()
# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()
print("All threads have finished.")
# Number: 1
# Number: 2
# Number: 3
# Number: 4
# Letter: A
# Number: 5
# Letter: B
# Letter: C
# Letter: D
# Letter: E
# All threads have finished.
threading.Lock
동시 접근하는 변수에 대해서는 락 처리를 할 수 있다.
import threading
# 전역변수(동시접근)
lock = threading.Lock()
shared_resource = 0
def increment():
global shared_resource
for _ in range(1000000):
with lock:
shared_resource += 1
def decrement():
global shared_resource
for _ in range(1000000):
with lock:
shared_resource -= 1
# 스레드 생성
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
# 스레드 시작
thread1.start()
thread2.start()
# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()
print(f"Final value of shared_resource: {shared_resource}") # 0
10만번 반복했음에도 lock 을 통해 접근을 제어시켜 0
이 출력됨
Event
외부에서 thread 를 제어시킬 수 있는 방법이 있다.
threading.Event
를 생성한 thread
에 등록하고 flag
값을 통해 thread
의 중지를 할 수 있다.
import threading
import time
def wait_for_event(e: threading.Event):
print("Thread started and waiting for event to be set")
e.wait() # Event 가 set 되기를 무한 대기
print("Event has been set, resuming thread")
def wait_for_event_timeout(e: threading.Event, t):
while not e.is_set():
print(f"Thread checking for event, will timeout in {t} seconds")
event_is_set = e.wait(t) # t 초만큼 Event 가 set 되기를 대기
if event_is_set:
print("Event has been set, resuming thread")
else:
print("Timeout occurred, continuing to check for event")
if __name__ == '__main__':
# Event 객체 생성
event1 = threading.Event()
event2 = threading.Event()
# 스레드 생성 및 시작
thread1 = threading.Thread(target=wait_for_event, args=(event1,))
thread2 = threading.Thread(target=wait_for_event_timeout, args=(event2, 1))
# 스레드 시작
thread1.start()
thread2.start()
# 잠시 대기 후 이벤트 설정
time.sleep(5)
event1.set()
print("event1 setting complete")
# 잠시 대기 후 이벤트 설정
time.sleep(5)
event2.set()
print("event2 setting complete")
# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()
print("All threads have finished.")
- is_set()
flag==True
인지 확인. - set()
flag=True
로 설정. - clear()
flag=False
로 설정. - wait(timeout=None)
flag==True
될 때까지 대기.timeout
이 설정된 경우, 지정된 시간 동안만 대기.
flag
를 통해 wait
중인 thread
코드를 이어서 진행할지 결정할 수 있다.
위와 같이 while not e.is_set()
같은 조건을 통해 thread
종료도 결정할 수 있다.
보통 아래와 같이 threading.Timer
와 같이 사용되는 경우가 많음으로 참고
# Event 객체 생성
event = threading.Event()
# 5초 후에 이벤트를 설정하는 타이머 생성
timer = threading.Timer(5.0, set_event, [event])
Thread 상속
threading.Thread
를 상속받아서 멀티스레드 역할을 수행하는 클래스를 정의할 수 있음.
class MyThread(threading.Thread):
def __init__(self, name, event):
super().__init__()
self.name = name
self.event = event
def run(self):
print(f"{self.name} is waiting for the event to be set.")
self.event.wait() # 이벤트가 설정될 때까지 대기
for i in range(1, 6):
print(f"{self.name} - Number: {i}")
time.sleep(1)
# Event 객체 생성
event = threading.Event()
# 스레드 생성
thread1 = MyThread("Thread 1", event)
thread2 = MyThread("Thread 2", event)
# 스레드 시작
thread1.start()
thread2.start()
# 잠시 대기 후 이벤트 설정
time.sleep(5)
print("Main thread setting event")
event.set()
# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()
print("All threads have finished.")
threading 예제
import datetime
def set_interval(sec):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop():
while not stopped.wait(sec):
func(*args, **kwargs)
threading.Thread(target=loop, daemon=True).start()
return stopped
return wrapper
return decorator
@set_interval(2)
def i_wanna_interval(msg: str):
print(datetime.datetime.now().second, msg)
if __name__ == '__main__':
# Interval 함수 시작
stop_event = i_wanna_interval("hello")
threading.Timer(11, stop_event.set).start()
# 47 hello
# 49 hello
# 51 hello
# 53 hello
# 55 hello
generator
python generator
는 반복(iteration)을 생성하는 특별한 유형의 함수.
제너레이터는 이터레이터 처럼 작동하는 객체로 yield
, next
키워드를 사용해 이터레이터 프로토콜 구현체 정의가 가능하다.
yield
키워드를 사용하여 순회할 요소를 반환받는다.
import inspect
def simple_generator():
yield 1
yield 2
yield 3
if __name__ == '__main__':
gen = simple_generator()
for value in gen:
print(value, end=",") # 1,2,3,
print(inspect.isgeneratorfunction(simple_generator)) # True
print(inspect.getgeneratorstate(gen)) # GEN_CLOSED
gen.close()
-
inspect.isgeneratorfunction
generator
함수인지 확인 가능 -
inspect.getgeneratorstate
generator
현 상태(GEN_CREATED, GEN_SUSPENDED, GEN_CLOSED) 확인 가능.
range
클래스역시 generator
를 사용하는 형식으로 구현되어 있어
백만개의 정수를 모두 만드는 것이 아니라 50000 에 이르렀을 때 종료시킨다.
for value in range(10000000):
if value == 50000:
print("Found it")
break
실제 아래와 같은 제너레이터 함수라 할 수 있다.
def generator(my_rage):
for i in range(my_range):
yield i
generator 컴프리헨션
list
, dict
와 마찬가지로 컴프리핸션 문법을 사용해 쉽게 generator 생성이 가능하다.
# ( (표현식) for (항목1) in (리스트) if (조건문) )
if __name__ == '__main__':
gen = (i for i in range(0, 10))
for value in gen:
print(value, end=",")
# 0,1,2,3,4,5,6,7,8,9,
gen = (i * 2 for i in range(0, 9) if i % 2 == 0)
for value in gen:
print(value, end=",")
# 0,4,8,12,16,
generator - next
for
문을 사용하지 않고 next
함수를 통해 generator
순회 가능하다.
def my_generator():
print("hello")
yield 1 # StopIteration 발생
print("world")
yield 2 # StopIteration 발생
print("python")
yield 'a'
if __name__ == '__main__':
g = my_generator()
print(type(g)) # <class 'generator'>
print(next(g)) # hello \n 1
print(next(g)) # world \n 2
print(next(g)) # python \n a
next
를 호출하여 generator
초기화와 동시에 yield
직전까지 실행시킨다.
generator
의 호출 과정은 체인형태로 스택에 쌓인며 중단되 지점에 모든 상태(변수, 명령포인터, 연산스택 등)가 모두 보존된다.
generator - send
next
메서드를 통해 yield
를 실행하면 반환값이 없지만
send
메서드를 통해 yield
를 실행하면 제너레이터에 값을 전달헤 반환값이 생긴다.
def calculator():
total = 0
while True:
x = yield total
if x is None:
break
total += x
if __name__ == "__main__":
calc_gen = calculator()
# 제너레이터를 초기화
print(next(calc_gen)) # 0
# 값을 보내고 총합 출력
print(calc_gen.send(10)) # 10,
print(calc_gen.send(20)) # 30
print(calc_gen.send(-5)) # 25
# 제너레이터 종료
calc_gen.close()
첫번째 send
를 만나게 되면 값의 전달과 동시에 while
을 한번 돌고 yield
를 만나게 되면서 total
반환한다.
주의할 점은 send
는 generator
의 초기화 후 GEN_SUSPENDED
상태에서 호출 가능하다는 것.
최조 한번은 send(None)
을 호출하거나 next()
를 호출해서 GEN_SUSPENDED
상태로 만들어두어야 한다.
await, async
파이썬의 비동기 함수는 이벤트 루프 형태로 실행되며 추가 스레드를 생성하지 않는다.
async
키워드로 비동기 함수를 정의할 수 있으며 await
키워드로 async
실행완료 대기하는데 사용한다.
await
는 비동기 함수 안에서만 사용할 수 있으며 async
함수를 호출하거나 awaitable
객체 결과를 기다리는 데 사용된다.
Awaitable
클래스 종류는 아래와 같다.
┌──Coroutine
Awaitable◄───┤
└──Future◄────Task
await, async
를 통해 쉽게 코루틴 형태의 코드를 작성할 수 있다.
asyncio
비동기 프로그래밍을 위한 Python
표준 라이브러리
run, gather, sleep
asyncio.run()
주어진 코루틴을 실행하고 완료될 때까지 대기,
async
함수가 아님으로await
키워드를 같이 사용할 필요가 없다.asyncio.gather()
여러 코루틴을 동시에 실행하고, 모든 코루틴이 완료될 때까지 대기asyncio.sleep()
지정된 시간 동안 비동기적으로 대기
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
# 두 비동기 함수를 동시에 실행
await asyncio.gather(task1(), task2())
if __name__ == '__main__':
asyncio.run(main())
# Task 1 started
# Task 2 started
# Task 2 finished
# Task 1 finished
Future, get_running_loop
asyncio.Future()
미래에 값을 설정할 수 있는 객체를 생성. 보통 콜백 또는 이벤트와 함께 사용한다.
async def set_future_value(future):
await asyncio.sleep(1)
future.set_result("Hello, World!")
async def main():
future: Future = asyncio.Future()
await asyncio.gather(set_future_value(future))
result = await future
print(result) # Hello, World!
if __name__ == '__main__':
asyncio.run(main())
set_exception
을 통해 예외를 발생시킬 수 있다.
future.set_exception(Exception('Something went wrong'))
asyncio.get_running_loop()
현재 실행 중인 이벤트 루프를 반환
async def set_future_value(future):
await asyncio.sleep(1)
future.set_result("Hello, World!")
async def main():
loop: AbstractEventLoop = asyncio.get_running_loop()
future: Future = loop.create_future()
await asyncio.gather(set_future_value(future))
result = await future
print(result)
if __name__ == '__main__':
asyncio.run(main())
asyncio.Future()
보단 현재 이벤트 루프에서 Future
를 생성하는 loop.create_future()
사용을 권장한다.
사실 Future
는 내부에서 비동기 프로세스를 정의하고 싶을 때 사용하는 듯
import asyncio
async def main():
loop = asyncio.get_running_loop()
# Future 객체 생성
future = loop.create_future()
# Future 를 사용한 내부 비동기 프로세스 정의
async def set_future_result():
await asyncio.sleep(2)
future.set_result("Future result")
# Future 모니터링
async def monitor_future():
while not future.done():
print("Waiting for future to complete...")
await asyncio.sleep(0.5)
print("Future completed!")
monitor_task = asyncio.create_task(monitor_future())
result_task = asyncio.create_task(set_future_result())
# 모든 태스크가 완료될 때까지 기다림
await asyncio.gather(monitor_task, result_task)
print(f"Result: {future.result()}")
if __name__ == '__main__':
asyncio.run(main())
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Future completed!
# Result: Future result
get_event_loop
asyncio.get_event_loop()
현재 실행 중인 이벤트 루프를 반환, 만약 존재하지 않는다면 새 이벤트 루프를 생성
아래와 같이 일반 함수안에서도 이벤트 루프(async)를 생성하고 실행시킬 수 있다.(의미는 없지만)
def synchronous_function():
loop = asyncio.get_event_loop()
if loop.is_running():
print("Event loop is already running.")
else:
print("Event loop is not running. Creating and running a new loop.")
loop.run_until_complete(asyncio.sleep(1))
if __name__ == '__main__':
synchronous_function()
# Event loop is not running. Creating and running a new loop.
create_task, wait, wait_for
asyncio.create_task()
코루틴 함수를 관리를 위한Task
변수를 생성 및 실행asyncio.wait()
Task
변수의 실행 및 대기,default
return_when=ALL_COMPLETED
시 종료됨asyncio.wait_for
timeout
을 설정하고 해당 시간안에 완료될 수 있도록 대기.
Task
는Future
상속받은 클래스이다.
async def task1():
await asyncio.sleep(1)
return 'Task 1 complete'
async def task2():
await asyncio.sleep(2)
return 'Task 2 complete'
async def task3():
await asyncio.sleep(3)
return 'Task 3 complete'
async def long_task():
print("Task started")
await asyncio.sleep(5) # 5초 동안 작업을 수행
print("Task finished")
return "Result of the task"
async def main():
# create_task를 사용하여 Task 생성 및 실행
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())
# FIRST_COMPLETED(첫 번째 태스크가 완료될 때까지 기다림), ALL_COMPLETED(default), FIRST_EXCEPTION
done, pending = await asyncio.wait([t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
# done, pending 은 set[Future] 형태의 객체
# 완료된 태스크 결과 출력
for task in done:
print(task.result())
# 미완 태스크 결과 출력
for task in pending:
print(task)
try:
# long_task가 3초 내에 완료되도록 기다림
result = await asyncio.wait_for(long_task(), timeout=3)
print(f"Task completed: {result}")
except asyncio.TimeoutError:
print("Task timed out!")
if __name__ == '__main__':
asyncio.run(main())
# Task 2 complete
# Pending tasks:
# <Task pending name='Task-2' coro=<task1() running at /Users/kouzie/test.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b2e0>()]>>
# <Task pending name='Task-4' coro=<task3() running at /Users/kouzie/test.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b340>()]>>
# Task started
# Task timed out!
Event, Queue
asyncio.Event()
비동기 이벤트 객체를 생성. 공유된 이벤트 객체를 통해 비동기적으로 코드를 수행한다.
threding 의 Event 함수와 비슷.
async def waiter(event):
print("Waiting for event...")
await event.wait()
print("Event received!")
async def setter(event):
await asyncio.sleep(2)
event.set()
async def main():
event: Event = asyncio.Event()
await asyncio.gather(waiter(event), setter(event))
if __name__ == '__main__':
asyncio.run(main())
asyncio.Queue()
비동기적으로 동작하는 큐를 생성. 생산자-소비자 패턴을 구현할때 좋음.
async def producer(queue):
for i in range(5):
await queue.put(i)
await asyncio.sleep(1)
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue: Queue[int] = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
if __name__ == '__main__':
asyncio.run(main())