Kotlin Coroutines

The core functionality of a coroutine is the ability to suspend at some point and resume in the future.
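
To make "suspend now, resume later" concrete, here is a minimal sketch that uses only the Kotlin standard library (kotlin.coroutines), not kotlinx.coroutines. The function name suspendAndResumeDemo and the event strings are made up for illustration. The coroutine stores its continuation instead of resuming it, so it stays paused until the caller resumes it.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: runs a coroutine that suspends once and returns
// the order in which things happened.
fun suspendAndResumeDemo(): List<String> {
    val events = mutableListOf<String>()
    var paused: Continuation<String>? = null

    val body: suspend () -> Unit = {
        events += "coroutine started"
        // Suspension point: keep the continuation instead of resuming it right away.
        val value = suspendCoroutine<String> { continuation -> paused = continuation }
        events += "coroutine resumed with $value"
    }

    // Start the coroutine; it runs on the current thread until it suspends.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    events += "caller continues while the coroutine is suspended"

    // Resume the coroutine later through the saved continuation.
    paused?.resume("hello")
    return events
}

fun main() {
    suspendAndResumeDemo().forEach(::println)
    // coroutine started
    // caller continues while the coroutine is suspended
    // coroutine resumed with hello
}
```

In everyday code the continuation is handled by library functions such as delay; this sketch only exposes the mechanism.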

Example Code

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*
import kotlin.random.Random

// Simulating a View interface for displaying data
interface View {
  suspend fun showNews(news: List<String>)
  suspend fun showUser(user: String)
  suspend fun showProgressBar()
  suspend fun hideProgressBar()
}

val out = Out.standard()

// A simple implementation of the View interface
class ConsoleView : View {
  override suspend fun showNews(news: List<String>) {
    out.printlnGreen("News: $news")
  }

  override suspend fun showUser(user: String) {
    out.printlnGreen("User: $user")
  }

  override suspend fun showProgressBar() {
    out.printlnBlue("Loading...")
  }

  override suspend fun hideProgressBar() {
    out.printlnBlue("Done loading.")
  }
}

// Simulated functions to fetch data
suspend fun getNewsFromApi(): List<String> {
  out.println("Starting news fetch...")
  delay(1000L) // Simulating network delay
  out.println("News fetch complete.")
  return listOf("News 1", "News 2", "News 3")
}

suspend fun getUserData(): String {
  out.println("Starting user data fetch...")
  delay(800L) // Simulating network delay
  out.println("User data fetch complete.")

  return "John Doe"
}


// Suspending functions to update news and profile
suspend fun updateNews(view: View) {
  view.showProgressBar()
  val news = getNewsFromApi()
  val sortedNews = news.sortedByDescending { Random.nextInt(1, 100) } // Simulated sorting
  view.showNews(sortedNews)
  view.hideProgressBar()
}

suspend fun updateProfile(view: View) {
  val user = getUserData()
  view.showUser(user)
}

// Main function
fun main() = runBlocking {
  out.println("Starting the program...")

  val view = ConsoleView()
  val scope = CoroutineScope(Dispatchers.Default)

  scope.launch(CoroutineName("UpdateNews")) {
    updateNews(view)
  }

  scope.launch(CoroutineName("UpdateProfile")) {
    updateProfile(view)
  }

  // Keep the program alive long enough to see the output
  delay(2000L)

  out.println("Program finished.")
}

Command to reproduce:

gt.sandbox.checkout.commit a22a14e145f6445dd1dc \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   31ms][tname:main/tid:1][coroutine:unnamed] Starting the program...
[elapsed:   53ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateNews] Loading...
[elapsed:   53ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateNews] Starting news fetch...
[elapsed:   54ms][tname:DefaultDispatcher-worker-2/tid:21][coroutine:UpdateProfile] Starting user data fetch...
[elapsed:  859ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateProfile] User data fetch complete.
[elapsed:  864ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateProfile] User: John Doe
[elapsed: 1057ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateNews] News fetch complete.
[elapsed: 1078ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateNews] News: [News 3, News 2, News 1]
[elapsed: 1078ms][tname:DefaultDispatcher-worker-1/tid:20][coroutine:UpdateNews] Done loading.
[elapsed: 2058ms][tname:main/tid:1][coroutine:unnamed] Program finished.

In simple terms, a CoroutineScope in Kotlin is like a "container" that manages your coroutines. It defines the boundaries and the "lifetime" of the coroutines you start within it: when the scope is cancelled, every coroutine launched in it is cancelled as well.
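
A minimal sketch of the "lifetime" idea, assuming kotlinx.coroutines is on the classpath. The helper name cancelScopeWithWorkers is hypothetical. It launches a few long-running coroutines in one scope, cancels the scope, and reports whether each coroutine ended up cancelled.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: launches `workerCount` long-running coroutines in one scope,
// cancels the scope, and reports whether each worker was cancelled.
suspend fun cancelScopeWithWorkers(workerCount: Int): List<Boolean> {
    val scope = CoroutineScope(Dispatchers.Default)
    val workers = List(workerCount) { index ->
        scope.launch(CoroutineName("Worker-$index")) {
            delay(10_000) // Would take 10 seconds if left alone
        }
    }
    delay(100)      // Give the workers a moment to start
    scope.cancel()  // Ends the lifetime of everything launched in the scope
    workers.joinAll()
    return workers.map { it.isCancelled }
}

fun main() = runBlocking {
    println(cancelScopeWithWorkers(3)) // [true, true, true]
}
```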

GT-Sandbox-Snapshot: Example - 1, Cancelling using SupervisorJob

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*

interface Server {
  suspend fun start()
  suspend fun stop()
}

val out = Out.standard()

class ServerImpl(
  private val scope: CoroutineScope // Injected scope for better control
) : Server {

  private val job = SupervisorJob() // Ensures independent coroutines
  private val serverScope = scope + job

  override suspend fun start() {
    out.info("Starting server")

    serverScope.launch(CoroutineName("ServerWork-1")) {
      out.info("Running server work in thread: ${Thread.currentThread().name}")
      delay(2000)
      out.info("ServerWork-1 completed")
    }

    serverScope.launch(CoroutineName("ServerWork-2")) {
      out.info("Running additional server work in thread: ${Thread.currentThread().name}")
      delay(1500)
      out.info("ServerWork-2  completed")
    }
  }

  override suspend fun stop() {
    out.info("Stopping server")

    job.cancel()
  }
}

fun main() = runBlocking {
  out.info("--------------------------------------------------------------------------------")
  out.info("Example where the server is aborted prior to finishing:")
  runWithDelayBeforeStopping(1000)

  out.info("--------------------------------------------------------------------------------")
  out.info("Example where the server has time to finish:")
  runWithDelayBeforeStopping(3000)
}

private suspend fun runWithDelayBeforeStopping(delayBeforeStopping: Long) {
  val server = ServerImpl(CoroutineScope(Dispatchers.Default)) // Inject external scope
  server.start()
  delay(delayBeforeStopping) // Use delay instead of Thread.sleep
  server.stop()
}

Command to reproduce:

gt.sandbox.checkout.commit bed37c930bb1de4223cb \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   51ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed:   62ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server is aborted prior to finishing:
[elapsed:   65ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed:   73ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed:   74ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 1084ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] --------------------------------------------------------------------------------
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Example where the server has time to finish:
[elapsed: 1087ms][🥇/tname:main/tid:1][coroutine:unnamed] Starting server
[elapsed: 1088ms][⓶/tname:DefaultDispatcher-worker-1/tid:20][coroutine:ServerWork-1] Running server work in thread: DefaultDispatcher-worker-1
[elapsed: 1088ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] Running additional server work in thread: DefaultDispatcher-worker-2
[elapsed: 2594ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-2] ServerWork-2  completed
[elapsed: 3093ms][⓷/tname:DefaultDispatcher-worker-2/tid:21][coroutine:ServerWork-1] ServerWork-1 completed
[elapsed: 4089ms][🥇/tname:main/tid:1][coroutine:unnamed] Stopping server
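
Cannot re-use scope

A cancelled CoroutineScope cannot be re-used: a coroutine launched in it is cancelled immediately and its body never runs (Sample - 1). To start new work, create a new scope (Sample - 2).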

https://chatgpt.com/share/31be55da-2e7f-4dc8-9fc3-000384f58086

Sample - 1

import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }

    delay(500)  // Wait for a while
    scope.cancel()  // Cancel the scope

    // Attempt to launch a new job in the cancelled scope
    val newJob = scope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }

    newJob.invokeOnCompletion { exception ->
        if (exception is CancellationException) {
            println("New job was cancelled due to scope cancellation")
        } else if (exception != null) {
            println("New job completed with exception: $exception")
        } else {
            println("New job completed successfully")
        }
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}

Sample - 2

import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val job = scope.launch {
        println("Job started")
        delay(1000)
        println("Job completed")
    }

    delay(500)  // Wait for a while
    scope.cancel()  // Cancel the scope

    // Create a new scope and launch a new job
    val newScope = CoroutineScope(Dispatchers.Default)
    val newJob = newScope.launch {
        println("New job started")
        delay(1000)
        println("New job completed")
    }

    // Wait for all jobs to complete
    joinAll(job, newJob)
}
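
As a complement to Sample - 1 and Sample - 2, here is a hedged sketch of one way to stop running work while keeping the scope usable: cancel the scope's children with cancelChildren() rather than cancelling the scope itself. The helper name cancelChildrenThenReuse and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: cancels the running children of a scope, then launches
// new work in the same scope and records what happened.
suspend fun cancelChildrenThenReuse(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val scope = CoroutineScope(Dispatchers.Default)

    val first = scope.launch {
        delay(10_000)
        events.add("first job completed") // Not reached: the job is cancelled first
    }
    // Cancels the children only; the scope's own Job stays active.
    scope.coroutineContext.cancelChildren()
    first.join()
    events.add("first job cancelled: ${first.isCancelled}")

    // The same scope still accepts new coroutines.
    val second = scope.launch { events.add("second job ran") }
    second.join()
    events.add("scope still active: ${scope.isActive}")

    scope.cancel() // Clean up once the scope is no longer needed
    return events.toList()
}

fun main() = runBlocking {
    cancelChildrenThenReuse().forEach(::println)
    // first job cancelled: true
    // second job ran
    // scope still active: true
}
```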

graph TD;
  subgraph CoroutineScope
    A[coroutineContext] -->|Has a| B(Parent Job)
    A -->|Has a| C[Dispatcher]
  end
  B -->|Parent of| J1[Child Job 1]
  B -->|Parent of| J2[Child Job 2]
  B -->|Parent of| J3[Child Job 3]
  J1 -->|Manages| D1[Coroutine 1]
  J2 -->|Manages| D2[Coroutine 2]
  J3 -->|Manages| D3[Coroutine 3]
  D1 -->|Runs| E1[Task 1]
  D2 -->|Runs| E2[Task 2]
  D3 -->|Runs| E3[Task 3]
  classDef blue fill:#3b82f6,stroke:#ffffff,stroke-width:2px;
  classDef green fill:#10b981,stroke:#ffffff,stroke-width:2px;
  classDef red fill:#ef4444,stroke:#ffffff,stroke-width:2px;
  class A blue;
  class B green;
  class D1,D2,D3 red;
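
The relationships in the diagram can be inspected at runtime. The sketch below is illustrative only: ScopeShape and inspectScope are hypothetical names. It reads the parent Job and the dispatcher out of the scope's coroutineContext and counts the child jobs created by launch.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical summary type for what the scope contains.
data class ScopeShape(
    val hasParentJob: Boolean,
    val usesDefaultDispatcher: Boolean,
    val childCount: Int
)

suspend fun inspectScope(): ScopeShape {
    val scope = CoroutineScope(Dispatchers.Default)

    // The scope's context holds a parent Job and a dispatcher.
    val parentJob = scope.coroutineContext[Job]
    val dispatcher = scope.coroutineContext[ContinuationInterceptor]

    // Each launch creates a child Job attached to the parent Job.
    val children = List(3) { scope.launch { delay(10_000) } }

    val shape = ScopeShape(
        hasParentJob = parentJob != null,
        usesDefaultDispatcher = dispatcher === Dispatchers.Default,
        childCount = parentJob?.children?.count() ?: 0
    )

    scope.cancel() // Cancelling the parent cancels all three children
    children.joinAll()
    return shape
}

fun main() = runBlocking {
    println(inspectScope())
    // ScopeShape(hasParentJob=true, usesDefaultDispatcher=true, childCount=3)
}
```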


'suspend' keyword

Fundamentals of suspend in Kotlin

High-Level Overview

In Kotlin, the suspend keyword is used to mark a function as suspendable, meaning it can be paused and resumed at a later time.

Why Use suspend?

  1. Non-blocking Operations: suspend functions allow you to wait for long-running operations (like network requests or disk IO) without blocking the calling thread, for example the main thread.
  2. Improved Readability: Code using suspend functions can be written in a sequential manner, making it more readable compared to callback-based approaches.
  3. Resource Efficiency: By not blocking threads, coroutines can handle a large number of tasks with a small number of threads, leading to better resource utilization.
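
To illustrate the resource-efficiency point, here is a small sketch (runManyCoroutines is a hypothetical name) that runs many coroutines on the single thread owned by runBlocking. Each coroutine suspends in delay, so the one thread is free to run the others.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns (number of coroutines that finished,
// number of distinct threads they ran on).
fun runManyCoroutines(count: Int): Pair<Int, Int> {
    var finished = 0
    val threads = mutableSetOf<Thread>()
    runBlocking {
        repeat(count) {
            // No dispatcher is given, so every coroutine runs on runBlocking's thread.
            launch {
                delay(100) // Suspends; the thread is free to run the other coroutines
                threads.add(Thread.currentThread())
                finished++
            }
        }
    }
    return finished to threads.size
}

fun main() {
    println(runManyCoroutines(10_000)) // (10000, 1)
}
```

The plain counter is safe here only because all the coroutines share one thread; with a multi-threaded dispatcher it would need synchronization.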

How suspend Works

A suspend function can suspend its execution without blocking the thread, and resume execution later. It can only be called from another suspend function or a coroutine.
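
The difference between suspending and blocking shows up in the order of events on a single thread. In this sketch (eventOrder is a hypothetical helper), the same two coroutines are run once with delay, which suspends, and once with Thread.sleep, which blocks the thread.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs two coroutines on runBlocking's single thread and
// records the order of events. `pause` decides how each coroutine waits.
fun eventOrder(pause: suspend (Long) -> Unit): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch {
            events += "A start"
            pause(200)
            events += "A end"
        }
        launch {
            events += "B start"
            pause(50)
            events += "B end"
        }
    }
    return events
}

fun main() {
    // delay suspends: B gets to run while A is waiting.
    println(eventOrder { ms -> delay(ms) })        // [A start, B start, B end, A end]
    // Thread.sleep blocks the thread: B cannot start until A is done.
    println(eventOrder { ms -> Thread.sleep(ms) }) // [A start, A end, B start, B end]
}
```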

Examples

Basic running on 'main'

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    launch {
        fetchData("a-1")
        fetchData("a-2")
    }
    launch {
        fetchData("b")
    }
    launch {
        fetchData("c")
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 14d257c97de10a4 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T03:40:03.685024Z][ms-elapsed-since-start:   45][tname:main/tid:1] Fetching data... a-1
[2024-06-09T03:40:03.704826Z][ms-elapsed-since-start:   55][tname:main/tid:1] Fetching data... b
[2024-06-09T03:40:03.704975Z][ms-elapsed-since-start:   55][tname:main/tid:1] Fetching data... c
[2024-06-09T03:40:04.708638Z][ms-elapsed-since-start: 1059][tname:main/tid:1] Data fetched a-1
[2024-06-09T03:40:04.709425Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Fetching data... a-2
[2024-06-09T03:40:04.709835Z][ms-elapsed-since-start: 1060][tname:main/tid:1] Data fetched b
[2024-06-09T03:40:04.710102Z][ms-elapsed-since-start: 1061][tname:main/tid:1] Data fetched c
[2024-06-09T03:40:05.715174Z][ms-elapsed-since-start: 2066][tname:main/tid:1] Data fetched a-2

BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed

With Dispatchers.IO

Simple dispatcher IO

About

Here we show a simple use of Dispatchers.IO.

We can see that different OS threads are being used.

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    launch(Dispatchers.IO) {
        fetchData("a-1")
        fetchData("a-2")
    }
    launch(Dispatchers.IO) {
        fetchData("b")
    }
    launch(Dispatchers.IO) {
        fetchData("c")
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 1fc85e0 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T04:18:21.291646Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-2/tid:22] Fetching data... c
[2024-06-09T04:18:21.291456Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-1/tid:21] Fetching data... a-1
[2024-06-09T04:18:21.291454Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... b
[2024-06-09T04:18:22.323347Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-2/tid:22] Data fetched c
[2024-06-09T04:18:22.323736Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-3/tid:23] Data fetched a-1
[2024-06-09T04:18:22.323347Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-1/tid:21] Data fetched b
[2024-06-09T04:18:22.324494Z][ms-elapsed-since-start: 1072][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... a-2
[2024-06-09T04:18:23.325568Z][ms-elapsed-since-start: 2073][tname:DefaultDispatcher-worker-3/tid:23] Data fetched a-2

BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed

Increasing beyond double the number of cores

About

In this example we illustrate Dispatchers.IO re-using the same OS thread for two different executions.

This is illustrated in these lines of output:


[2024-06-09T04:23:29.653692Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 19
[2024-06-09T04:23:29.653867Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 22
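
The same reuse can be measured instead of read from the log. This is a rough sketch under the assumption that Dispatchers.IO keeps its default thread limit; countIoThreads is a hypothetical name. It launches many tasks and counts how many distinct threads ever ran them, and the exact count varies by machine and run.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns (number of tasks, number of distinct threads used).
suspend fun countIoThreads(taskCount: Int): Pair<Int, Int> {
    val threads = ConcurrentHashMap.newKeySet<Thread>()
    coroutineScope {
        repeat(taskCount) {
            launch(Dispatchers.IO) {
                threads.add(Thread.currentThread()) // Thread before suspension
                delay(100)
                threads.add(Thread.currentThread()) // Thread after resumption, possibly a different one
            }
        }
    } // coroutineScope returns only after every task has finished
    return taskCount to threads.size
}

fun main() = runBlocking {
    val (tasks, threads) = countIoThreads(1_000)
    println("tasks=$tasks, distinct threads=$threads")
}
```

With far more tasks than threads, the thread count stays well below the task count, which means threads are shared between tasks.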

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    val availableCores = Runtime.getRuntime().availableProcessors()
    out.println("Number of available cores: $availableCores")

    for (i in 1..(availableCores* 2) + 2) {
        launch(Dispatchers.IO) {
            fetchData("task $i")
        }
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 95c0ae2 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T04:23:29.630413Z][ms-elapsed-since-start:   47][tname:main/tid:1] Number of available cores: 10
[2024-06-09T04:23:29.651359Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-1/tid:21] Fetching data... task 1
[2024-06-09T04:23:29.651570Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... task 2
[2024-06-09T04:23:29.651777Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-5/tid:25] Fetching data... task 3
[2024-06-09T04:23:29.651815Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-2/tid:22] Fetching data... task 4
[2024-06-09T04:23:29.651846Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-4/tid:24] Fetching data... task 5
[2024-06-09T04:23:29.652016Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-8/tid:28] Fetching data... task 6
[2024-06-09T04:23:29.652160Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 7
[2024-06-09T04:23:29.652243Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-6/tid:26] Fetching data... task 8
[2024-06-09T04:23:29.652418Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-11/tid:31] Fetching data... task 9
[2024-06-09T04:23:29.652762Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-10/tid:30] Fetching data... task 10
[2024-06-09T04:23:29.652811Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-14/tid:34] Fetching data... task 11
[2024-06-09T04:23:29.652945Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-13/tid:33] Fetching data... task 12
[2024-06-09T04:23:29.653149Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-18/tid:38] Fetching data... task 13
[2024-06-09T04:23:29.653555Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 15
[2024-06-09T04:23:29.653556Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 14
[2024-06-09T04:23:29.653623Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 16
[2024-06-09T04:23:29.653651Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 17
[2024-06-09T04:23:29.653676Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-13/tid:33] Fetching data... task 18
[2024-06-09T04:23:29.653717Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 20
[2024-06-09T04:23:29.653692Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 19
[2024-06-09T04:23:29.653867Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 22
[2024-06-09T04:23:29.653732Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 21
[2024-06-09T04:23:30.661370Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-17/tid:37] Data fetched task 7
[2024-06-09T04:23:30.661588Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 4
[2024-06-09T04:23:30.662263Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 8
[2024-06-09T04:23:30.661870Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 2
[2024-06-09T04:23:30.662439Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 3
[2024-06-09T04:23:30.662540Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 6
[2024-06-09T04:23:30.662606Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 1
[2024-06-09T04:23:30.662685Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 10
[2024-06-09T04:23:30.662824Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 11
[2024-06-09T04:23:30.662870Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 13
[2024-06-09T04:23:30.662972Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 15
[2024-06-09T04:23:30.663027Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 14
[2024-06-09T04:23:30.661376Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 9
[2024-06-09T04:23:30.663206Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 17
[2024-06-09T04:23:30.663273Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 18
[2024-06-09T04:23:30.663533Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-6/tid:26] Data fetched task 19
[2024-06-09T04:23:30.663124Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 16
[2024-06-09T04:23:30.662265Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-17/tid:37] Data fetched task 5
[2024-06-09T04:23:30.662858Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 12
[2024-06-09T04:23:30.663355Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 20
[2024-06-09T04:23:30.663540Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 22
[2024-06-09T04:23:30.663689Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-6/tid:26] Data fetched task 21

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 executed
