exceptionHandling
As expected the exception is re-thrown when calling get
on the CompletableFuture
that had exception thrown.
GT-Sandbox-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
try {
future.get()
} catch (e: Exception) {
out.println("Caught exception: ${e.message}")
}
println("")
}
Command to reproduce:
gt.sandbox.checkout.commit 0daaef350f0970bcddb7 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"
Recorded output of command:
[2024-11-22T06:16:43.526687Z][elapsed-since-start: 41ms][tname:main/tid:1] Caught exception: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block
exceptionally: block that runs when an exception occurs
future
.exceptionally { ex: Throwable? -> fallbackMaker(ex) }
.thenAccept { x: String -> out.println(x) }
GT-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
out.println("supplyAsync")
"Jon Snow"
}
future
.exceptionally { ex: Throwable? -> fallbackMaker(ex) }
.thenAccept { x: String -> out.println(x) }
delay(100)
println()
}
private fun fallbackMaker(ex: Throwable?): String {
return "Fallback Result (${ex?.message})"
}
Command to reproduce:
gt.sandbox.checkout.commit db510bcf58d24150f27d \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T05:23:27.988318Z][elapsed-since-start: 42ms][tname:main/tid:1] Fallback Result (java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block)
BUILD SUCCESSFUL in 675ms
2 actionable tasks: 1 executed, 1 up-to-date
handle: Handle both success and failure in one callback.
future.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
} else {
return result.toUpperCase();
}
}).thenAccept(System.out::println);
GT-Sandbox-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println(x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 381b07b215873ab5a216 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:05:54.560567Z][elapsed-since-start: 41ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:05:54.579624Z][elapsed-since-start: 51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:05:54.579897Z][elapsed-since-start: 51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 901ms
2 actionable tasks: 2 executed
Questions
CompletableFuture.supplyAsync AND handle AND which thread thenAccept runs on. WHY?
GT-Sandbox-Snapshot: thenAccept running on forkJoin
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
// WHEN I PRINT THIS LINE
// WITHIN THE FEATURE
// THEN thenAccept runs on ForkJoinPool.commonPool-worker-1
out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println("thenAccept: " + x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 136656e43e24fb056c68 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:08:33.435054Z][elapsed-since-start: 40ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:08:33.454568Z][elapsed-since-start: 50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:08:33.454811Z][elapsed-since-start: 50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 860ms
2 actionable tasks: 2 executed
GT-Sandbox-Snapshot: thenAccept runs on main thread
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
// WHEN I do NOT PRINT THIS LINE
// WITHIN THE FEATURE
// THEN thenAccept runs on main thread.
// out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println("thenAccept: " + x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 3e8b27335e729f4a0273 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:09:31.213443Z][elapsed-since-start: 44ms][tname:main/tid:1] thenApply
[2024-11-22T06:09:31.234221Z][elapsed-since-start: 55ms][tname:main/tid:1] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 855ms
2 actionable tasks: 2 executed
GPT's 4o Answer
The behavior of thenAccept
running on different threads in your code depends on how tasks are scheduled and executed in CompletableFuture
. Here's an explanation:
High-Level Explanation
-
Thread Pool for
supplyAsync
:
CompletableFuture.supplyAsync
by default uses the ForkJoinPool.commonPool, unless you specify a customExecutor
. This means any asynchronous task (likesupplyAsync
) is picked up by one of the threads in the common pool. -
Thread Propagation through Chaining:
If a stage in the chain (likehandle
,thenApply
,thenAccept
) does not explicitly specify an executor, it may execute:- On the thread completing the previous stage if it happens to be free.
- On the calling thread if the task is already completed when the next stage is invoked (i.e., the main thread in this case).
-
Why
thenAccept
Runs on Different Threads:- When you print within
supplyAsync
, it likely adds enough delay for the task to fully complete asynchronously beforethenAccept
is registered. As a result,thenAccept
executes on the ForkJoinPool.commonPool. - When you skip printing, the completion happens quickly enough that the main thread can take over execution of subsequent stages like
thenApply
andthenAccept
.
- When you print within
Recorded Output Analysis
Case 1: When supplyAsync
Prints
-
Thread Behavior:
supplyAsync
: Runs onForkJoinPool.commonPool-worker-1
.thenApply
andthenAccept
: Also run on the same worker thread since the asynchronous computation was still active in the pool whenthenAccept
was registered.
-
Reasoning:
- Adding the
out.println("supplyAsync")
adds latency, keeping the computation on the worker thread longer.
- Adding the
Case 2: When supplyAsync
Does Not Print
-
Thread Behavior:
thenApply
andthenAccept
: Run on the main thread (main/tid:1
).
-
Reasoning:
- Without the delay introduced by
out.println
, thesupplyAsync
task completes faster. When the chained operations likethenApply
andthenAccept
are registered, they are picked up by the thread initiating the chain (the main thread), since the completion occurs synchronously.
- Without the delay introduced by
Takeaways
-
Thread Dependence: The thread running
thenAccept
(or other chained stages) depends on:- Whether the task completed asynchronously or synchronously.
- How and when subsequent stages are registered.
-
Consistency with Custom Executor:
If you want predictable thread execution, supply anExecutor
toCompletableFuture
methods:CompletableFuture.supplyAsync(task, customExecutor) .thenApplyAsync(transform, customExecutor) .thenAcceptAsync(action, customExecutor)
-
ForkJoinPool Behavior: The ForkJoinPool.commonPool can sometimes reuse the same thread for subsequent tasks to optimize throughput, but it’s not guaranteed. Timing and task scheduling influence this behavior.
Solution for Controlling Execution Context
To ensure all stages consistently run on a specific thread pool:
val executor = Executors.newFixedThreadPool(4)
CompletableFuture.supplyAsync({
out.println("supplyAsync")
"Result"
}, executor).thenApplyAsync({
it + " Processed"
}, executor).thenAcceptAsync({
out.println(it)
}, executor)
Children
Backlinks