funtask1() { println("start task1 in Thread ${Thread.currentThread()}") println("end task1 in Thread ${Thread.currentThread()}") }
funtask2() { println("start task2 in Thread ${Thread.currentThread()}") println("end task2 in Thread ${Thread.currentThread()}") }
println("start")
kotlin.run { task1() task2() println("called task1 and task2 from ${Thread.currentThread()}") } println("done") /* start called task1 and task2 from Thread[main,5,main] start task1 in Thread Thread[main,5,main] start task2 in Thread Thread[main,5,main] end task1 in Thread Thread[main,5,main] end task2 in Thread Thread[main,5,main] done, 1031 */
출력값과 실행시간을 보면 run 내부 함수들은 동시실행 되었으며, 코드노이즈 부분에서 js 의 callback method, java 의 reactive stream 과 비교했을 때 더 간결한 코드작성이 가능하다.
각 메서드의 콜스택이 main 메서드 코드 사이사이에 인터리브되어 모든 함수가 main 스레드 에서 실행됨을 알 수 있다.
suspend 함수 (일시중단함수)
suspend 키워드는 시작하고 멈추고 다시 시작할 수 있는 함수로 비동기 실행을 위한 중단 지점(suspention point) 을 의미하는 키워드이다.
I/O 처리나 외부 서비스와의 연계등 중단될만한 지점이 있다면 suspend 함수 로 정의해두고 내부에 동시성 코드를 작성하면 된다.
kotlin 에서 제공하는 대표적인 suspend 함수로 [delay, yield] 가 있다. suspend 함수를 맞닥뜨리면 스레드는 작업이 완료될 동안 다른작업을 수행한다. 또한 suspend 함수 는 항상 suspend 함수 내부에서 사용되어야 한다.
delay 는 지정된 밀리초만큼 실행을 정지시킨다. 간단히 스레드 하나짜리 스레드풀과 suspend 함수를 테스트하면 아래와 같다.
suspendfuntask1() { println("start task1 in Thread ${Thread.currentThread()}") yield() println("end task1 in Thread ${Thread.currentThread()}") }
suspendfuntask2() { println("start task2 in Thread ${Thread.currentThread()}") yield() println("end task2 in Thread ${Thread.currentThread()}") }
funmain() { println("start") runBlocking { launch { task1() } launch { task2() } println("called task1 and task2 from ${Thread.currentThread()}") } println("done") } /* start called task1 and task2 from Thread[main,5,main] start task1 in Thread Thread[main,5,main] start task2 in Thread Thread[main,5,main] end task1 in Thread Thread[main,5,main] end task2 in Thread Thread[main,5,main] done */
다른 코루틴 중단함수에서 yield 를 호출해주거나 중단함수가 종료되면 다시 제어를 되찾는다.
코루틴 Context
모든 코루틴 동작에는 코루틴 Context 가 존재한다. 코루틴을 식별, 실행하기 위한 다양한 정보들이 코루틴 Context 에 포함되어 있다.
가장 간단한 코루틴 Context 는 Coroutine Id 만 존재하며, 여러가지 요소를 추가해 결합된 코루틴 Context 정의가 가능하다.
아래와 같이 코루틴 Context 와 사용할 Dispatchers 요소를 결합하여 코루틴이 어떤 스레드풀에서 동작할지 결정할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Coroutine Id 까지 출력시키기 위해 -Dkotlinx.coroutines.debug 옵션 추가 funmain() { println("start") runBlocking { launch(Dispatchers.Default) { task1() } launch { task2() } println("called task1 and task2 from ${Thread.currentThread()}") } println("done") } /* start start task1 in Thread Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main] called task1 and task2 from Thread[main @coroutine#1,5,main] end task1 in Thread Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main] start task2 in Thread Thread[main @coroutine#3,5,main] end task2 in Thread Thread[main @coroutine#3,5,main] done */
Coroutine ID 가 [coroutine#1, coroutine#2, coroutine#3] 출력되는것으로 보아 전부 다른 코루틴 Context 에서 수행된는 것을 알 수 있다.
runBlocking 과 launch(Dispatchers.Default) 의 경우 실행되는 스레드가 달라 병렬실행 된다. runBlocking 과 그냥 launch 의 경우 실행되는 스레드가 같아 동시실행 된다.
위 예제처럼 Dispatcher 요소만 결합하면 알아서 코루틴을 스레드에 따라 코드를 병렬실행 혹은 동시실행 할지 결정할 수 있다.
Dispatchers.Default: 기본 스레드풀, 일반적인 CPU 작업을 수행하는 코루틴에 사용
Dispatchers.IO: IO작업 실행을 위한 코루틴에 사용
Dispatchers.Main: 안드로이드, Swing UI 를 구성 코루틴에 사용
만약 커스텀한 Dispatcher 를 결합하고 싶다면 ExecutorService 로부터 스레드풀을 직접 만들어 사용가능하다.
1 2 3 4 5 6
val dispatcher: ExecutorCoroutineDispatcher = Executors.newFixedThreadPool(10) .asCoroutineDispatcher() ... ... dispatcher.close() // close 함수를 호출하지 않으면 메인함수가 종료되지 않음으로 호출 필수
ExecutorService 를 패키징한 코루틴 표준 라이브러리 함수도 있다.
1 2
val singleThreadDispatcher = newSingleThreadContext("Single Thread ThreadPool") val multiThreadDispatcher = newFixedThreadPoolContext(10, "Multi Thread ThreadTool")
동시실행, 병렬실행에 대한 설정이 복잡한 스레드관련 코드 없이 코루틴 Context 결합만으로 쉽게 구축가능한것이 kotlin 코루틴 의 장점이다.
Continuation
중단지점으로 인해 동작하는 스레드가 변경되어도 동일한 코루틴 Context 가 유지되는 기능을 Continuation 이라 한다.
suspend 함수는 중단되었다가도 언제든 다시 실행될 수 있도록 하기위해 문맥정보를 유지하는 Continuation 기능을 지원한다.
task(2) 메서드의 출력값을 보면 코루틴 Context 이름은 동일하지만 실행되는 스레드는 다른것을 확인할 수 있다.
suspendfuntask(n: Long): Long { val factor = 2 println("start $n task in Thread ${Thread.currentThread()}") delay(n * 1000) println("end $n task in Thread ${Thread.currentThread()}") return n * factor }
funmain() { runBlocking { val compute = Compute() launch(Dispatchers.Default) { task(1) } launch(Dispatchers.Default) { task(2) } } } /* start 1 task in Thread Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main] start 2 task in Thread Thread[DefaultDispatcher-worker-2 @coroutine#3,5,main] end 1 task in Thread Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main] end 2 task in Thread Thread[DefaultDispatcher-worker-1 @coroutine#3,5,main] */
스레드간 코루틴 Context 를 유지하기 위해 변환된 바이트코드를 보면 굉장히 복잡하다.
continuation 을 유지하기 위해 외부에서 문맥정보에 해당하는 참조객체를 매개변수로 전달하고, 스레드풀에서 실행할 수 있는 코루틴 코드를 가진 익명객체를 사용한다.
java 에선 구현이 가능할까 싶다할 코루틴 기능을 kotlin 에서 쉽게 제공한다는 점이 중요하다.
코루틴 Scope
코루틴 Context 가 스레드가 사용하는 코루틴 실행 문맥정보라면, 코루틴 Scope 는 스레드가 코루틴 Context 정보를 가지고 실행시킬 코드영역, 코루틴 Context 가 영향을 끼치는 영역이라 할 수 있다.
1 2 3 4 5 6 7 8 9 10
publicinterfaceCoroutineScope { /** * The context of this scope. * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope. * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages. * * By convention, should contain an instance of a [job][Job] to enforce structured concurrency. */ publicval coroutineContext: CoroutineContext }
코루틴 Context 는 계층형태로 이루어져 있으며, 각 실행되는 코루틴 Context 는 서로 다르더라도 부모-자식 관계를 가지며 최상위 코루틴 Scope 안에서 동작된다.
좀더 자세한 그림은 아래와 같다.
생성된 코루틴 Scope 로부터 코루틴을 정의하고, 그 안에 또 새로운 코루틴을 정의했다.
코루틴 생성시 별도의 코루틴 Context 설정이 없다면 부모 코루틴 Context 로부터 그대로 속성값을 이어받음
마지막 block 매개변수로 CoroutineScope 의 확장함수로 구현된 람다 메서드를 전달받는데 해당 람다가 새로운 코루틴으로 동작하게 된다.
launch 메서드가 사용된 CoroutineScope 의 확장함수이니 동알한 CoroutineScope 에서 동작하게 된다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
funmain() { println("Before runBlocking") runBlocking { println("Before launch") launch { delay(2000L) println("Hello, World!") } println("After launch") // After launch 가 먼저 찍힌것을 확인 } println("After runBlocking") } /* Before runBlocking Before launch After launch Hello, World! After runBlocking */
표현식을 반환하지 않는 대부분의 코루틴 Builder 메서드들이 Job 인스턴스를 구현한 객체를 반환하는데 코루틴의 취소, 완료대기 등에 사용된다.
withTimeout, withTimeoutOrNull 등의 함수또한 Job 인스턴스를 반환한다.
async
launch 와 마찬가지로 CoroutineScope 인터페이스의 확장함수이다. async 는 표현식을 사용해 값을 반환해야 하는 경우에 사용한다.
suspendfuntask(taskName: String, time: Long) { println("$taskName start in Thread ${Thread.currentThread()}") delay(time) println("$taskName end in Thread ${Thread.currentThread()}") }
funmain() { println("start") val time = measureTimeMillis { runBlocking { println("called coroutine context ${Thread.currentThread()}") withContext(Dispatchers.Default) { task("first", 1000) } withContext(coroutineContext) { task("second", 1000) } } } println("done, time: $time") } /* start called coroutine context Thread[main @coroutine#1,5,main] first start in Thread Thread[DefaultDispatcher-worker-1 @coroutine#1,5,main] first end in Thread Thread[DefaultDispatcher-worker-1 @coroutine#1,5,main] second start in Thread Thread[main @coroutine#1,5,main] second end in Thread Thread[main @coroutine#1,5,main] done, time: 2056 */
출력처럼 실행되는 스레드풀은 다르지만 동일한 코루틴 Context 이름을 가진다. 그리고 동일한 코루틴 Context 를 유지하기 위해 동기성으로 코루틴 Scope 를 실행 시킨다.
하나의 스레드에서 실행하기 무거운 작업의 경우 스레드풀을 지정해서 실행하면 전체적인 시스템 부하를 줄일 수 있다.
coroutineScope
주어진 코루틴 Scope 와 코루틴 Context 를 이어받아 코루틴을 생성하는 함수
1 2 3 4 5
publicsuspendfun<R>coroutineScope(block: suspendCoroutineScope.() -> R): R = suspendCoroutineUninterceptedOrReturn { uCont -> val coroutine = ScopeCoroutine(uCont.context, uCont) coroutine.startUndispatchedOrReturn(coroutine, block) }
기존 스레드와 코루틴 Context 을 그대로 이어서 실행함으로 완료될 때까지 일시중단 된다.
funmain() { runBlocking { val job = launch { try { withContext(NonCancellable) { repeat(10) { i -> println("Job: Working $i, active $isActive") delay(500L) } } } catch (e: Exception) { println("예외처리 class:${e.javaClass.simpleName}, msg:${e.message}, active:$isActive") } } launch { delay(1300L) if (job.isActive) job.cancel() } } } /* Job: Working 0, active true Job: Working 1, active true ... 끝까지 수행됨 Job: Working 8, active true Job: Working 9, active true */
withContext(NonCancellable) 로 인해 try catch 까지 예외가 전파되지 않는다.
취소위임
코루틴 Context 내부에는 isActive 속성이 존재하며 Job, Deferred 객체에서 cancel 메서드를 통해 isActive 속성을 변경할 수 있다.
아래와 같이 실시간으로 isActive 속성을 검사하는 방식을 사용하면 중단지점이 없더라도 코루틴 취소 기능을 내부 코드에 위임시켜while 무한루프에서 탈출할 수 있다.
위 예에서도 자식 코루틴 Context 에 정의된 h2 예외핸들러 가 사용되지 않고 top level context 에 정의된 h1 예외핸들러 가 호출된다.
만약 top level context 에 예외핸들러 를 정의하지 않았다면 오류가 제어되지 않아 비정상 종료된다.
async 예외처리
async 는 Deferred 객체가 반환되고 await 함수가 해당 코루틴 Scope 의 완료시점임을 알 수 있다. 따라서 코루틴 Scope 내부에서 예외가 발생해도 실제 예외를 던지는 시점을 await 까지 늦추고 await 를 try catch 블록으로 감싸 예외처리를 진행한다.