python 멀티스레딩, 비동기!

threading

스레드를 사용하여 동시성 프로그래밍을 할 수 있게 해주는 모듈

  • Thread
    스레드를 생성하고 제어할 수 있는 기본 클래스
  • Lock
    스레드 간의 동기화를 위한 잠금 메커니즘을 제공
  • Event
    스레드 간의 신호 전달을 위한 이벤트 메커니즘을 제공
  • Timer
    일정 시간 후에 함수를 실행하도록 타이머를 설정

threading.Thread

Thread 생성 및 실행시 동시에 실행되다보니 어떤 코드가 먼저 수행될지 알 수 없다.

import threading

def print_numbers():
    for i in range(1, 6):
        print(f"Number: {i}")

def print_letters():
    for letter in ['A', 'B', 'C', 'D', 'E']:
        print(f"Letter: {letter}")

# 스레드 생성
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# 스레드 시작
thread1.start()
thread2.start()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print("All threads have finished.")
# Number: 1
# Number: 2
# Number: 3
# Number: 4
# Letter: A
# Number: 5
# Letter: B
# Letter: C
# Letter: D
# Letter: E
# All threads have finished.

threading.Lock

동시 접근하는 변수에 대해서는 락 처리를 할 수 있다.

import threading

# 전역변수(동시접근)
lock = threading.Lock()
shared_resource = 0

def increment():
    global shared_resource
    for _ in range(1000000):
        with lock:
            shared_resource += 1

def decrement():
    global shared_resource
    for _ in range(1000000):
        with lock:
            shared_resource -= 1

# 스레드 생성
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)

# 스레드 시작
thread1.start()
thread2.start()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print(f"Final value of shared_resource: {shared_resource}") # 0

10만번 반복했음에도 lock 을 통해 접근을 제어시켜 0이 출력됨

Event

외부에서 thread 를 제어시킬 수 있는 방법이 있다.

threading.Event 를 생성한 thread 에 등록하고 flag 값을 통해 thread 의 중지를 할 수 있다.

import threading
import time


def wait_for_event(e: threading.Event):
    print("Thread started and waiting for event to be set")
    e.wait()  # Event 가 set 되기를 무한 대기
    print("Event has been set, resuming thread")


def wait_for_event_timeout(e: threading.Event, t):
    while not e.is_set():
        print(f"Thread checking for event, will timeout in {t} seconds")
        event_is_set = e.wait(t)  # t 초만큼 Event 가 set 되기를 대기
        if event_is_set:
            print("Event has been set, resuming thread")
        else:
            print("Timeout occurred, continuing to check for event")


if __name__ == '__main__':
    # Event 객체 생성
    event1 = threading.Event()
    event2 = threading.Event()

    # 스레드 생성 및 시작
    thread1 = threading.Thread(target=wait_for_event, args=(event1,))
    thread2 = threading.Thread(target=wait_for_event_timeout, args=(event2, 1))

    # 스레드 시작
    thread1.start()
    thread2.start()

    # 잠시 대기 후 이벤트 설정
    time.sleep(5)
    event1.set()
    print("event1 setting complete")

    # 잠시 대기 후 이벤트 설정
    time.sleep(5)
    event2.set()
    print("event2 setting complete")

    # 스레드가 완료될 때까지 대기
    thread1.join()
    thread2.join()

    print("All threads have finished.")
  • is_set()
    flag==True 인지 확인.
  • set()
    flag=True 로 설정.
  • clear()
    flag=False 로 설정.
  • wait(timeout=None)
    flag==True 될 때까지 대기. timeout 이 설정된 경우, 지정된 시간 동안만 대기.

flag 를 통해 wait 중인 thread 코드를 이어서 진행할지 결정할 수 있다.
위와 같이 while not e.is_set() 같은 조건을 통해 thread 종료도 결정할 수 있다.

보통 아래와 같이 threading.Timer 와 같이 사용되는 경우가 많음으로 참고

# Event 객체 생성
event = threading.Event()

# 5초 후에 이벤트를 설정하는 타이머 생성
timer = threading.Timer(5.0, set_event, [event])

