Kotlin Co-Routine

  • A coroutine is like a lightweight thread that is scheduled in the application layer (by the coroutine library), not by the operating system.
  • A coroutine can suspend on one thread and resume on a different thread.
    • At any given moment, a running coroutine executes on exactly one thread.
    • The dispatcher determines which thread(s) a coroutine uses.
      • Multiple coroutines can run on the same thread (one at a time, though).
  • By default, a coroutine that throws an uncaught exception (other than CancellationException) cancels its Job hierarchy, meaning:
    • The parent is cancelled (recursively upward). The parent is determined through the Job hierarchy.
    • Sibling coroutines are cancelled (the parent cancels all its children recursively).
    • The original exception is propagated to the parent.
  • An uncaught exception in async cancels its siblings right away, without waiting for await to be called.
  • When you catch exceptions, check whether they are CancellationException. If they are, rethrow them; DON'T swallow them and keep going.
  • Cooperative cancellation: a coroutine only notices cancellation when it checks for it. Cancellable suspending functions such as delay check at their suspension points; long-running non-suspending code must check explicitly and respect the result.
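The last two bullets can be sketched together. This is a minimal illustration, not a canonical pattern; the function name cancelWorkerAndCollectEvents and the recorded event strings are made up for the example. The worker notices cancellation at its delay suspension point, records it, and rethrows the CancellationException instead of swallowing it.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper for illustration: starts a worker, cancels it, and records what happened.
suspend fun cancelWorkerAndCollectEvents(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val worker = launch {
        try {
            repeat(100) {
                delay(100) // suspension point: cancellation is detected here
            }
            events += "finished"
        } catch (e: CancellationException) {
            events += "cancelled"
            throw e // rethrow so the coroutine actually stops
        } catch (e: Exception) {
            events += "failed" // ordinary failures would be handled here
        }
    }
    delay(250)             // let the worker run for a while
    worker.cancelAndJoin() // request cancellation and wait for the worker to stop
    events += "isCancelled=${worker.isCancelled}"
    events
}

fun main() = runBlocking {
    println(cancelWorkerAndCollectEvents())
}
```

If the catch block did not rethrow, the worker would simply carry on past the try-catch as if nothing had happened, which is exactly the behavior the bullet above warns against.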

CoroutineContext is a collection of elements that define a coroutine’s behavior.

  • Job — controls the lifecycle of the coroutine.
  • Dispatcher — defines the thread/thread-pool the work will be dispatched to.
  • CoroutineExceptionHandler — handles uncaught exceptions.
  • CoroutineName — Adds a name to the coroutine (useful for debugging).
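As a rough sketch of how these elements fit together, the snippet below builds a context with the + operator and looks each element up by its key. The helper name describeContext and the coroutine name "worker" are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical helper: builds a context and reports which elements it carries.
fun describeContext(): Map<String, Boolean> {
    val handler = CoroutineExceptionHandler { _, e -> println("Unhandled: $e") }
    // Elements are combined with +; each element is stored under its own key.
    val context = Job() + Dispatchers.Default + CoroutineName("worker") + handler
    // Adding an element whose key is already present replaces the old element.
    val renamed = context + CoroutineName("renamed")
    return mapOf(
        "job" to (context[Job] != null),
        // Dispatchers are stored under the ContinuationInterceptor key.
        "dispatcher" to (context[ContinuationInterceptor] === Dispatchers.Default),
        "name" to (context[CoroutineName]?.name == "worker"),
        "handler" to (context[CoroutineExceptionHandler] === handler),
        "renamed" to (renamed[CoroutineName]?.name == "renamed"),
    )
}

fun main() {
    println(describeContext())
}
```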

Diagram

graph TD;
  A[coroutineContext] -->|Has a| B("Job")
  A -->|Has a| C[Dispatcher]
  A -->|Has a| CoroutineExceptionHandler
  B -->|Parent of| J1[Child Job 1]
  B -->|Parent of| J2[Child Job 2]
  B -->|Parent of| J3[Child Job 3]
  J1 -->|Manages| D1[Coroutine 1]
  J2 -->|Manages| D2[Coroutine 2]
  J3 -->|Manages| D3[Coroutine 3]
  D1 -->|Runs| E1[Task 1]
  D2 -->|Runs| E2[Task 2]
  D3 -->|Runs| E3[Task 3]
  classDef blue fill:#3b82f6,stroke:#ffffff,stroke-width:2px;
  classDef green fill:#10b981,stroke:#ffffff,stroke-width:2px;
  classDef red fill:#ef4444,stroke:#ffffff,stroke-width:2px;
  class A blue;
  class B green;
  class D1,D2,D3 red;

In simple terms, a CoroutineScope in Kotlin is like a "container" that manages your coroutines. It defines the boundaries and the "lifetime" for the coroutines you start within it, ensuring that they are automatically canceled when they are no longer needed.

Job is the primary mechanism for controlling the cancellation behavior of coroutines.

Job provides another avenue for stopping work besides the traditional exception flow. It is a separate cancellation mechanism, less apparent than ordinary exceptions, in which a failed coroutine cancels all coroutines related to it through the Job hierarchy. (Note: SupervisorJob is the exception.)

In the “Active” state, a job is running and doing its work. If the job was created with a coroutine builder, this is the state in which the body of the coroutine is executed, and in which child coroutines can be started. Most coroutines start in the “Active” state; only those started lazily begin in the “New” state and must be started explicitly to move to “Active”.

When a coroutine finishes executing its body, its state changes to “Completing”, where it waits for its children. Once all its children are done, the job changes its state to “Completed”, which is terminal.

Alternatively, if a job is cancelled or fails while running (in the “Active” or “Completing” state), its state changes to “Cancelling”. This state is the last chance to do clean-up, such as closing connections or freeing resources. Once that is done, the job moves to the “Cancelled” state.

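| State                          | isActive | isCompleted | isCancelled |
|--------------------------------|----------|-------------|-------------|
| New (optional initial state)   | false    | false       | false       |
| Active (default initial state) | true     | false       | false       |
| Completing (transient state)   | true     | false       | false       |
| Cancelling (transient state)   | false    | false       | true        |
| Cancelled (final state)        | false    | true        | true        |
| Completed (final state)        | false    | true        | false       |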
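The flags in the table can be observed directly. The sketch below is only an illustration: flags and observeStates are hypothetical helper names, and the "Cancelling" snapshot assumes a single-threaded caller such as runBlocking, where the cancelled coroutine cannot finish its clean-up before the flags are read.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: formats the three state flags of a Job.
fun flags(job: Job): String =
    "isActive=${job.isActive} isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}"

suspend fun observeStates(): List<String> = coroutineScope {
    val seen = mutableListOf<String>()

    // A lazily started coroutine begins in the New state.
    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(50) }
    seen += "New: ${flags(lazyJob)}"
    lazyJob.start()
    seen += "Active: ${flags(lazyJob)}"
    lazyJob.join()
    seen += "Completed: ${flags(lazyJob)}"

    // A cancelled coroutine passes through Cancelling while it cleans up.
    val cancelledJob = launch {
        try {
            delay(10_000)
        } finally {
            withContext(NonCancellable) { delay(50) } // simulated clean-up work
        }
    }
    delay(10) // let the coroutine start and suspend in delay
    cancelledJob.cancel()
    seen += "Cancelling: ${flags(cancelledJob)}"
    cancelledJob.join()
    seen += "Cancelled: ${flags(cancelledJob)}"
    seen
}

fun main() = runBlocking {
    observeStates().forEach(::println)
}
```

The Completing state is not shown because it has the same flag values as Active and cannot be told apart through these three properties.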

SupervisorJob is similar to a regular Job, with the difference that cancellation is propagated only downwards, which means:

  • Children can fail independently of each other.
  • A failing child does not automatically fail the parent.

A failure or cancellation of a child does not cause the supervisor job to fail and does not affect its other children.
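A minimal sketch of this behavior, using supervisorScope (which runs its block under a supervisor job). The function name runUnderSupervisor and the event strings are assumptions for the example. Because the supervisor does not take over the child's failure, the failing child is given its own CoroutineExceptionHandler here.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: one child fails, the other keeps running.
suspend fun runUnderSupervisor(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    // The failing child reports its own exception; the supervisor does not handle it.
    val handler = CoroutineExceptionHandler { _, e -> events += "handled: ${e.message}" }
    supervisorScope {
        launch(handler) {
            delay(50)
            throw IllegalStateException("child A failed")
        }
        launch {
            delay(150) // still running when child A fails
            events += "child B completed"
        }
    }
    return events
}

fun main() = runBlocking {
    println(runUnderSupervisor())
}
```

supervisorScope still waits for both children before returning, so by the time the list is returned it contains the handled failure of child A followed by the normal completion of child B.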

What is a "Scope with Regular Job"?

A "Scope with Regular Job" in Kotlin coroutines refers to any CoroutineScope whose context uses a regular Job() and which therefore follows the default exception-handling behavior: an uncaught non-cancellation exception in a child coroutine cancels (shuts down) its entire hierarchy of coroutines.

Default Exception Behavior

In ALL scopes that use a regular Job, a failed coroutine cancels every coroutine related to it through the Job hierarchy:

  • Each coroutine has its own Job instance.
  • Each Job instance is tied to its parent Job (if it has one). This tie is what enables the cancellation behavior.

Example

With a regular Job(), if Coroutine#6 throws an exception other than CancellationException, the entire hierarchy is going to be cancelled and Coroutine#1 will rethrow the exception that Coroutine#6 threw.
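The same effect can be reproduced on a much smaller hierarchy. The sketch below assumes just two sibling coroutines under a coroutineScope (which uses a regular Job); the function name runUnderRegularJob and the event strings are made up for the example.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: one child fails and takes its sibling down with it.
suspend fun runUnderRegularJob(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    try {
        coroutineScope {
            launch {
                delay(50)
                throw IllegalStateException("child A failed")
            }
            launch {
                try {
                    delay(150)
                    events += "child B completed" // not reached: B is cancelled first
                } catch (e: CancellationException) {
                    events += "child B cancelled"
                    throw e // never swallow cancellation
                }
            }
        }
    } catch (e: IllegalStateException) {
        // The scope rethrows the original failure once all children have stopped.
        events += "scope rethrew: ${e.message}"
    }
    return events
}

fun main() = runBlocking {
    println(runUnderRegularJob())
}
```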

Key Takeaway

Unless you explicitly use SupervisorJob or supervisorScope, you are working with a scope with a regular Job. Such a scope follows structured concurrency principles: one child's failure affects the entire coroutine family.

GT-Sandbox-Snapshot: Example - 1, Cancelling using SupervisorJob

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*

interface Server {
  suspend fun start()
  suspend fun stop()
}

val out = Out.standard()

class ServerImpl(
  private val scope: CoroutineScope // Injected scope for better control
) : Server {

  private val job = SupervisorJob() // Ensures independent coroutines
  private val serverScope = scope + job

  override suspend fun start() {
    out.info("Starting server")

    serverScope.launch(CoroutineName("ServerWork-1")) {
      out.info("Running server work in thread: ${Thread.currentThread().name}")
      delay(2000)
      out.info("ServerWork-1 completed")
    }

    serverScope.launch(CoroutineName("ServerWork-2")) {
      out.info("Running additional server work in thread: ${Thread.currentThread().name}")
      delay(1500)
      out.info("ServerWork-2  completed")
    }
  }

  override suspend fun stop() {
    out.info("Stopping server")

    job.cancel()
  }
}

fun main() = runBlocking {
  out.info("--------------------------------------------------------------------------------")
  out.info("Example where the server is aborted prior to finishing:")
  runWithDelayBeforeStopping(1000)

  out.info("--------------------------------------------------------------------------------")
  out.info("Example where the server has time to finish:")
  runWithDelayBeforeStopping(3000)
}

private suspend fun runWithDelayBeforeStopping(delayBeforeStopping: Long) {
  val server = ServerImpl(CoroutineScope(Dispatchers.Default)) // Inject external scope
  server.start()
  delay(delayBeforeStopping) // Use delay instead of Thread.sleep
  server.stop()
}

Command to reproduce:

gt.sandbox.checkout.commit bed37c930bb1de4223cb \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   51ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed:   62ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server is aborted prior to finishing:
[elapsed:   65ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed:   73ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed:   74ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 1084ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server has time to finish:
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed: 1088ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed: 1088ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 2594ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] ServerWork-2  completed
[elapsed: 3093ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-1] ServerWork-1 completed
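[elapsed: 4089ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server

Cannot re-use scope

Once a CoroutineScope has been cancelled, its Job stays cancelled: a coroutine launched in it afterwards is cancelled immediately and its body never runs (Sample - 1). To start new work, create a new scope (Sample - 2).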

https://chatgpt.com/share/31be55da-2e7f-4dc8-9fc3-000384f58086

Sample - 1

import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }

    delay(500)  // Wait for a while
    scope.cancel()  // Cancel the scope

    // Attempt to launch a new job in the cancelled scope
    val newJob = scope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }

    newJob.invokeOnCompletion { exception ->
        if (exception is CancellationException) {
            println("New job was cancelled due to scope cancellation")
        } else if (exception != null) {
            println("New job completed with exception: $exception")
        } else {
            println("New job completed successfully")
        }
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}

Sample - 2

import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }

    delay(500)  // Wait for a while
    scope.cancel()  // Cancel the scope

    // Create a new scope and launch a new job
    val newScope = CoroutineScope(Dispatchers.Default)
    val newJob = newScope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}

Scope functions

coroutineScope: The hero of coroutines.

coroutineScope creates a boundary that establishes a new CoroutineScope.

The new scope inherits its coroutineContext from the outer scope, but overrides the context’s Job. It also causes the current coroutine to suspend until all child coroutines have finished their execution.
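A small sketch of both properties. The names loadName, loadAge, loadProfile and scopeJobIsChildOfCaller are hypothetical stand-ins, not part of any library. The first function shows coroutineScope suspending until its children are done and returning their combined result; the second checks that the scope's Job is a new Job attached as a child of the caller's Job.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for two independent slow operations.
suspend fun loadName(): String { delay(100); return "Ada" }
suspend fun loadAge(): Int { delay(100); return 36 }

// coroutineScope suspends until both async children have finished.
suspend fun loadProfile(): String = coroutineScope {
    val name = async { loadName() }
    val age = async { loadAge() }
    "${name.await()} (${age.await()})"
}

// The scope gets its own Job, which is a child of the caller's Job.
suspend fun scopeJobIsChildOfCaller(): Boolean {
    val outerJob = currentCoroutineContext()[Job] ?: return false
    return coroutineScope {
        val innerJob = coroutineContext[Job]
        innerJob !== outerJob && outerJob.children.any { it === innerJob }
    }
}

fun main() = runBlocking {
    println(loadProfile())
    println(scopeJobIsChildOfCaller())
}
```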

The scope created by coroutineScope has a new Job. When a child fails, the scope is cancelled and the exception is rethrown from the coroutineScope call, which creates an exception boundary where failures surface. Unless the call is wrapped in try-catch, the exception keeps propagating up through the caller. A CoroutineExceptionHandler does not intercept it at this boundary, because the handler is only consulted for uncaught exceptions that reach a root coroutine.

Coroutine scope functions create an error/exception boundary: they allow us to try-catch the exception and prevent it from bubbling up to the parent Job.

Error boundary example

Without error boundary

launch { // Job A
    launch { // Job B - direct child of A
        throw Exception() // Exception happens in B.
                          // B fails and the failure goes straight to A:
                          // there is NO stopping point at B, so A is cancelled too.
    }
}

Moving towards error boundary using coroutineScope

launch { // Job A  
    coroutineScope { // Job B - creates exception boundary
        launch { // Job C - child of B
            throw Exception() // Exception happens in C
                              // C fails, B fails and surfaces exception
                              // CAN be caught at B's boundary (not caught right now)
        }
    }
}

Example creating error boundary try-catch:

launch {
    try {
        // coroutineScope will wait for all the child coroutines to finish.
        coroutineScope {
            launch {
                delay(1000)
                throw RuntimeException("boom!")
            }
        }
    } catch (e: CancellationException) {
        // Re-throw CancellationException - do NOT suppress cancellation!
        throw e
    } catch (e: Exception) {
        println("Caught: ${e.message}")
        // Handle other exceptions - stops propagation
        // Coroutine continues normally here
    }

    println("Launch continues after exception handling")
}

Notes

If you need to use functionalities from two coroutine scope functions, you need to use one inside another. For instance, to set both a timeout and a dispatcher, you can use withTimeoutOrNull inside withContext. - Kotlin Coroutines Deep Dive

suspend fun calculateAnswerOrNull(): User? =
    withContext(Dispatchers.Default) {
        withTimeoutOrNull(1000) {
            calculateAnswer()
        }
    }

Building scope gotchas

There is a cost to spawning and awaiting a coroutine, but it appears to be quite small: on the order of 1/1000 to 1/100 of a millisecond (a few microseconds) per async/await round trip.

Below are the benchmark results. The first table is for calling the function WITHOUT coroutines; the second is for calling it through the async/await mechanism.

Benchmark: Looped add (no-coroutines)

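| Metric       | (nanos)         | (millis)        |
|--------------|-----------------|-----------------|
| Min          |              10 |           0.000 |
| P50 (Median) |              20 |           0.000 |
| P90          |              20 |           0.000 |
| P95          |              51 |           0.000 |
| P99          |              61 |           0.000 |
| P99.9        |             271 |           0.000 |
| Max          |          38,705 |           0.039 |
| Total        |      21,389,560 |          21.390 |
| Iterations   |       1,000,000 |                 |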

Benchmark: Looped add (with async await)

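| Metric       | (micros)        | (millis)        |
|--------------|-----------------|-----------------|
| Min          |               2 |           0.002 |
| P50 (Median) |               2 |           0.002 |
| P90          |               3 |           0.003 |
| P95          |               3 |           0.003 |
| P99          |               5 |           0.005 |
| P99.9        |               9 |           0.009 |
| Max          |           4,284 |           4.284 |
| Total        |       2,371,768 |        2371.768 |
| Iterations   |       1,000,000 |                 |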

Code to reproduce

GT-Sandbox-Snapshot

Code

package com.glassthought.sandbox

import com.asgard.testTools.benchmarker.BenchMarkerJVM
import com.glassthought.sandbox.util.out.impl.out
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlin.system.exitProcess


private suspend fun mainImpl() {
  BenchMarkerJVM.benchMark(
    {
      loopedAdd()
    },
    description = "Looped add (no-coroutines)",
  ).printFormattedTable()

  BenchMarkerJVM.benchMark(
    {
      asyncAwaitLoopedAdd()
    },
    description = "Looped add (with async await)",
  ).printFormattedTable()

}

suspend fun asyncAwaitLoopedAdd() {
  coroutineScope {
    val result = async {
      loopedAdd()
    }
    result.await()
  }
}

private fun loopedAdd() {
  for (i in 1..100) {
    add(i, i + 1)
  }
}

private fun add(i: Int, j: Int): Int {
  return i + j
}

fun main(): Unit = runBlocking {
  out.info("START - ON MAIN")

  try {
    mainImpl()

    out.info("DONE - WITHOUT errors on MAIN")
  } catch (e: Exception) {
    out.error("Back at MAIN got an exception! of type=[${e::class.simpleName}] with msg=[${e.message}] cause=[${e.cause}]. Exiting with error code 1")

    exitProcess(1)
  }
}

Command to reproduce:

gt.sandbox.checkout.commit b946649204a145ed938f \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
[INFO][elapsed:   14ms][🥇][①][coroutname:@coroutine#1][tname:main/tid:1] START - ON MAIN
#### Benchmark: Looped add (no-coroutines)
| Metric       | (nanos)         | (millis)        |
|--------------|-----------------|-----------------|
| Min          |              10 |           0.000 |
| P50 (Median) |              20 |           0.000 |
| P90          |              20 |           0.000 |
| P95          |              60 |           0.000 |
| P99          |              70 |           0.000 |
| P99.9        |             240 |           0.000 |
| Max          |         110,706 |           0.111 |
| Total        |      21,951,663 |          21.952 |
| Iterations   |       1,000,000 |                 |

#### Benchmark: Looped add (with async await)
| Metric       | (micros)        | (millis)        |
|--------------|-----------------|-----------------|
| Min          |               2 |           0.002 |
| P50 (Median) |               2 |           0.002 |
| P90          |               3 |           0.003 |
| P95          |               3 |           0.003 |
| P99          |               5 |           0.005 |
| P99.9        |               8 |           0.008 |
| Max          |           2,145 |           2.145 |
| Total        |       2,323,353 |        2323.353 |
| Iterations   |       1,000,000 |                 |

[INFO][elapsed: 2658ms][🥇][①][coroutname:@coroutine#1][tname:main/tid:1] DONE - WITHOUT errors on MAIN


'suspend' keyword

Fundamentals of suspend in Kotlin

High-Level Overview

In Kotlin, the suspend keyword is used to mark a function as suspendable, meaning it can be paused and resumed at a later time.

Suspending functions allow you to pause the execution of the current coroutine without blocking the thread.

Misconception: suspend does NOT make your function run async by itself.

Suspend does not instruct Kotlin to execute a function in a background thread. Suspending functions are only asynchronous if used explicitly as such.
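To make the misconception concrete, here is a small sketch. The names slowSquare, sequentialMs and concurrentMs are hypothetical. Calling two suspend functions one after the other runs them sequentially; they only overlap when a builder such as async is used explicitly.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical slow operation: suspends for 200 ms without blocking the thread.
suspend fun slowSquare(x: Int): Int {
    delay(200)
    return x * x
}

// Plain suspend calls run one after the other: roughly 400 ms in total.
suspend fun sequentialMs(): Long = measureTimeMillis {
    slowSquare(2)
    slowSquare(3)
}

// Concurrency has to be requested explicitly, here with async: roughly 200 ms in total.
suspend fun concurrentMs(): Long = measureTimeMillis {
    coroutineScope {
        val a = async { slowSquare(2) }
        val b = async { slowSquare(3) }
        a.await() + b.await()
    }
}

fun main() = runBlocking {
    println("sequential: ${sequentialMs()} ms")
    println("concurrent: ${concurrentMs()} ms")
}
```

The exact timings vary from run to run, but the sequential version should take about twice as long as the concurrent one.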

Why Use suspend?

  1. Non-blocking Operations: suspend functions allow you to perform long-running operations (like network requests or disk IO) without blocking the main thread.
  2. Improved Readability: Code using suspend functions can be written in a sequential manner, making it more readable compared to callback-based approaches.
  3. Resource Efficiency: By not blocking threads, coroutines can handle a large number of tasks with a small number of threads, leading to better resource utilization.

How suspend Works

A suspend function can suspend its execution without blocking the thread, and resume execution later. It can only be called from another suspend function or a coroutine.

Basic Example

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    launch {
        fetchData("a-1")
        fetchData("a-2")
    }
    launch {
        fetchData("b")
    }
    launch {
        fetchData("c")
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 14d257c97de10a4 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T03:40:03.685024Z][ms-elapsed-since-start:   45][tname:main/tid:1] Fetching data... a-1
[2024-06-09T03:40:03.704826Z][ms-elapsed-since-start:   55][tname:main/tid:1] Fetching data... b
[2024-06-09T03:40:03.704975Z][ms-elapsed-since-start:   55][tname:main/tid:1] Fetching data... c
[2024-06-09T03:40:04.708638Z][ms-elapsed-since-start: 1059][tname:main/tid:1] Data fetched a-1
[2024-06-09T03:40:04.709425Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Fetching data... a-2
[2024-06-09T03:40:04.709835Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Data fetched b
[2024-06-09T03:40:04.710102Z][ms-elapsed-since-start: 1061][tname:main/tid:1] Data fetched c
[2024-06-09T03:40:05.715174Z][ms-elapsed-since-start: 2066][tname:main/tid:1] Data fetched a-2

BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed

