KotlinのCoroutineScopeのメモ

やっとexperimental が外れたのでボチボチ使っていきたい。

https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.mdhttp://kotlinlang.org/docs/reference/coroutines/exception-handling.html を読みながら理解しようとしたが、サンプルを見ても「なぜこういう挙動になるのか」が分からないのでソースコードを読んでみた。

コルーチンとは何か

コルーチンは停止(yield)と再開(resume)が可能な実行単位である。スレッドと異なり停止中はリソースを占有しないように考慮されているし、Dispatcherが抽象化されているので、必要ならAndroidのメインスレッド上でコルーチンを動かすこともできる。

意外なことに、Coroutine そのものが名づけられた クラスやインタフェースは 存在しない。存在するのは Continuation インタフェースと AbstractCoroutine抽象クラス(後述)である。

コルーチンの開始。CoroutineStart.invoke() の内部から startCoroutine が呼ばれ、組み込み関数の createCoroutineなんたら() により 「サスペンド可能なラムダ式」が「Continuationインスタンス」に変わり、 拡張関数 resumeなんたら(Unit) が呼ばれてコルーチンの処理が開始する。Continuation がもし DispatchedContinuation なら resume 時にDispatcherを利用する。

コルーチンがどのスレッドで実行されるかはDispatcherによって異なる。 runBlocking{} は呼び出し元スレッドでイベントループを回す。 launch{},async{}はcontextに指定されたDispatcherがなければ Dispatchers.Default を使う。Dispatchersには数種類のDispatcherが定義されている。

Continuation の context プロパティについて。 JVM実装では createCoroutineFromSuspendFunction()
https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/coroutines/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt#L155 によると completion 引数のcontext が使われるらしい。

CoroutineContext

https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/index.html

コルーチン(Continuation)は実行に必要な情報をCoroutineContextに保持する。CoroutineContextは複数の属性を持つ。Job や Dispatcher などが設定される。

  • 属性のキーは CoroutineContext.Key である。参照アドレスでの比較が行われる。
  • 属性の値は CoroutineContext.Element である。ElementそのものもCoroutineContextである。
  • キーの定数はJobやContinuationInterceptorのコンパニオンオブジェクトが使われる
  • EventLoopBase は CoroutineDispatcher である。 CoroutineDispatcher は ContinuationInterceptor である。
  • CoroutineContext は合成できる。左辺の属性の一部が右辺の属性で上書きされる。
  • 派生クラス CombinedContextなど

kotlin.coroutines パッケージに coroutineContext という suspend property が定義されている。
coroutine か suspend function からのみ suspend property にアクセスできる。
suspend fun の内部から現在のCoroutineContextを取得することができる。

Job

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/

Jobはコルーチンの実行状態を示す。キャンセルや完了待機などの状態を持つ。

JobはCoroutineContextの一部として扱われるため、Job のベースクラスは CoroutineContext.Element と CoroutineContext である。Job どうしを合成することは可能だが、意味がないので合成結果は右辺そのものとなる。

Jobオブジェクトを扱う機会

  • キャンセルのための親ジョブを作る時
  • launch{} や async{} の戻り値

Jobオブジェクトへの操作

  • join(), joinAll(), cancel(), cancelAndJoin()

Jobには親子関係がある。
コード中のコメントから引用

coroutineContext内のJobインスタンスは、コルーチン自体を表します。
ジョブは親ジョブを持つことができます。
親を持つジョブは、その親が取り消されるとキャンセルされます。
親ジョブは すべての子プロセスが完了するまで_completing_ または _cancelling_ 状態で待機します。
_completing_状態は、ジョブの純粋な内部状態であることに注意してください。
_completing_ ジョブは外部から観察したらまだ _active_ なように見えますが、内部的には子たちのために待機しています。

ジョブの失敗と通常のキャンセルはキャンセル例外のcauseの種別により区別されます。
キャンセル例外のcauseが[CancellationException]の場合、ジョブ通常のキャンセルされたと解釈されます。
追加パラメータなしでキャンセルが呼び出された時にこれは発生します。
もしキャンセル例外のcauseがその他の例外だった場合、ジョブは失敗したと解釈されます。
ジョブのコードで問題が起きて例外が投げられた時にこれは発生します。

Jobインタフェースと派生した全てのインタフェースの関数はスレッドセーフであり
外部の同期なしで並行するコルーチンから安全に呼び出すことができます。

キャンセル時の例外ハンドリングではcauseも表示しておくべきか。

Job() ヘルパ関数は JobImpl(parent=null) を作成する。JobImpl はJobSupportである。
JobSupport クラスには cancelsParent プロパティがあり、キャンセル発生時に親をキャンセルするかどうかを決定する。
JobImpl cancelsParent=true, onCancelComplete=true, handlesException=false のプロパティがJobSupportと異なる。

コード中のコメントから引用

private fun cancelParent(cause: Throwable): Boolean {
// (causeが)CancellationExceptionなのは "通常"と解釈され、子供がそれを生成した場合は親はキャンセルされない。
// これにより、子がクラッシュしてその完了時に他の例外が生成されない限り、親は自身をキャンセルせずに子を(通常の)キャンセルすることができます。
...
}

Structured concurrency https://medium.com/@elizarov/structured-concurrency-722d765aa952 の正体は、Jobの親子関係と JobSupport の cancelsParent プロパティだった。

CoroutineScope

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/

CoroutineScope は通常、ラムダ式へのレシーバ変数として提供される。

  • coroutineContextプロパティへのアクセス
  • 拡張関数 actor, async, broadcast, launch, newCoroutineContext, plus, produce, promise
  • 派生クラス GlobalScope, ActorScope, ProducerScope
  • CoroutineScope + CoroutineContext でスコープを合成できる

スコープの絶縁

runBlocking,coroutineScope,launch,async等のスコープ作成関数で作成したスコープ(ジョブ)は、その内部で作成した子スコープ(ジョブ)が全て終了するまでは終了しない。これはこれで便利なのだが、ライフサイクル的には親子のスコープ(ジョブ)を絶縁したい場合がある。

間違った例 coroutineScope{}

coroutineScope{} はコード外側のコンテキストからJob以外を継承してJobを新しくしたスコープを作成してblockを実行する。内部のJobのキャンセルがcoroutineScopeよりも外側に影響しないので絶縁できたかのように思えるが、他のスコープ作成関数と同様、coroutineScope{}自体は内部で作成した子ジョブが全て終了するまでブロックしてしまう。

val producer = coroutineScope{ produce<Int>{  (1..3).forEach { channel.send(it) } } }

この例は外側のスコープとproducerを分離しようとしているが、coroutineScope{} は produceのコードブロックが完了するまでブロックする。この時点ではproducer変数からデータを読む処理は開始しないので、ここで永遠にブロックしてしまう。

正しい例1 GlobalScope

GlobalScopeはEmptyCoroutineContext を持つグローバル定数。JobもDispatcherも割り当てられていない。
CoroutineScope(context) と同様、親スコープと子ジョブを分離する際に使われる。

val producer = GlobalScope.produce<Int>{ (1..3).forEach { channel.send(it) } } 

この例ではGlobalScopeを親にすることで、コードの外側のスコープとproducerのスコープを完全に分離する。
実際には必要に応じてproduceの引数にDispatcherやJob等のコンテキストを追加する。

正しい例2 CoroutineScope(context) ヘルパ関数 と SupervisorJob()

https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/exception-handling.md#supervision

val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
	val producer = produce<Int>{ (1..3).forEach { channel.send(it) } }
}

CoroutineScope() はコードの外側のスコープの影響を受けない。
また、もし引数のコンテキストにJobが含まれなければ空のジョブを追加する。

スコープを絶縁した後の注意

スコープの絶縁はコルーチンをリークさせる可能性がある。上の例ではproducer 変数のチャネルはcapacity=0 で作成されるため、チャネルからデータが読み取られるまでsend() の部分でブロックする。作成したコルーチンのライフサイクルをプログラマが管理する必要がある。

AbstractCoroutine

https://github.com/Kotlin/kotlinx.coroutines/blob/master/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt

@InternalCoroutinesApi なのでユーザがこのクラスを直接利用することはないが書いておく。

runBlocking などのレシーバ変数(is CoroutineScope) は実際には BlockingCoroutine 等の、AbstractCoroutine の派生クラスである。

AbstractCoroutine はJobSupportであり、Jobであり、Continuationであり、CoroutineScope である。
1つのクラスに複数の役割が持たせられている理由はおそらくオブジェクト確保を減らすためだろう。
Continuation 以外にコルーチン個別の実行制御に必要な情報は全て AbstractCoroutine に入っている訳だ。

AbstractCoroutine作成時の引数 parentContext の指定は runBlocking() 等ではデフォルト値 EmptyCoroutineContext となっている。AbstractCoroutineのJobSupportには cancelsParent=true が設定されているので、もし parentContextが存在すればブロック内部で発生したキャンセルは親に伝達される。

asyncとlaunch

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/launch.html

launchはコルーチンを開始してJobを返す。
asyncはコルーチンを開始してDeferredを返す。DeferredはJobであるが戻り値の扱いが多少異なる。

asyncとlaunchはどちらも newCoroutineContext() を使って外側のコンテキストを引き継ぎ、指定がなければディスパッチャをDispatchers.Defaultにする。親Jobがあればキャンセル発生時にキャンセルの伝達が起きる。
newCoroutineContext() は外側のコンテキストと引数(省略時はEmptyCoroutineContext)のコンテキストを合成し、ディスパッチャの指定がなければDispatchers.Defaultを補う。

asyncとlaunch はそっくりだが少しだけ異なる。

  • launch は StandaloneCoroutine か LazyStandaloneCoroutine を生成、startしてそれを返す
  • async は DeferredCoroutine かLazyDeferredCoroutine を生成、startしてそれを返す

そして StandaloneCoroutine と DeferredCoroutine もそっくりだ。

  • どちらもAbstractCoroutineである。cancelsParent=trueに設定される。
  • DeferredCoroutine はDeferred, SelectClause1 である。await() と onAwait と registerSelectClause1() が用意される。

CoroutineExceptionHandler

launch で非同期処理を行う際に CoroutineExceptionHandler というCoroutineContextを追加することで例外を補足できる。
しかし実際に試してみると Structured Concurrency と相性が悪く、親スコープが例外を処理してしまうとそこで例外が再送出されてしまう。
直接の親のジョブをSupervisorJobにするだけでは足りない。 GlobalScope、親ジョブのないCoroutineScope(SupervisorJob)、SupervidorScopeなどを使う。

val handler = CoroutineExceptionHandler { _, exception ->
    println("handler caught $exception")
    println("thread ${Thread.currentThread().name}")
}

val supervisor = SupervisorJob()

// NG
// coroutineScope {
//    launch(handler) {
//        error("inside launch1")
//    }
//    delay(1000)
// }

// NG
// runBlocking(supervisor) {
//    launch(handler) {
//        error("inside launch2")
//    }
//    delay(1000)
// }

// OK
GlobalScope.launch(handler) {
    error("inside launch3")
}
delay(1000)

// OK
with(CoroutineScope(supervisor)){
    launch(handler) { error("inside launch4") }
    delay(1000)
}

supervisorScope{
    launch(handler) { error("inside launch5") }
    delay(1000)
}