Thread 상속

threading.Thread 를 상속받아서 멀티스레드 역할을 수행하는 클래스를 정의할 수 있음.

class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event

    def run(self):
        print(f"{self.name} is waiting for the event to be set.")
        self.event.wait()  # 이벤트가 설정될 때까지 대기
        for i in range(1, 6):
            print(f"{self.name} - Number: {i}")
            time.sleep(1)

# Event 객체 생성
event = threading.Event()

# 스레드 생성
thread1 = MyThread("Thread 1", event)
thread2 = MyThread("Thread 2", event)

# 스레드 시작
thread1.start()
thread2.start()

# 잠시 대기 후 이벤트 설정
time.sleep(5)
print("Main thread setting event")
event.set()

# 스레드가 완료될 때까지 대기
thread1.join()
thread2.join()

print("All threads have finished.")

threading 예제

import datetime

def set_interval(sec):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop():
                while not stopped.wait(sec):
                    func(*args, **kwargs)

            threading.Thread(target=loop, daemon=True).start()
            return stopped
        return wrapper
    return decorator


@set_interval(2)
def i_wanna_interval(msg: str):
    print(datetime.datetime.now().second, msg)


if __name__ == '__main__':
    # Interval 함수 시작
    stop_event = i_wanna_interval("hello")
    threading.Timer(11, stop_event.set).start()
    # 47 hello
    # 49 hello
    # 51 hello
    # 53 hello
    # 55 hello

generator

python generator 는 반복(iteration)을 생성하는 특별한 유형의 함수.

제너레이터는 이터레이터 처럼 작동하는 객체yield, next 키워드를 사용해 이터레이터 프로토콜 구현체 정의가 가능하다.

yield 키워드를 사용하여 순회할 요소를 반환받는다.

import inspect

def simple_generator():
    yield 1
    yield 2
    yield 3

if __name__ == '__main__':
    gen = simple_generator()

    for value in gen:
        print(value, end=",") # 1,2,3,
    print(inspect.isgeneratorfunction(simple_generator)) # True
    print(inspect.getgeneratorstate(gen))                # GEN_CLOSED
    gen.close()
  • inspect.isgeneratorfunction
    generator 함수인지 확인 가능

  • inspect.getgeneratorstate
    generator 현 상태(GEN_CREATED, GEN_SUSPENDED, GEN_CLOSED) 확인 가능.

range 클래스역시 generator 를 사용하는 형식으로 구현되어 있어
백만개의 정수를 모두 만드는 것이 아니라 50000 에 이르렀을 때 종료시킨다.

for value in range(10000000):
    if value == 50000:
        print("Found it")
        break

실제 아래와 같은 제너레이터 함수라 할 수 있다.

def generator(my_rage):
    for i in range(my_range):
        yield i

generator 컴프리헨션

list, dict 와 마찬가지로 컴프리핸션 문법을 사용해 쉽게 generator 생성이 가능하다.

# ( (표현식) for (항목1) in (리스트) if (조건문) )

if __name__ == '__main__':
    gen = (i for i in range(0, 10))
    for value in gen:
        print(value, end=",")
        # 0,1,2,3,4,5,6,7,8,9,

    gen = (i * 2 for i in range(0, 9) if i % 2 == 0)
    for value in gen:
        print(value, end=",")
        # 0,4,8,12,16,

generator - next

for 문을 사용하지 않고 next 함수를 통해 generator 순회 가능하다.

def my_generator():
    print("hello")
    yield 1  # StopIteration 발생
    print("world")
    yield 2  # StopIteration 발생
    print("python")
    yield 'a'

if __name__ == '__main__':
    g = my_generator()
    print(type(g))  # <class 'generator'>
    print(next(g))  # hello \n 1
    print(next(g))  # world \n 2
    print(next(g))  # python \n a

next 를 호출하여 generator 초기화와 동시에 yield 직전까지 실행시킨다.
generator 의 호출 과정은 체인형태로 스택에 쌓인며 중단되 지점에 모든 상태(변수, 명령포인터, 연산스택 등)가 모두 보존된다.

generator - send

next 메서드를 통해 yield 를 실행하면 반환값이 없지만
send 메서드를 통해 yield 를 실행하면 제너레이터에 값을 전달헤 반환값이 생긴다.

def calculator():
    total = 0
    while True:
        x = yield total
        if x is None:
            break
        total += x


if __name__ == "__main__":
    calc_gen = calculator()
    # 제너레이터를 초기화
    print(next(calc_gen))  # 0

    # 값을 보내고 총합 출력
    print(calc_gen.send(10))  # 10, 
    print(calc_gen.send(20))  # 30
    print(calc_gen.send(-5))  # 25

    # 제너레이터 종료
    calc_gen.close()

첫번째 send 를 만나게 되면 값의 전달과 동시에 while 을 한번 돌고 yield 를 만나게 되면서 total 반환한다.

주의할 점은 sendgenerator 의 초기화 후 GEN_SUSPENDED 상태에서 호출 가능하다는 것.
최조 한번은 send(None) 을 호출하거나 next() 를 호출해서 GEN_SUSPENDED 상태로 만들어두어야 한다.

await, async

파이썬의 비동기 함수는 이벤트 루프 형태로 실행되며 추가 스레드를 생성하지 않는다.

async 키워드로 비동기 함수를 정의할 수 있으며 await 키워드로 async 실행완료 대기하는데 사용한다.
await 는 비동기 함수 안에서만 사용할 수 있으며 async 함수를 호출하거나 awaitable 객체 결과를 기다리는 데 사용된다.

Awaitable 클래스 종류는 아래와 같다.

             ┌──Coroutine
Awaitable◄───┤
             └──Future◄────Task

await, async 를 통해 쉽게 코루틴 형태의 코드를 작성할 수 있다.

asyncio

비동기 프로그래밍을 위한 Python 표준 라이브러리

run, gather, sleep

  • asyncio.run() 주어진 코루틴을 실행하고 완료될 때까지 대기,
    async 함수가 아님으로 await 키워드를 같이 사용할 필요가 없다.
  • asyncio.gather() 여러 코루틴을 동시에 실행하고, 모든 코루틴이 완료될 때까지 대기
  • asyncio.sleep() 지정된 시간 동안 비동기적으로 대기
import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

async def main():
    # 두 비동기 함수를 동시에 실행
    await asyncio.gather(task1(), task2())


if __name__ == '__main__':
    asyncio.run(main())
    # Task 1 started 
    # Task 2 started 
    # Task 2 finished 
    # Task 1 finished

Future, get_running_loop

  • asyncio.Future() 미래에 값을 설정할 수 있는 객체를 생성. 보통 콜백 또는 이벤트와 함께 사용한다.
async def set_future_value(future):
    await asyncio.sleep(1)
    future.set_result("Hello, World!")


async def main():
    future: Future = asyncio.Future()
    await asyncio.gather(set_future_value(future))
    result = await future
    print(result) # Hello, World!

if __name__ == '__main__':
    asyncio.run(main())

set_exception 을 통해 예외를 발생시킬 수 있다.

future.set_exception(Exception('Something went wrong'))
  • asyncio.get_running_loop() 현재 실행 중인 이벤트 루프를 반환
async def set_future_value(future):
    await asyncio.sleep(1)
    future.set_result("Hello, World!")

async def main():
    loop: AbstractEventLoop = asyncio.get_running_loop()
    future: Future = loop.create_future()
    
    await asyncio.gather(set_future_value(future))
    
    result = await future
    print(result)

if __name__ == '__main__':
    asyncio.run(main())

asyncio.Future() 보단 현재 이벤트 루프에서 Future 를 생성하는 loop.create_future() 사용을 권장한다.

사실 Future 는 내부에서 비동기 프로세스를 정의하고 싶을 때 사용하는 듯

import asyncio

async def main():
    loop = asyncio.get_running_loop()

    # Future 객체 생성
    future = loop.create_future()
    
    # Future 를 사용한 내부 비동기 프로세스 정의
    async def set_future_result():
        await asyncio.sleep(2)
        future.set_result("Future result")
    # Future 모니터링
    async def monitor_future():
        while not future.done():
            print("Waiting for future to complete...")
            await asyncio.sleep(0.5)
        print("Future completed!")
    
    monitor_task = asyncio.create_task(monitor_future())
    result_task = asyncio.create_task(set_future_result())
    
    # 모든 태스크가 완료될 때까지 기다림
    await asyncio.gather(monitor_task, result_task)
    print(f"Result: {future.result()}")

if __name__ == '__main__':
    asyncio.run(main())
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Waiting for future to complete...
# Future completed!
# Result: Future result

get_event_loop

  • asyncio.get_event_loop() 현재 실행 중인 이벤트 루프를 반환, 만약 존재하지 않는다면 새 이벤트 루프를 생성

아래와 같이 일반 함수안에서도 이벤트 루프(async)를 생성하고 실행시킬 수 있다.(의미는 없지만)

def synchronous_function():
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print("Event loop is already running.")
    else:
        print("Event loop is not running. Creating and running a new loop.")
        loop.run_until_complete(asyncio.sleep(1))


if __name__ == '__main__':
    synchronous_function()
# Event loop is not running. Creating and running a new loop.

create_task, wait, wait_for

  • asyncio.create_task() 코루틴 함수를 관리를 위한 Task 변수를 생성 및 실행
  • asyncio.wait() Task 변수의 실행 및 대기, default return_when=ALL_COMPLETED 시 종료됨
  • asyncio.wait_for timeout 을 설정하고 해당 시간안에 완료될 수 있도록 대기.

TaskFuture 상속받은 클래스이다.

async def task1():
    await asyncio.sleep(1)
    return 'Task 1 complete'


async def task2():
    await asyncio.sleep(2)
    return 'Task 2 complete'


async def task3():
    await asyncio.sleep(3)
    return 'Task 3 complete'


async def long_task():
    print("Task started")
    await asyncio.sleep(5)  # 5초 동안 작업을 수행
    print("Task finished")
    return "Result of the task"


async def main():
    # create_task를 사용하여 Task 생성 및 실행
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    t3 = asyncio.create_task(task3())

    # FIRST_COMPLETED(첫 번째 태스크가 완료될 때까지 기다림), ALL_COMPLETED(default), FIRST_EXCEPTION
    done, pending = await asyncio.wait([t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
    # done, pending 은 set[Future] 형태의 객체

    # 완료된 태스크 결과 출력
    for task in done:
        print(task.result())

    # 미완 태스크 결과 출력
    for task in pending:
        print(task)

    try:
        # long_task가 3초 내에 완료되도록 기다림
        result = await asyncio.wait_for(long_task(), timeout=3)
        print(f"Task completed: {result}")
    except asyncio.TimeoutError:
        print("Task timed out!")



if __name__ == '__main__':
    asyncio.run(main())
# Task 2 complete
# Pending tasks:
# <Task pending name='Task-2' coro=<task1() running at /Users/kouzie/test.py:19> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b2e0>()]>>
# <Task pending name='Task-4' coro=<task3() running at /Users/kouzie/test.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x104d9b340>()]>>
# Task started
# Task timed out!

Event, Queue

  • asyncio.Event() 비동기 이벤트 객체를 생성. 공유된 이벤트 객체를 통해 비동기적으로 코드를 수행한다.
    threding 의 Event 함수와 비슷.
async def waiter(event):
    print("Waiting for event...")
    await event.wait()
    print("Event received!")


async def setter(event):
    await asyncio.sleep(2)
    event.set()


async def main():
    event: Event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))

if __name__ == '__main__':
    asyncio.run(main())
  • asyncio.Queue() 비동기적으로 동작하는 큐를 생성. 생산자-소비자 패턴을 구현할때 좋음.
async def producer(queue):
    for i in range(5):
        await queue.put(i)
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue: Queue[int] = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

if __name__ == '__main__':
    asyncio.run(main())

카테고리:

업데이트: