Kotlin Co-Routine
- A coroutine is like a lightweight thread that is scheduled in the application layer (not in the operating system layer).
- A coroutine can suspend on one thread and resume on a different thread.
- At any given moment, a coroutine runs on exactly one thread.
- The dispatcher determines which thread(s) a coroutine uses.
- Multiple coroutines can run on the same thread (one at a time though).
- By default, a coroutine that throws an uncaught exception (other than CancellationException) cancels its Job hierarchy, meaning:
  - Cancel the parent (recursively). The parent is determined through the Job hierarchy.
  - Cancel sibling coroutines (the parent cancels all its children recursively).
  - Rethrow the original exception to the parent.
- An uncaught exception in async cancels siblings right away.
- When you catch exceptions, check whether they are CancellationException. If they are, rethrow them and DON'T keep going.
- Cooperative Cancellation - coroutines must check for cancellation at suspension points, and respect it.
- ⚠️ Firstly, make sure to understand intended Exception/Cancellation Behavior with Regular Job ⚠️.
- ⚠️ Swallowing cancellation exception prevents cancellation ⚠️
- ⚠️ Unhandled exception from async can go to parent scope without going through await() ⚠️
- ⚠️ cancel() takes effect at the next suspension point, NOT right away ⚠️
- ⚠️ join() does NOT rethrow ⚠️
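Two of the warnings above can be reproduced in a few lines. This is a minimal sketch, assuming kotlinx.coroutines is on the classpath; the function names cancelIsNotImmediate and joinDoesNotRethrow are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Gotcha: cancel() only marks the Job as cancelled. Code keeps running
// until the next suspension point observes the cancellation.
suspend fun cancelIsNotImmediate(): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    val job = launch {
        try {
            cancel()                              // cancels this coroutine's own Job
            log += "still running after cancel()" // still executes
            delay(10)                             // suspension point: throws CancellationException
            log += "unreachable"
        } finally {
            log += "cleanup"
        }
    }
    job.join()
    log
}

// Gotcha: join() waits for completion but does not rethrow the failure.
suspend fun joinDoesNotRethrow(): List<String> {
    val log = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, e -> log += "handler got: ${e.message}" }
    // Standalone scope (an assumption of this sketch) so the caller is not cancelled.
    val scope = CoroutineScope(Job() + handler)
    val failing = scope.launch { throw IllegalStateException("boom") }
    failing.join()                                // returns normally
    log += "join() returned; isCancelled=${failing.isCancelled}"
    return log
}

fun main() = runBlocking {
    println(cancelIsNotImmediate())
    println(joinDoesNotRethrow())
}
```

In the second function the failure only becomes visible through the CoroutineExceptionHandler and the job's isCancelled flag, which is why join() alone is not enough to detect errors.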
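Job is not inherited: a Job passed as a context argument becomes the parent of the new coroutine's own Job.
DON'T: runBlocking(SupervisorJob()) makes no sense (pg 433). The SupervisorJob only becomes the parent of the Job that runBlocking creates, so children launched inside still share a regular Job.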
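A small sketch of why passing SupervisorJob() as an argument does not help, assuming kotlinx.coroutines is available. The function name supervisorAsArgument is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Returns (did the sibling finish?, message of the exception runBlocking threw).
fun supervisorAsArgument(): Pair<Boolean, String?> {
    var siblingFinished = false
    val failure = runCatching {
        // SupervisorJob() becomes the PARENT of runBlocking's own (regular) Job.
        runBlocking(SupervisorJob()) {
            launch {
                delay(50)
                throw IllegalStateException("boom")
            }
            launch {
                delay(200)
                siblingFinished = true // never reached: the sibling is cancelled
            }
        }
    }.exceptionOrNull()
    return siblingFinished to failure?.message
}

fun main() {
    println(supervisorAsArgument())
}
```

Both children hang off the regular Job that runBlocking created, so the failing child still cancels its sibling and runBlocking still rethrows.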
Job is the primary mechanism for controlling the cancellation behavior of coroutines.
Job provides another avenue for stopping work besides traditional exception flow. It is a separate cancellation mechanism, less apparent than ordinary exceptions, in which a failed coroutine cancels all coroutines related to it through the Job hierarchy. (Note: SupervisorJob is the exception.)
In ALL scopes that use a regular Job:
- An uncaught, non-cancellation exception cancels siblings and propagates recursively through the parent Job hierarchy, with each parent shutting down its children. This effectively shuts down the entire hierarchy up to the topmost Job (or up to the error boundary where you caught the exception).
  - Cancels siblings: cancellation-cooperative functions/suspension points in sibling coroutines will throw CancellationException.
  - Propagates to parent: the parent scope will rethrow the original exception that was thrown in the coroutine.
Highlight
Job provides another avenue for stopping work besides traditional exceptions: a separate cancellation mechanism, less apparent than typical exception flow, in which a failed coroutine cancels all coroutines related to it through the Job hierarchy.
- Each coroutine has its own Job instance.
- Each Job instance is tied to its parent Job (if it has one). This tie-in enables the cancellation behavior.
Example
If a regular Job() were used and Coroutine#6 threw a non-CancellationException exception, the entire hierarchy would be cancelled and Coroutine#1 would rethrow the exception that Coroutine#6 threw.
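The default behavior can be sketched with two siblings under one regular Job. This is an illustrative example, not the hierarchy from the original diagram; the function name regularJobFailure is made up.

```kotlin
import kotlinx.coroutines.*

suspend fun regularJobFailure(): List<String> {
    val log = mutableListOf<String>()
    try {
        coroutineScope {                          // parent with a regular Job
            launch {                              // sibling of the failing coroutine
                try {
                    delay(1_000)
                    log += "sibling finished"     // not reached
                } catch (e: CancellationException) {
                    log += "sibling cancelled"
                    throw e                       // always rethrow cancellation
                }
            }
            launch {
                delay(50)
                throw IllegalStateException("boom")
            }
        }
    } catch (e: IllegalStateException) {
        // The parent scope rethrows the ORIGINAL exception.
        log += "parent rethrew: ${e.message}"
    }
    return log
}

fun main() = runBlocking {
    println(regularJobFailure())
}
```

The sibling sees a CancellationException at its suspension point, while the caller of the scope sees the original IllegalStateException.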
Creating Error Boundary
- See Coroutine scope functions and Error Boundary for how to create error boundaries, which let us try/catch exceptions and stop them from going higher up.
In the “Active” state, a job is running and doing its job. If the job is created with a coroutine builder, this is the state where the body of this coroutine will be executed. In this state, we can start child coroutines. Most coroutines will start in the “Active” state. Only those that are started lazily will start with the “New” state. These need to be started in order for them to move to the “Active” state. When a coroutine is executing its body, it is surely in the “Active” state. When it is done, its state changes to “Completing”, where it waits for its children. Once all its children are done, the job changes its state to “Completed”, which is a terminal one. Alternatively, if a job cancels or fails when running (in the “Active” or “Completing” state), its state will change to “Cancelling”. In this state, we have the last chance to do some clean-up, like closing connections or freeing resources (we will see how to do this in the next chapter). Once this is done, the job will move to the “Cancelled” state.
State | isActive | isCompleted | isCancelled |
---|---|---|---|
New (optional initial state) | false | false | false |
Active (default initial state) | true | false | false |
Completing (transient state) | true | false | false |
Cancelling (transient state) | false | false | true |
Cancelled (final state) | false | true | true |
Completed (final state) | false | true | false |
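The stable rows of the table can be observed directly. This is a minimal sketch with a hypothetical helper flags(); the transient Completing and Cancelling states are not shown because they are hard to observe reliably.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper that formats the three flags from the table.
fun Job.flags() = "isActive=$isActive isCompleted=$isCompleted isCancelled=$isCancelled"

suspend fun observeStates(): List<String> = coroutineScope {
    val states = mutableListOf<String>()

    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(1_000) }
    states += "New: ${lazyJob.flags()}"

    lazyJob.start()                      // New -> Active
    states += "Active: ${lazyJob.flags()}"

    lazyJob.cancelAndJoin()              // Active -> Cancelling -> Cancelled
    states += "Cancelled: ${lazyJob.flags()}"

    val quick = launch { }               // empty body completes normally
    quick.join()
    states += "Completed: ${quick.flags()}"

    states
}

fun main() = runBlocking {
    observeStates().forEach(::println)
}
```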
SupervisorJob is similar to a regular Job with the difference that cancellation is propagated only downwards, which means:
- Children can fail independently of each other.
- Does not auto-fail parent.
A failure or cancellation of a child does not cause the supervisor job to fail and does not affect its other children.
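A sketch of children failing independently under a SupervisorJob, assuming a standalone scope built for the demonstration. The function name supervisorJobDemo is hypothetical.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicReference

suspend fun supervisorJobDemo(): List<String> {
    val reported = AtomicReference<String?>(null)
    // Under a supervisor, a failing launch reports to the handler itself.
    val handler = CoroutineExceptionHandler { _, e -> reported.set(e.message) }
    val supervisor = SupervisorJob()
    val scope = CoroutineScope(supervisor + handler)

    val failing = scope.launch { throw IllegalStateException("boom") }
    val healthy = scope.launch { delay(100) }
    joinAll(failing, healthy)

    val result = listOf(
        "failing.isCancelled=${failing.isCancelled}",
        "healthy.isCancelled=${healthy.isCancelled}",
        "supervisor.isActive=${supervisor.isActive}",
        "handler saw: ${reported.get()}"
    )
    supervisor.cancel()                  // clean up the demo scope
    return result
}

fun main() = runBlocking {
    supervisorJobDemo().forEach(::println)
}
```

The healthy sibling completes normally and the supervisor stays active, even though one child failed.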
CoroutineContext is a collection of elements that define a coroutine's behavior.
- Job: controls the lifecycle of the coroutine.
- Dispatcher: defines the thread/thread-pool the work will be dispatched to.
- CoroutineExceptionHandler: handles uncaught exceptions.
- CoroutineName: adds a name to the coroutine (useful for debugging).
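The elements listed above can be combined with + and looked up by key. This is a minimal sketch; the name "worker" and the function contextDemo are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

suspend fun contextDemo(): List<String> {
    val handler = CoroutineExceptionHandler { _, e -> println("Unhandled: $e") }
    // Elements are combined with +; each element is stored under its own key.
    val context = Dispatchers.Default + CoroutineName("worker") + handler

    val outside = listOf(
        "name=${context[CoroutineName]?.name}",
        "dispatcher is Default=${context[ContinuationInterceptor] === Dispatchers.Default}",
        "has handler=${context[CoroutineExceptionHandler] != null}",
        "has Job=${context[Job] != null}"   // no Job until a coroutine is started
    )
    val inside = withContext(context) {
        listOf(
            "inside name=${coroutineContext[CoroutineName]?.name}",
            "inside has Job=${coroutineContext[Job] != null}"
        )
    }
    return outside + inside
}

fun main() = runBlocking {
    contextDemo().forEach(::println)
}
```

A hand-built context carries no Job. The Job appears once a coroutine runs with that context, because each coroutine gets its own.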
In simple terms, a CoroutineScope in Kotlin is like a "container" that manages your coroutines. It defines the boundaries and the "lifetime" for the coroutines you start within it, ensuring that they are automatically canceled when they are no longer needed.
What is a "Scope with Regular Job"?
A Scope with Regular Job in Kotlin coroutines refers to any CoroutineScope that uses Job() and hence abides by the default exception handling behavior, where an uncaught non-cancellation exception in a child coroutine cancels (shuts down) its entire hierarchy of coroutines.
Key Takeaway
Unless you explicitly use SupervisorJob or supervisorScope, you're working with a scope with a regular Job that follows structured concurrency principles, where one child's failure affects the entire coroutine family.
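For comparison, here is a sketch of opting out with supervisorScope, where one async child fails and its sibling still delivers a result. The function name loadBothSupervised is hypothetical.

```kotlin
import kotlinx.coroutines.*

suspend fun loadBothSupervised(): List<String> = supervisorScope {
    val failing = async<String> { throw IllegalStateException("boom") }
    val healthy = async {
        delay(50)
        "healthy ok"
    }
    listOf(
        // The failure stays inside the Deferred until await() is called.
        try {
            failing.await()
        } catch (e: IllegalStateException) {
            "failing: ${e.message}"
        },
        healthy.await()                  // not cancelled by the sibling's failure
    )
}

fun main() = runBlocking {
    println(loadBothSupervised())
}
```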
GT-Sandbox-Snapshot: Example - 1, Cancelling using SupervisorJob
Code
package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*

interface Server {
    suspend fun start()
    suspend fun stop()
}

val out = Out.standard()

class ServerImpl(
    private val scope: CoroutineScope // Injected scope for better control
) : Server {
    private val job = SupervisorJob() // Ensures independent coroutines
    private val serverScope = scope + job

    override suspend fun start() {
        out.info("Starting server")
        serverScope.launch(CoroutineName("ServerWork-1")) {
            out.info("Running server work in thread: ${Thread.currentThread().name}")
            delay(2000)
            out.info("ServerWork-1 completed")
        }
        serverScope.launch(CoroutineName("ServerWork-2")) {
            out.info("Running additional server work in thread: ${Thread.currentThread().name}")
            delay(1500)
            out.info("ServerWork-2 completed")
        }
    }

    override suspend fun stop() {
        out.info("Stopping server")
        job.cancel()
    }
}

fun main() = runBlocking {
    out.info("--------------------------------------------------------------------------------")
    out.info("Example where the server is aborted prior to finishing:")
    runWithDelayBeforeStopping(1000)
    out.info("--------------------------------------------------------------------------------")
    out.info("Example where the server has time to finish:")
    runWithDelayBeforeStopping(3000)
}

private suspend fun runWithDelayBeforeStopping(delayBeforeStopping: Long) {
    val server = ServerImpl(CoroutineScope(Dispatchers.Default)) // Inject external scope
    server.start()
    delay(delayBeforeStopping) // Use delay instead of Thread.sleep
    server.stop()
}
Command to reproduce:
gt.sandbox.checkout.commit bed37c930bb1de4223cb \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"
Recorded output of command:
[elapsed: 51ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed: 62ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server is aborted prior to finishing:
[elapsed: 65ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed: 73ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed: 74ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 1084ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server has time to finish:
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed: 1088ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed: 1088ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 2594ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] ServerWork-2 completed
[elapsed: 3093ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-1] ServerWork-1 completed
[elapsed: 4089ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server
Cannot re-use scope
https://chatgpt.com/share/31be55da-2e7f-4dc8-9fc3-000384f58086
Sample - 1
import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }
    delay(500) // Wait for a while
    scope.cancel() // Cancel the scope

    // Attempt to launch a new job in the cancelled scope
    val newJob = scope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }
    newJob.invokeOnCompletion { exception ->
        if (exception is CancellationException) {
            println("New job was cancelled due to scope cancellation")
        } else if (exception != null) {
            println("New job completed with exception: $exception")
        } else {
            println("New job completed successfully")
        }
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}
Sample - 2
import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }
    delay(500) // Wait for a while
    scope.cancel() // Cancel the scope

    // Create a new scope and launch a new job
    val newScope = CoroutineScope(Dispatchers.Default)
    val newJob = newScope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}
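If the goal is only to stop the running work while keeping the scope usable, one option is to cancel the children instead of the scope itself. This is a sketch using cancelChildren() from kotlinx.coroutines; the function name reuseAfterCancelChildren is hypothetical and this approach is not part of the original samples.

```kotlin
import kotlinx.coroutines.*

suspend fun reuseAfterCancelChildren(): List<String> {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val first = scope.launch { delay(1_000) }

    // Cancels the children only; the scope's own Job stays active.
    scope.coroutineContext.cancelChildren()
    first.join()

    // The scope still accepts new coroutines.
    val second = scope.async { "second ran" }
    val result = listOf(
        "first.isCancelled=${first.isCancelled}",
        "scope.isActive=${scope.isActive}",
        second.await()
    )
    scope.cancel()                       // final clean-up of the demo scope
    return result
}

fun main() = runBlocking {
    println(reuseAfterCancelChildren())
}
```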
Scope functions
coroutineScope: The hero of coroutines.
coroutineScope creates a boundary that establishes a new CoroutineScope. The new scope inherits its coroutineContext from the outer scope but overrides the context's Job. It also suspends the current coroutine until all child coroutines have finished their execution.
The scope created by coroutineScope has a new Job, which creates an exception boundary where failures surface: when a child fails, coroutineScope rethrows the exception to its caller. The exception keeps propagating up the Job hierarchy unless it is caught with try-catch around the coroutineScope call. A CoroutineExceptionHandler does not intercept it at this boundary; handlers are only consulted for uncaught exceptions in root coroutines (or direct children of a supervisor).
Coroutine scope functions create an error/exception boundary: they let us try-catch the exception to prevent it from bubbling up to the parent Job.
Error boundary example
Without error boundary
launch { // Job A
    launch { // Job B - direct child of A
        throw Exception() // Exception happens in B
        // B fails and cancels A
        // NO stopping point at B
        // Cancels A
    }
}
Moving towards error boundary using coroutineScope
launch { // Job A
    coroutineScope { // Job B - creates exception boundary
        launch { // Job C - child of B
            throw Exception() // Exception happens in C
            // C fails, B fails and surfaces exception
            // CAN be caught at B's boundary (not caught right now)
        }
    }
}
Example creating error boundary try-catch:
launch {
    try {
        // coroutineScope will wait for all the child coroutines to finish.
        coroutineScope {
            launch {
                delay(1000)
                throw RuntimeException("boom!")
            }
        }
    } catch (e: CancellationException) {
        // Re-throw CancellationException - do NOT suppress cancellation!
        throw e
    } catch (e: Exception) {
        println("Caught: ${e.message}")
        // Handle other exceptions - stops propagation
        // Coroutine continues normally here
    }
    println("Launch continues after exception handling")
}
Notes
"If you need to use functionalities from two coroutine scope functions, you need to use one inside another. For instance, to set both a timeout and a dispatcher, you can use withTimeoutOrNull inside withContext." - Kotlin Coroutines Deep Dive
suspend fun calculateAnswerOrNull(): User? =
    withContext(Dispatchers.Default) {
        withTimeoutOrNull(1000) {
            calculateAnswer()
        }
    }
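A runnable variant of the same nesting, as a sketch: slowAnswer and answerOrNull are hypothetical stand-ins for the book's calculateAnswer, and the 100 ms timeout is an arbitrary choice for the demonstration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical computation that takes delayMs to produce its answer.
suspend fun slowAnswer(delayMs: Long): Int {
    delay(delayMs)
    return 42
}

// withContext picks the dispatcher; withTimeoutOrNull inside it bounds the wait.
suspend fun answerOrNull(delayMs: Long): Int? =
    withContext(Dispatchers.Default) {
        withTimeoutOrNull(100) {
            slowAnswer(delayMs)
        }
    }

fun main() = runBlocking {
    println(answerOrNull(10))     // finishes well inside the timeout
    println(answerOrNull(1_000))  // exceeds the timeout, so the result is null
}
```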
There is a cost to spawning and awaiting a coroutine, but it appears to be quite small: on the order of 1/1000 to 1/100 of a millisecond (a few microseconds) per async/await call.
Below are benchmark results: the first run calls the function WITHOUT coroutines, and the second uses the async/await mechanism.
Benchmark: Looped add (no-coroutines)
Metric | (nanos) | (millis) |
---|---|---|
Min | 10 | 0.000 |
P50 (Median) | 20 | 0.000 |
P90 | 20 | 0.000 |
P95 | 51 | 0.000 |
P99 | 61 | 0.000 |
P99.9 | 271 | 0.000 |
Max | 38,705 | 0.039 |
Total | 21,389,560 | 21.390 |
Iterations | 1,000,000 |
Benchmark: Looped add (with async await)
Metric | (micros) | (millis) |
---|---|---|
Min | 2 | 0.002 |
P50 (Median) | 2 | 0.002 |
P90 | 3 | 0.003 |
P95 | 3 | 0.003 |
P99 | 5 | 0.005 |
P99.9 | 9 | 0.009 |
Max | 4,284 | 4.284 |
Total | 2,371,768 | 2371.768 |
Iterations | 1,000,000 |
Code to reproduce
GT-Sandbox-Snapshot
Code
package com.glassthought.sandbox

import com.asgard.testTools.benchmarker.BenchMarkerJVM
import com.glassthought.sandbox.util.out.impl.out
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlin.system.exitProcess

private suspend fun mainImpl() {
    BenchMarkerJVM.benchMark(
        {
            loopedAdd()
        },
        description = "Looped add (no-coroutines)",
    ).printFormattedTable()
    BenchMarkerJVM.benchMark(
        {
            asyncAwaitLoopedAdd()
        },
        description = "Looped add (with async await)",
    ).printFormattedTable()
}

suspend fun asyncAwaitLoopedAdd() {
    coroutineScope {
        val result = async {
            loopedAdd()
        }
        result.await()
    }
}

private fun loopedAdd() {
    for (i in 1..100) {
        add(i, i + 1)
    }
}

private fun add(i: Int, j: Int): Int {
    return i + j
}

fun main(): Unit = runBlocking {
    out.info("START - ON MAIN")
    try {
        mainImpl()
        out.info("DONE - WITHOUT errors on MAIN")
    } catch (e: Exception) {
        out.error("Back at MAIN got an exception! of type=[${e::class.simpleName}] with msg=[${e.message}] cause=[${e.cause}]. Exiting with error code 1")
        exitProcess(1)
    }
}
Command to reproduce:
gt.sandbox.checkout.commit b946649204a145ed938f \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"
Recorded output of command:
Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
[INFO][elapsed: 14ms][🥇][①][coroutname:@coroutine#1][tname:main/tid:1] START - ON MAIN
#### Benchmark: Looped add (no-coroutines)
| Metric | (nanos) | (millis) |
|--------------|-----------------|-----------------|
| Min | 10 | 0.000 |
| P50 (Median) | 20 | 0.000 |
| P90 | 20 | 0.000 |
| P95 | 60 | 0.000 |
| P99 | 70 | 0.000 |
| P99.9 | 240 | 0.000 |
| Max | 110,706 | 0.111 |
| Total | 21,951,663 | 21.952 |
| Iterations | 1,000,000 | |
#### Benchmark: Looped add (with async await)
| Metric | (micros) | (millis) |
|--------------|-----------------|-----------------|
| Min | 2 | 0.002 |
| P50 (Median) | 2 | 0.002 |
| P90 | 3 | 0.003 |
| P95 | 3 | 0.003 |
| P99 | 5 | 0.005 |
| P99.9 | 8 | 0.008 |
| Max | 2,145 | 2.145 |
| Total | 2,323,353 | 2323.353 |
| Iterations | 1,000,000 | |
[INFO][elapsed: 2658ms][🥇][①][coroutname:@coroutine#1][tname:main/tid:1] DONE - WITHOUT errors on MAIN
'suspend' keyword
Fundamentals of suspend in Kotlin
High-Level Overview
In Kotlin, the suspend keyword is used to mark a function as suspendable, meaning it can be paused and resumed at a later time.
Suspending functions allow you to pause the execution of the current coroutine without blocking the thread.
Misconception: suspend does NOT make your function run async by itself.
Suspend does not instruct Kotlin to execute a function in a background thread. Suspending functions are only asynchronous if used explicitly as such.
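The difference can be measured. This is a minimal sketch in which fetch is a hypothetical suspending function that takes about 200 ms: calling it twice in a row runs sequentially, and only explicit async makes the two calls overlap.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical suspending function: suspends for ~200 ms, then returns.
suspend fun fetch(name: String): String {
    delay(200)
    return name
}

// Plain suspend calls run one after another: roughly 400 ms in total.
suspend fun sequentialMillis(): Long = measureTimeMillis {
    fetch("a")
    fetch("b")
}

// Concurrency has to be requested explicitly, here with async: roughly 200 ms.
suspend fun concurrentMillis(): Long = measureTimeMillis {
    coroutineScope {
        val a = async { fetch("a") }
        val b = async { fetch("b") }
        a.await()
        b.await()
    }
}

fun main() = runBlocking {
    println("sequential: ${sequentialMillis()} ms")
    println("concurrent: ${concurrentMillis()} ms")
}
```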
Why Use suspend?
- Non-blocking Operations: suspend functions allow you to perform long-running operations (like network requests or disk IO) without blocking the main thread.
- Improved Readability: Code using suspend functions can be written in a sequential manner, making it more readable compared to callback-based approaches.
- Resource Efficiency: By not blocking threads, coroutines can handle a large number of tasks with a small number of threads, leading to better resource utilization.
How suspend Works
A suspend function can suspend its execution without blocking the thread and resume execution later. It can only be called from another suspend function or from a coroutine.
Basic Example
Code
package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    launch {
        fetchData("a-1")
        fetchData("a-2")
    }
    launch {
        fetchData("b")
    }
    launch {
        fetchData("c")
    }
}
Command to reproduce:
gt.sandbox.checkout.commit 14d257c97de10a4 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-06-09T03:40:03.685024Z][ms-elapsed-since-start: 45][tname:main/tid:1] Fetching data... a-1
[2024-06-09T03:40:03.704826Z][ms-elapsed-since-start: 55][tname:main/tid:1] Fetching data... b
[2024-06-09T03:40:03.704975Z][ms-elapsed-since-start: 55][tname:main/tid:1] Fetching data... c
[2024-06-09T03:40:04.708638Z][ms-elapsed-since-start: 1059][tname:main/tid:1] Data fetched a-1
[2024-06-09T03:40:04.709425Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Fetching data... a-2
[2024-06-09T03:40:04.709835Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Data fetched b
[2024-06-09T03:40:04.710102Z][ms-elapsed-since-start: 1061][tname:main/tid:1] Data fetched c
[2024-06-09T03:40:05.715174Z][ms-elapsed-since-start: 2066][tname:main/tid:1] Data fetched a-2
BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed