Even with Async/Await, the IO Dispatcher Does Not Use All Cores

Running the code example below with withContext(Dispatchers.IO) did not use the expected number of threads. I expected roughly one thread per CPU core, but only one thread was actually used.

Code Example
/*
 * This Kotlin source file was generated by the Gradle 'init' task.
 */
package gt.kotlin.sandbox

import kotlinx.coroutines.*
import java.util.stream.IntStream


suspend fun performLongRequest(msg: String): String {
    return withContext(Dispatchers.IO) {

        ThreadUtils.sleep(500)
        ThreadUtils.printWithThreadInfo("Within subroutine input: $msg")

        String.format("MessageResultAfterBlockingOperation for [%s]", msg)
    }
}

fun main() {

    ThreadUtils.printWithThreadInfo("Example using async with more requests than cores on a laptop")

    val mainMillisStamp = System.currentTimeMillis();


    runBlocking {

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }

        ThreadUtils.printWithThreadInfo(
            "Total time taken: " +
                    (System.currentTimeMillis() - mainMillisStamp) + "ms"
        )
    }
}
To Reproduce

Command to reproduce:

gt.sandbox.checkout.commit 185b709 \
&& cd "${GT_SANDBOX_REPO}/gt-kotlin-sandbox" \
&& cmd.run.announce "gradle run"

Recorded output of command:

> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2023-03-28 08:44:19.828][main-1][35ms] Example using async with more requests than cores on a laptop
[2023-03-28 08:44:20.418][DefaultDispatcher-worker-1-20][591ms] Within subroutine input: 0-request
[2023-03-28 08:44:20.435][main-1][16ms] Deferred result: MessageResultAfterBlockingOperation for [0-request]
[2023-03-28 08:44:20.938][DefaultDispatcher-worker-1-20][503ms] Within subroutine input: 1-request
[2023-03-28 08:44:20.939][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [1-request]
[2023-03-28 08:44:21.443][DefaultDispatcher-worker-1-20][504ms] Within subroutine input: 2-request
[2023-03-28 08:44:21.444][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [2-request]
[2023-03-28 08:44:21.945][DefaultDispatcher-worker-1-20][501ms] Within subroutine input: 3-request
[2023-03-28 08:44:21.946][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [3-request]
[2023-03-28 08:44:22.451][DefaultDispatcher-worker-1-20][505ms] Within subroutine input: 4-request
[2023-03-28 08:44:22.452][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [4-request]
[2023-03-28 08:44:22.953][DefaultDispatcher-worker-1-20][501ms] Within subroutine input: 5-request
[2023-03-28 08:44:22.954][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [5-request]
[2023-03-28 08:44:23.455][DefaultDispatcher-worker-1-20][501ms] Within subroutine input: 6-request
[2023-03-28 08:44:23.456][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [6-request]
[2023-03-28 08:44:23.959][DefaultDispatcher-worker-1-20][503ms] Within subroutine input: 7-request
[2023-03-28 08:44:23.960][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [7-request]
[2023-03-28 08:44:23.960][main-1][0ms] Total time taken: 4131ms

BUILD SUCCESSFUL in 4s
2 actionable tasks: 1 executed, 1 up-to-date

Similar behavior can be seen with a custom thread pool of 8 threads: each request lands on a different pool thread, but the requests still run one after another.

Code Example
package gt.kotlin.sandbox

import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.stream.IntStream


val threadPool = Executors.newFixedThreadPool(8)
val myDispatcher = threadPool.asCoroutineDispatcher()

suspend fun performLongRequest(msg: String): String {
    return withContext(myDispatcher) {
        ThreadUtils.printWithThreadInfo("Within subroutine (before sleep) input: $msg")

        ThreadUtils.sleep(500)

        String.format("MessageResultAfterBlockingOperation for [%s]", msg)
    }
}

fun main() {

    ThreadUtils.printWithThreadInfo("Example using async with more requests than cores on a laptop")

    val mainMillisStamp = System.currentTimeMillis();


    runBlocking {

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }

        ThreadUtils.printWithThreadInfo(
            "Total time taken: " +
                    (System.currentTimeMillis() - mainMillisStamp) + "ms"
        )
    }

    myDispatcher.close();
    threadPool.shutdown();
}
To Reproduce

Command to reproduce:

gt.sandbox.checkout.commit 8ee7ffb \
&& cd "${GT_SANDBOX_REPO}/gt-kotlin-sandbox" \
&& cmd.run.announce "gradle run"

Recorded output of command:

> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2023-03-28 08:59:33.336][main-1][33ms] Example using async with more requests than cores on a laptop
[2023-03-28 08:59:33.403][pool-1-thread-1-20][67ms] Within subroutine (before sleep) input: 0-request
[2023-03-28 08:59:33.921][main-1][518ms] Deferred result: MessageResultAfterBlockingOperation for [0-request]
[2023-03-28 08:59:33.922][pool-1-thread-2-21][1ms] Within subroutine (before sleep) input: 1-request
[2023-03-28 08:59:34.426][main-1][504ms] Deferred result: MessageResultAfterBlockingOperation for [1-request]
[2023-03-28 08:59:34.427][pool-1-thread-3-22][1ms] Within subroutine (before sleep) input: 2-request
[2023-03-28 08:59:34.930][main-1][503ms] Deferred result: MessageResultAfterBlockingOperation for [2-request]
[2023-03-28 08:59:34.931][pool-1-thread-4-23][1ms] Within subroutine (before sleep) input: 3-request
[2023-03-28 08:59:35.436][main-1][506ms] Deferred result: MessageResultAfterBlockingOperation for [3-request]
[2023-03-28 08:59:35.438][pool-1-thread-5-24][1ms] Within subroutine (before sleep) input: 4-request
[2023-03-28 08:59:35.941][main-1][503ms] Deferred result: MessageResultAfterBlockingOperation for [4-request]
[2023-03-28 08:59:35.942][pool-1-thread-6-25][1ms] Within subroutine (before sleep) input: 5-request
[2023-03-28 08:59:36.446][main-1][504ms] Deferred result: MessageResultAfterBlockingOperation for [5-request]
[2023-03-28 08:59:36.447][pool-1-thread-7-26][1ms] Within subroutine (before sleep) input: 6-request
[2023-03-28 08:59:36.951][main-1][504ms] Deferred result: MessageResultAfterBlockingOperation for [6-request]
[2023-03-28 08:59:36.952][pool-1-thread-8-27][1ms] Within subroutine (before sleep) input: 7-request
[2023-03-28 08:59:37.456][main-1][504ms] Deferred result: MessageResultAfterBlockingOperation for [7-request]
[2023-03-28 08:59:37.457][main-1][1ms] Total time taken: 4120ms

BUILD SUCCESSFUL in 4s
2 actionable tasks: 1 executed, 1 up-to-date

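The cause is not a lack of threads but the way async is combined with Java streams.

Java streams are lazy: mapToObj is an intermediate operation, so its lambda does not run until something pulls elements from the stream. Here the for loop is what pulls them, one at a time, so each async is only started when the loop asks for the next element, and the loop does not ask until the previous await has finished. In effect we await each request before launching the next one.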

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }
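
The laziness can be reproduced without coroutines at all. The sketch below is not part of the original project: lazyOrder and eagerOrder are illustrative names, and it uses only java.util.stream to record the order in which the mapToObj lambda ("start") and the loop body ("consume") run, first when the loop iterates the stream directly and then when the stream is collected into a list beforehand.

```kotlin
import java.util.stream.Collectors
import java.util.stream.IntStream

// Iterates the stream directly: the mapToObj lambda runs only when the
// for loop pulls the next element, so "start" and "consume" interleave.
fun lazyOrder(): List<String> {
    val events = mutableListOf<String>()
    val stream = IntStream.range(0, 3).mapToObj { i ->
        events.add("start $i")
        i
    }
    for (i in stream) {
        events.add("consume $i")
    }
    return events
}

// Collects first: the terminal operation runs every mapToObj lambda
// before the loop body sees the first element.
fun eagerOrder(): List<String> {
    val events = mutableListOf<String>()
    val list = IntStream.range(0, 3).mapToObj { i ->
        events.add("start $i")
        i
    }.collect(Collectors.toList())
    for (i in list) {
        events.add("consume $i")
    }
    return events
}

fun main() {
    println(lazyOrder())   // [start 0, consume 0, start 1, consume 1, start 2, consume 2]
    println(eagerOrder())  // [start 0, start 1, start 2, consume 0, consume 1, consume 2]
}
```

If "start" stands for launching an async and "consume" for awaiting it, the first ordering matches the sequential logs above.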

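One way to fix this is to collect the stream into a list before the for loop. Collecting is a terminal operation, so it runs every async up front, and the loop then only awaits requests that are already in flight.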

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }.collect(toList())

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }
Code
package gt.kotlin.sandbox

import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.stream.Collectors.toList
import java.util.stream.IntStream


val threadPool = Executors.newFixedThreadPool(8)
val myDispatcher = threadPool.asCoroutineDispatcher()

suspend fun performLongRequest(msg: String): String {
    return withContext(myDispatcher) {
        ThreadUtils.printWithThreadInfo("Within subroutine (before sleep) input: $msg")

        ThreadUtils.sleep(500)

        String.format("MessageResultAfterBlockingOperation for [%s]", msg)
    }
}

fun main() {

    ThreadUtils.printWithThreadInfo("Example using async with more requests than cores on a laptop")

    val mainMillisStamp = System.currentTimeMillis();


    runBlocking {

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }.collect(toList())

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }

        ThreadUtils.printWithThreadInfo(
            "Total time taken: " +
                    (System.currentTimeMillis() - mainMillisStamp) + "ms"
        )
    }

    myDispatcher.close();
    threadPool.shutdown();
}
To Reproduce

Command to reproduce:

gt.sandbox.checkout.commit b38048c \
&& cd "${GT_SANDBOX_REPO}/gt-kotlin-sandbox" \
&& cmd.run.announce "gradle run"

Recorded output of command:

> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2023-03-28 09:04:31.973][main-1][41ms] Example using async with more requests than cores on a laptop
[2023-03-28 09:04:32.056][pool-1-thread-1-20][83ms] Within subroutine (before sleep) input: 0-request
[2023-03-28 09:04:32.056][pool-1-thread-2-21][83ms] Within subroutine (before sleep) input: 1-request
[2023-03-28 09:04:32.056][pool-1-thread-3-22][0ms] Within subroutine (before sleep) input: 2-request
[2023-03-28 09:04:32.056][pool-1-thread-4-23][0ms] Within subroutine (before sleep) input: 3-request
[2023-03-28 09:04:32.057][pool-1-thread-5-24][1ms] Within subroutine (before sleep) input: 4-request
[2023-03-28 09:04:32.057][pool-1-thread-6-25][0ms] Within subroutine (before sleep) input: 5-request
[2023-03-28 09:04:32.057][pool-1-thread-7-26][0ms] Within subroutine (before sleep) input: 6-request
[2023-03-28 09:04:32.057][pool-1-thread-8-27][0ms] Within subroutine (before sleep) input: 7-request
[2023-03-28 09:04:32.569][main-1][512ms] Deferred result: MessageResultAfterBlockingOperation for [0-request]
[2023-03-28 09:04:32.569][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [1-request]
[2023-03-28 09:04:32.570][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [2-request]
[2023-03-28 09:04:32.570][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [3-request]
[2023-03-28 09:04:32.570][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [4-request]
[2023-03-28 09:04:32.570][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [5-request]
[2023-03-28 09:04:32.570][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [6-request]
[2023-03-28 09:04:32.570][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [7-request]
[2023-03-28 09:04:32.570][main-1][0ms] Total time taken: 597ms

BUILD SUCCESSFUL in 1s
2 actionable tasks: 1 executed, 1 up-to-date

With the toList() collection in place, the original Dispatchers.IO code also works as expected:

Code
package gt.kotlin.sandbox

import kotlinx.coroutines.*
import java.util.stream.Collectors.toList
import java.util.stream.IntStream


suspend fun performLongRequest(msg: String): String {
    return withContext(Dispatchers.IO) {
        ThreadUtils.printWithThreadInfo("Within subroutine (before sleep) input: $msg")

        ThreadUtils.sleep(500)

        String.format("MessageResultAfterBlockingOperation for [%s]", msg)
    }
}

fun main() {

    ThreadUtils.printWithThreadInfo("Example using async with more requests than cores on a laptop")

    val mainMillisStamp = System.currentTimeMillis();


    runBlocking {

        val deferredObjects = IntStream.range(0, 8).mapToObj { i ->
            val deferred = async { performLongRequest("${i}-request") }
            deferred
        }.collect(toList())

        for(deferred in deferredObjects) {
            ThreadUtils.printWithThreadInfo("Deferred result: " + deferred.await())
        }

        ThreadUtils.printWithThreadInfo(
            "Total time taken: " +
                    (System.currentTimeMillis() - mainMillisStamp) + "ms"
        )
    }

}
To Reproduce

Command to reproduce:

gt.sandbox.checkout.commit 9ee0788 \
&& cd "${GT_SANDBOX_REPO}/gt-kotlin-sandbox" \
&& cmd.run.announce "gradle run"

Recorded output of command:

> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2023-03-28 09:06:22.936][main-1][41ms] Example using async with more requests than cores on a laptop
[2023-03-28 09:06:23.034][DefaultDispatcher-worker-1-20][98ms] Within subroutine (before sleep) input: 0-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-2-21][1ms] Within subroutine (before sleep) input: 1-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-5-24][0ms] Within subroutine (before sleep) input: 2-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-8-27][0ms] Within subroutine (before sleep) input: 3-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-4-23][0ms] Within subroutine (before sleep) input: 4-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-9-28][0ms] Within subroutine (before sleep) input: 5-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-7-26][0ms] Within subroutine (before sleep) input: 6-request
[2023-03-28 09:06:23.035][DefaultDispatcher-worker-3-22][0ms] Within subroutine (before sleep) input: 7-request
[2023-03-28 09:06:23.548][main-1][513ms] Deferred result: MessageResultAfterBlockingOperation for [0-request]
[2023-03-28 09:06:23.548][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [1-request]
[2023-03-28 09:06:23.548][main-1][1ms] Deferred result: MessageResultAfterBlockingOperation for [2-request]
[2023-03-28 09:06:23.549][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [3-request]
[2023-03-28 09:06:23.549][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [4-request]
[2023-03-28 09:06:23.549][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [5-request]
[2023-03-28 09:06:23.549][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [6-request]
[2023-03-28 09:06:23.549][main-1][0ms] Deferred result: MessageResultAfterBlockingOperation for [7-request]
[2023-03-28 09:06:23.549][main-1][0ms] Total time taken: 612ms

BUILD SUCCESSFUL in 1s
2 actionable tasks: 1 executed, 1 up-to-date
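
Another option, sketched here as an assumption rather than something taken from the sandbox repository, is to avoid Java streams altogether. Kotlin's map on a range is eager, so every async is started before anything is awaited, and awaitAll from kotlinx.coroutines gathers the results. simulatedRequest and runAllRequests are hypothetical names, and Thread.sleep stands in for the ThreadUtils helper used above.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for performLongRequest: blocks an IO thread for 500 ms.
suspend fun simulatedRequest(msg: String): String = withContext(Dispatchers.IO) {
    Thread.sleep(500)
    "Result for [$msg]"
}

// Starts all requests first, then waits for all of them.
fun runAllRequests(count: Int): List<String> = runBlocking {
    (0 until count)
        // map on a Kotlin range is eager: every async is launched here,
        // before the first result is awaited.
        .map { i -> async { simulatedRequest("$i-request") } }
        // awaitAll suspends until every Deferred has completed and
        // returns the results in the original order.
        .awaitAll()
}

fun main() {
    val start = System.currentTimeMillis()
    runAllRequests(8).forEach(::println)
    println("Total time taken: ${System.currentTimeMillis() - start}ms")
}
```

With this shape the total time should be close to a single 500 ms sleep rather than eight of them, in line with the collected-stream runs above, though the exact figure depends on the machine.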