Dispatcher

A Dispatcher determines which thread or thread pool executes coroutines, controlling the execution context for concurrent operations.

Built-in Dispatchers

Dispatchers.Default

  • CPU-intensive work (calculations, data processing)
  • Thread pool size equals CPU cores

Questions that come up

Should we limit max concurrency by one less thread for heavy workloads to leave some for UI?

Should we limit the concurrency of Dispatchers.Default by creating a limitedParallelism dispatcher?

// Like this:
val maxConcurrency = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
val limitedDispatcher = dispatcher.limitedParallelism(maxConcurrency)

Why would we want to limitedParallelism? Is due to the following question: if you use up all threads will operating system (OS) create cooperative environment for other processes that are running?.

Experiment aimed to roughly answer this

Experiment: load up CPU and try to use other apps in the meantime.

When I tried this on 16-core/32-thread AMD Ryzen 9 7945HX on Linux it seems at least Linux OS was creating a cooperative environment between the threads to not show perceivable slow down in other processes while the CPU was shown to be running at 100% per htop. Hence, as of now I don't think it's worth artificially lowering the parallelism and rather let OS take care of scheduling threads.

img

GT-Sandbox-Snapshot: example loading up all the CPU cores for 30 seconds doing computation in a loop

Code

package com.glassthought.sandbox

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main(): Unit = runBlocking(CoroutineName("main-runBlocking")) {
  val availableProcessors = Runtime.getRuntime().availableProcessors()

  println("Starting CPU stress test on $availableProcessors cores...")
  println("This will run for 30 seconds. System may become less responsive.")

  val duration = 30_000L // 30 seconds in milliseconds

  val elapsedTime = measureTimeMillis {
    // Create a list of jobs, one for each processor
    val jobs = List(availableProcessors) { coreIndex ->
      // Use Dispatchers.Default which has a thread pool sized to CPU cores
      // Each coroutine will pin to a thread and consume CPU
      launch(Dispatchers.Default + CoroutineName("cpu-loader-$coreIndex")) {
        println("Core $coreIndex: Starting intensive computation...")

        val startTime = System.currentTimeMillis()
        var counter = 0L
        var sum = 0.0

        // Tight loop that runs for 30 seconds
        while (System.currentTimeMillis() - startTime < duration) {
          // Perform some CPU-intensive operations
          counter++
          sum += Math.sqrt(counter.toDouble())

          // Add some more operations to ensure CPU usage
          if (counter % 1000000 == 0L) {
            // Occasionally perform more complex calculations
            for (i in 1..100) {
              sum += Math.sin(sum) * Math.cos(counter.toDouble())
            }
          }

          // Check for cancellation periodically (every million iterations)
          if (counter % 10000000 == 0L) {
            yield() // Allow coroutine cancellation if needed
          }
        }

        println("Core $coreIndex: Completed. Counter=$counter, Sum=$sum")
      }
    }

    // Wait for all jobs to complete
    jobs.joinAll()
  }

  println("\nCPU stress test completed in ${elapsedTime}ms")
  println("All $availableProcessors cores were loaded for approximately 30 seconds")
}

Command to reproduce:

gt.sandbox.checkout.commit 552c189a3d508ae510c9 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
Picked up JAVA_TOOL_OPTIONS: -Dkotlinx.coroutines.debug
Starting CPU stress test on 32 cores...
This will run for 30 seconds. System may become less responsive.
Core 0: Starting intensive computation...
Core 1: Starting intensive computation...
Core 2: Starting intensive computation...
Core 3: Starting intensive computation...
Core 4: Starting intensive computation...
Core 5: Starting intensive computation...
Core 6: Starting intensive computation...
Core 7: Starting intensive computation...
Core 8: Starting intensive computation...
Core 9: Starting intensive computation...
Core 10: Starting intensive computation...
Core 11: Starting intensive computation...
Core 12: Starting intensive computation...
Core 13: Starting intensive computation...
Core 14: Starting intensive computation...
Core 15: Starting intensive computation...
Core 16: Starting intensive computation...
Core 17: Starting intensive computation...
Core 18: Starting intensive computation...
Core 19: Starting intensive computation...
Core 20: Starting intensive computation...
Core 21: Starting intensive computation...
Core 22: Starting intensive computation...
Core 23: Starting intensive computation...
Core 24: Starting intensive computation...
Core 25: Starting intensive computation...
Core 26: Starting intensive computation...
Core 27: Starting intensive computation...
Core 29: Starting intensive computation...
Core 28: Starting intensive computation...
Core 30: Starting intensive computation...
Core 31: Starting intensive computation...
Core 3: Completed. Counter=1209229201, Sum=2.8033135773977207E13
Core 11: Completed. Counter=1211769364, Sum=2.8121513705789645E13
Core 8: Completed. Counter=1199590272, Sum=2.7698620755711055E13
Core 2: Completed. Counter=1203021176, Sum=2.7817535389470645E13
Core 13: Completed. Counter=1190825648, Sum=2.7395612274237375E13
Core 31: Completed. Counter=1218795961, Sum=2.83664671475123E13
Core 21: Completed. Counter=1212393233, Sum=2.814323367875016E13
Core 19: Completed. Counter=1204196135, Sum=2.785829831628206E13
Core 12: Completed. Counter=1223621706, Sum=2.8535106632545105E13
Core 20: Completed. Counter=1223951172, Sum=2.854663222930776E13
Core 26: Completed. Counter=1209303550, Sum=2.803572122383962E13
Core 29: Completed. Counter=1225677066, Sum=2.8607033934887254E13
Core 0: Completed. Counter=1222882977, Sum=2.8509269567738004E13
Core 23: Completed. Counter=1209603905, Sum=2.8046166730119613E13
Core 24: Completed. Counter=1189998737, Sum=2.7367081901189492E13
Core 9: Completed. Counter=1215672477, Sum=2.8257492284361145E13
Core 25: Completed. Counter=996223257, Sum=2.0962532819551504E13
Core 30: Completed. Counter=1199697911, Sum=2.770234892705607E13
Core 5: Completed. Counter=1214359549, Sum=2.821172744882745E13
Core 28: Completed. Counter=1206475954, Sum=2.7937448955144168E13
Core 18: Completed. Counter=1213988582, Sum=2.8198801103505355E13
Core 4: Completed. Counter=1208161491, Sum=2.7996015452241723E13
Core 7: Completed. Counter=1173755242, Sum=2.6808657016430223E13
Core 22: Completed. Counter=1193089284, Sum=2.7473763708917047E13
Core 16: Completed. Counter=1216669588, Sum=2.8292265177977305E13
Core 6: Completed. Counter=1214497392, Sum=2.8216531091415805E13
Core 14: Completed. Counter=1202774004, Sum=2.7808962768959176E13
Core 27: Completed. Counter=1213167406, Sum=2.817019424997135E13
Core 17: Completed. Counter=1204046012, Sum=2.785308898096554E13
Core 1: Completed. Counter=1208010559, Sum=2.7990769428434086E13
Core 15: Completed. Counter=1215303167, Sum=2.8244616717065184E13
Core 10: Completed. Counter=1215761769, Sum=2.8260605640605434E13

CPU stress test completed in 30043ms
All 32 cores were loaded for approximately 30 seconds

Note: Would be interesting to try this out on Mac & Windows later.

Dispatchers.IO

  • Optimized for I/O operations (file, network, database)
  • Backed by shared thread pool that grows as needed

Dispatchers.Main

  • UI thread dispatcher (Android main thread, JavaFX Application Thread)
  • Use for UI updates and lightweight operations
Dispatchers.Unconfined

Dispatchers.Unconfined

  • Starts in caller thread, resumes in any thread
  • In general should NOT be used.

Usage

// Switch context
withContext(Dispatchers.IO) {
    // Network call or file operation
}

// Launch with specific dispatcher
launch(Dispatchers.Default) {
    // CPU-intensive work
}

Key Points

  • Dispatchers are inherited by child coroutines
  • Use withContext() to switch dispatchers temporarily
  • Choose dispatcher based on operation type, not arbitrary preference
  • Main dispatcher requires platform-specific dependency

Relationships

  • Uses: Thread pools for execution
  • Enables: Context switching with withContext()
  • Inherited by: Child coroutines

Examples

With Dispatchers.IO

Simple dispatcher IO

About

Here we show example of simple usage of dispatcher io.

We can see that different OS threads are being used.

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    launch(Dispatchers.IO) {
        fetchData("a-1")
        fetchData("a-2")
    }
    launch(Dispatchers.IO) {
        fetchData("b")
    }
    launch(Dispatchers.IO) {
        fetchData("c")
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 1fc85e0 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T04:18:21.291646Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-2/tid:22] Fetching data... c
[2024-06-09T04:18:21.291456Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-1/tid:21] Fetching data... a-1
[2024-06-09T04:18:21.291454Z][ms-elapsed-since-start:   50][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... b
[2024-06-09T04:18:22.323347Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-2/tid:22] Data fetched c
[2024-06-09T04:18:22.323736Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-3/tid:23] Data fetched a-1
[2024-06-09T04:18:22.323347Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-1/tid:21] Data fetched b
[2024-06-09T04:18:22.324494Z][ms-elapsed-since-start: 1072][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... a-2
[2024-06-09T04:18:23.325568Z][ms-elapsed-since-start: 2073][tname:DefaultDispatcher-worker-3/tid:23] Data fetched a-2

BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed

Increasing beyond double number of cores

About

In this exame we illustrate the Dispatchers.IO re-using the same OS thread for 2 different executions.

This is illustrated in these lines of output:


[2024-06-09T04:23:29.653692Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 19
[2024-06-09T04:23:29.653867Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 22

Code

package gt.sandbox

import gt.sandbox.internal.output.Out
import kotlinx.coroutines.*

val out = Out.standard()

suspend fun fetchData(s: String) {
    out.println("Fetching data... $s")
    delay(1000) // This suspends the coroutine for 1 second without blocking the thread
    out.println("Data fetched $s")
}

fun main(): Unit = runBlocking {
    val availableCores = Runtime.getRuntime().availableProcessors()
    out.println("Number of available cores: $availableCores")

    for (i in 1..(availableCores* 2) + 2) {
        launch(Dispatchers.IO) {
            fetchData("task $i")
        }
    }
}

Command to reproduce:

gt.sandbox.checkout.commit 95c0ae2 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-06-09T04:23:29.630413Z][ms-elapsed-since-start:   47][tname:main/tid:1] Number of available cores: 10
[2024-06-09T04:23:29.651359Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-1/tid:21] Fetching data... task 1
[2024-06-09T04:23:29.651570Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-3/tid:23] Fetching data... task 2
[2024-06-09T04:23:29.651777Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-5/tid:25] Fetching data... task 3
[2024-06-09T04:23:29.651815Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-2/tid:22] Fetching data... task 4
[2024-06-09T04:23:29.651846Z][ms-elapsed-since-start:   59][tname:DefaultDispatcher-worker-4/tid:24] Fetching data... task 5
[2024-06-09T04:23:29.652016Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-8/tid:28] Fetching data... task 6
[2024-06-09T04:23:29.652160Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 7
[2024-06-09T04:23:29.652243Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-6/tid:26] Fetching data... task 8
[2024-06-09T04:23:29.652418Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-11/tid:31] Fetching data... task 9
[2024-06-09T04:23:29.652762Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-10/tid:30] Fetching data... task 10
[2024-06-09T04:23:29.652811Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-14/tid:34] Fetching data... task 11
[2024-06-09T04:23:29.652945Z][ms-elapsed-since-start:   60][tname:DefaultDispatcher-worker-13/tid:33] Fetching data... task 12
[2024-06-09T04:23:29.653149Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-18/tid:38] Fetching data... task 13
[2024-06-09T04:23:29.653555Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 15
[2024-06-09T04:23:29.653556Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 14
[2024-06-09T04:23:29.653623Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 16
[2024-06-09T04:23:29.653651Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 17
[2024-06-09T04:23:29.653676Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-13/tid:33] Fetching data... task 18
[2024-06-09T04:23:29.653717Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-7/tid:27] Fetching data... task 20
[2024-06-09T04:23:29.653692Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 19
[2024-06-09T04:23:29.653867Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-21/tid:41] Fetching data... task 22
[2024-06-09T04:23:29.653732Z][ms-elapsed-since-start:   61][tname:DefaultDispatcher-worker-9/tid:29] Fetching data... task 21
[2024-06-09T04:23:30.661370Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-17/tid:37] Data fetched task 7
[2024-06-09T04:23:30.661588Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 4
[2024-06-09T04:23:30.662263Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 8
[2024-06-09T04:23:30.661870Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 2
[2024-06-09T04:23:30.662439Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 3
[2024-06-09T04:23:30.662540Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 6
[2024-06-09T04:23:30.662606Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 1
[2024-06-09T04:23:30.662685Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 10
[2024-06-09T04:23:30.662824Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 11
[2024-06-09T04:23:30.662870Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 13
[2024-06-09T04:23:30.662972Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 15
[2024-06-09T04:23:30.663027Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 14
[2024-06-09T04:23:30.661376Z][ms-elapsed-since-start: 1069][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 9
[2024-06-09T04:23:30.663206Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 17
[2024-06-09T04:23:30.663273Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 18
[2024-06-09T04:23:30.663533Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-6/tid:26] Data fetched task 19
[2024-06-09T04:23:30.663124Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-19/tid:39] Data fetched task 16
[2024-06-09T04:23:30.662265Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-17/tid:37] Data fetched task 5
[2024-06-09T04:23:30.662858Z][ms-elapsed-since-start: 1070][tname:DefaultDispatcher-worker-15/tid:35] Data fetched task 12
[2024-06-09T04:23:30.663355Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-24/tid:45] Data fetched task 20
[2024-06-09T04:23:30.663540Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-18/tid:38] Data fetched task 22
[2024-06-09T04:23:30.663689Z][ms-elapsed-since-start: 1071][tname:DefaultDispatcher-worker-6/tid:26] Data fetched task 21

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 executed


Children
  1. Dispatchers.Default
  2. example

Backlinks