defprint_numbers(): for i inrange(1, 6): print(f"Number: {i}")
defprint_letters(): for letter in ['A', 'B', 'C', 'D', 'E']: print(f"Letter: {letter}")
# 스레드 생성 thread1 = threading.Thread(target=print_numbers) thread2 = threading.Thread(target=print_letters)
# 스레드 시작 thread1.start() thread2.start()
# 스레드가 완료될 때까지 대기 thread1.join() thread2.join()
print("All threads have finished.") # Number: 1 # Number: 2 # Number: 3 # Number: 4 # Letter: A # Number: 5 # Letter: B # Letter: C # Letter: D # Letter: E # All threads have finished.
defwait_for_event(e: threading.Event): print("Thread started and waiting for event to be set") e.wait() # Event 가 set 되기를 무한 대기 print("Event has been set, resuming thread")
defwait_for_event_timeout(e: threading.Event, t): whilenot e.is_set(): print(f"Thread checking for event, will timeout in {t} seconds") event_is_set = e.wait(t) # t 초만큼 Event 가 set 되기를 대기 if event_is_set: print("Event has been set, resuming thread") else: print("Timeout occurred, continuing to check for event")
if __name__ == '__main__': # Event 객체 생성 event1 = threading.Event() event2 = threading.Event()
# 스레드 생성 및 시작 thread1 = threading.Thread(target=wait_for_event, args=(event1,)) thread2 = threading.Thread(target=wait_for_event_timeout, args=(event2, 1))
# 스레드 시작 thread1.start() thread2.start()
# 잠시 대기 후 이벤트 설정 time.sleep(5) event1.set() print("event1 setting complete")
# 잠시 대기 후 이벤트 설정 time.sleep(5) event2.set() print("event2 setting complete")
# 스레드가 완료될 때까지 대기 thread1.join() thread2.join()
print("All threads have finished.")
is_set() flag==True 인지 확인.
set() flag=True 로 설정.
clear() flag=False 로 설정.
wait(timeout=None) flag==True 될 때까지 대기. timeout 이 설정된 경우, 지정된 시간 동안만 대기.
flag 를 통해 wait 중인 thread 코드를 이어서 진행할지 결정할 수 있다. 위와 같이 while not e.is_set() 같은 조건을 통해 thread 종료도 결정할 수 있다.
보통 아래와 같이 threading.Timer 와 같이 사용되는 경우가 많음으로 참고
1 2 3 4 5
# Event 객체 생성 event = threading.Event()
# 5초 후에 이벤트를 설정하는 타이머 생성 timer = threading.Timer(5.0, set_event, [event])
Thread 상속
threading.Thread 를 상속받아서 멀티스레드 역할을 수행하는 클래스를 정의할 수 있음.
defrun(self): print(f"{self.name} is waiting for the event to be set.") self.event.wait() # 이벤트가 설정될 때까지 대기 for i inrange(1, 6): print(f"{self.name} - Number: {i}") time.sleep(1)
if __name__ == '__main__': # Interval 함수 시작 stop_event = i_wanna_interval("hello") threading.Timer(11, stop_event.set).start() # 47 hello # 49 hello # 51 hello # 53 hello # 55 hello
generator
python generator 는 반복(iteration)을 생성하는 특별한 유형의 함수.
제너레이터는 이터레이터 처럼 작동하는 객체로 yield, next 키워드를 사용해 이터레이터 프로토콜 구현체 정의가 가능하다.
yield 키워드를 사용하여 순회할 요소를 반환받는다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import inspect
defsimple_generator(): yield1 yield2 yield3
if __name__ == '__main__': gen = simple_generator()
for value in gen: print(value, end=",") # 1,2,3, print(inspect.isgeneratorfunction(simple_generator)) # True print(inspect.getgeneratorstate(gen)) # GEN_CLOSED gen.close()
inspect.isgeneratorfunction generator 함수인지 확인 가능
inspect.getgeneratorstate generator 현 상태(GEN_CREATED, GEN_SUSPENDED, GEN_CLOSED) 확인 가능.
range 클래스역시 generator 를 사용하는 형식으로 구현되어 있어 백만개의 정수를 모두 만드는 것이 아니라 50000 에 이르렀을 때 종료시킨다.
1 2 3 4
for value inrange(10000000): if value == 50000: print("Found it") break
실제 아래와 같은 제너레이터 함수라 할 수 있다.
1 2 3
defgenerator(my_rage): for i inrange(my_range): yield i
generator 컴프리헨션
list, dict 와 마찬가지로 컴프리핸션 문법을 사용해 쉽게 generator 생성이 가능하다.
1 2 3 4 5 6 7 8 9 10 11 12
# ( (표현식) for (항목1) in (리스트) if (조건문) )
if __name__ == '__main__': gen = (i for i inrange(0, 10)) for value in gen: print(value, end=",") # 0,1,2,3,4,5,6,7,8,9,
gen = (i * 2for i inrange(0, 9) if i % 2 == 0) for value in gen: print(value, end=",") # 0,4,8,12,16,
generator - next
for 문을 사용하지 않고 next 함수를 통해 generator 순회 가능하다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
defmy_generator(): print("hello") yield1# StopIteration 발생 print("world") yield2# StopIteration 발생 print("python") yield'a'
if __name__ == '__main__': g = my_generator() print(type(g)) # <class 'generator'> print(next(g)) # hello \n 1 print(next(g)) # world \n 2 print(next(g)) # python \n a
next 를 호출하여 generator 초기화와 동시에 yield 직전까지 실행시킨다. generator 의 호출 과정은 체인형태로 스택에 쌓인며 중단되 지점에 모든 상태(변수, 명령포인터, 연산스택 등)가 모두 보존된다.
generator - send
next 메서드를 통해 yield 를 실행하면 반환값이 없지만 send 메서드를 통해 yield 를 실행하면 제너레이터에 값을 전달헤 반환값이 생긴다.
# Future 객체 생성 future = loop.create_future() # Future 를 사용한 내부 비동기 프로세스 정의 asyncdefset_future_result(): await asyncio.sleep(2) future.set_result("Future result") # Future 모니터링 asyncdefmonitor_future(): whilenot future.done(): print("Waiting for future to complete...") await asyncio.sleep(0.5) print("Future completed!") monitor_task = asyncio.create_task(monitor_future()) result_task = asyncio.create_task(set_future_result()) # 모든 태스크가 완료될 때까지 기다림 await asyncio.gather(monitor_task, result_task) print(f"Result: {future.result()}")
if __name__ == '__main__': asyncio.run(main()) # Waiting for future to complete... # Waiting for future to complete... # Waiting for future to complete... # Waiting for future to complete... # Future completed! # Result: Future result
get_event_loop
asyncio.get_event_loop() 현재 실행 중인 이벤트 루프를 반환, 만약 존재하지 않는다면 새 이벤트 루프를 생성
아래와 같이 일반 함수안에서도 이벤트 루프(async)를 생성하고 실행시킬 수 있다.(의미는 없지만)
1 2 3 4 5 6 7 8 9 10 11 12
defsynchronous_function(): loop = asyncio.get_event_loop() if loop.is_running(): print("Event loop is already running.") else: print("Event loop is not running. Creating and running a new loop.") loop.run_until_complete(asyncio.sleep(1))
if __name__ == '__main__': synchronous_function() # Event loop is not running. Creating and running a new loop.
create_task, wait, wait_for
asyncio.create_task() 코루틴 함수를 관리를 위한 Task 변수를 생성 및 실행
asyncio.wait() Task 변수의 실행 및 대기, defaultreturn_when=ALL_COMPLETED 시 종료됨
asyncio.wait_for timeout 을 설정하고 해당 시간안에 완료될 수 있도록 대기.