CompletableFuture
Overview of CompletableFuture in Java
CompletableFuture is part of the java.util.concurrent package and was introduced in Java 8. It is a promise-like construct for asynchronous programming that lets you write non-blocking, more readable code for computations that may complete in the future.
Life cycle
Key Concepts
- Promise of a Result: A CompletableFuture represents a computation that will be completed (successfully or with an exception) at some point.
- Non-blocking Execution: It allows asynchronous computation without blocking the current thread.
- Chaining: You can chain multiple tasks together using methods like .thenApply, .thenAccept, .thenCompose, etc.
- Handling Results: It offers methods to handle the result of the computation, like .get(), .join(), or callbacks for async handling.
- Completion: A CompletableFuture can be completed programmatically using complete() or completeExceptionally().
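The concepts above can be put together in a few lines. This is a minimal Java sketch, not code from the sandbox repository; the class name KeyConceptsSketch and the helper greet are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class KeyConceptsSketch {
    // Hypothetical helper: builds a small pipeline and returns its final value.
    static String greet(String name) {
        CompletableFuture<String> greeting = CompletableFuture
                .supplyAsync(() -> name)                 // start an asynchronous computation
                .thenApply(n -> "Hello, " + n)           // transform the value
                .thenCompose(s ->                        // chain a second asynchronous step
                        CompletableFuture.supplyAsync(() -> s + "!"));
        return greeting.join();                          // block only once, at the very end
    }

    public static void main(String[] args) {
        System.out.println(greet("world")); // Hello, world!
    }
}
```

thenApply is used for a plain function, thenCompose for a function that itself returns a CompletableFuture.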
Creating and Using CompletableFuture
Creating a CompletableFuture
Manually created future:
val future = CompletableFuture<String>()
// Completing it programmatically
future.complete("Result-Val")
println("Some other work")
println(future.get())
Output:
Some other work
Result-Val
GT-snapshot
Command to reproduce:
gt.sandbox.checkout.commit b581f798c55d205c6e91 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
Some other work
Result-Val
BUILD SUCCESSFUL in 874ms
2 actionable tasks: 2 executed
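For comparison, here is a small Java sketch of manual completion, assuming nothing beyond the JDK. The class and method names are hypothetical. It shows a future completed from another thread, and that only the first call to complete() takes effect.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch {
    // A second thread supplies the value; the caller blocks in join() until it arrives.
    static String completeFromOtherThread() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("Result-Val")).start();
        return future.join();
    }

    // complete() returns true only for the call that actually completed the future.
    static String firstCompletionWins() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("first");
        boolean second = future.complete("second");
        return future.join() + " " + first + " " + second;
    }

    public static void main(String[] args) {
        System.out.println(completeFromOtherThread()); // Result-Val
        System.out.println(firstCompletionWins());     // first true false
    }
}
```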
Asynchronous task:
val future = CompletableFuture.supplyAsync {
out.println("Starting to sleep")
Thread.sleep(1000)
out.println("Finished sleeping")
"Hello, CompletableFuture!"
}
out.println("Hello, from Main thread")
out.println(future.get()) // Output: Hello, CompletableFuture!
GT-Snapshot
Command to reproduce:
gt.sandbox.checkout.commit 9dd9ad6c4750da8e7b93 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
[2024-11-22T04:33:14.166261Z][elapsed-since-start: 42ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Starting to sleep
[2024-11-22T04:33:14.166259Z][elapsed-since-start: 42ms][tname:main/tid:1] Hello, from Main thread
[2024-11-22T04:33:15.193044Z][elapsed-since-start: 1058ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Finished sleeping
[2024-11-22T04:33:15.194765Z][elapsed-since-start: 1059ms][tname:main/tid:1] Hello, CompletableFuture!
Non-Blocking Execution
To avoid blocking, use asynchronous callbacks:
- thenApply: Transform the result.
future.thenApply(result -> result.toUpperCase())
      .thenAccept(System.out::println); // Output: HELLO, COMPLETABLEFUTURE!
- thenAccept: Consume the result (no transformation).
future.thenAccept(result -> System.out.println("Result: " + result));
- thenRun: Run a task after completion (doesn’t consume the result).
future.thenRun(() -> System.out.println("Computation finished!"));
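The three callbacks can be tried together in one runnable sketch. The class name CallbackSketch and the log list are illustrative assumptions. Each stage is joined before the next one is attached only so that the order of the log entries is deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class CallbackSketch {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!");

        // thenApply transforms the value, thenAccept consumes the transformed value.
        future.thenApply(String::toUpperCase).thenAccept(log::add).join();
        // thenAccept consumes the original value.
        future.thenAccept(result -> log.add("Result: " + result)).join();
        // thenRun ignores the value and only reacts to completion.
        future.thenRun(() -> log.add("Computation finished!")).join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```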
thenApply does not modify Future
thenApply does NOT modify the CompletableFuture it is called on. thenApply creates a new CompletableFuture.
GT-snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
out.println("supplyAsync")
"Jon Snow"
}
out.println("thenApply().get(): " + future.thenApply {
out.println("thenApply")
"Hello: $it"
}.get())
out.println("""future.get(): [${future.get()}]""") // Output: future.get(): [Jon Snow]
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 1c4b9d2596b1f10289ca \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
[2024-11-22T04:46:49.146281Z][elapsed-since-start: 43ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T04:46:49.165936Z][elapsed-since-start: 52ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T04:46:49.166359Z][elapsed-since-start: 53ms][tname:main/tid:1] thenApply().get(): Hello: Jon Snow
[2024-11-22T04:46:49.166505Z][elapsed-since-start: 53ms][tname:main/tid:1] future.get(): [Jon Snow]
Key Points About thenApply
- thenApply Creates a New Future:
  - When you call thenApply, a new CompletableFuture is created. This new future holds the transformed result of the computation once the original future completes.
- The Original Future Remains Unchanged:
  - The result of the original CompletableFuture (future.get() in the code above) is unaffected by the thenApply call. It still holds the value "Jon Snow".
- Accessing the Transformed Result:
  - To retrieve the transformed result ("Hello: Jon Snow" in the example above), call .get() or .join() on the CompletableFuture returned by thenApply.
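The same point in a compact Java sketch. ThenApplySketch and summary are hypothetical names, and the future is pre-completed to keep the example short.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplySketch {
    static String summary() {
        CompletableFuture<String> original = CompletableFuture.completedFuture("Jon Snow");
        // thenApply returns a different future object; 'original' keeps its own value.
        CompletableFuture<String> derived = original.thenApply(name -> "Hello: " + name);
        return original.join() + " | " + derived.join() + " | same=" + (original == derived);
    }

    public static void main(String[] args) {
        System.out.println(summary()); // Jon Snow | Hello: Jon Snow | same=false
    }
}
```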
Exception Handling
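As expected, calling get on a CompletableFuture whose computation threw an exception re-throws it. More precisely, get throws an ExecutionException whose cause is the original exception, which is why the caught message in the recorded output starts with java.lang.RuntimeException.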
GT-Sandbox-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
try {
future.get()
} catch (e: Exception) {
out.println("Caught exception: ${e.message}")
}
println("")
}
Command to reproduce:
gt.sandbox.checkout.commit 0daaef350f0970bcddb7 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"
Recorded output of command:
[2024-11-22T06:16:43.526687Z][elapsed-since-start: 41ms][tname:main/tid:1] Caught exception: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block
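The wrapper type depends on how the result is retrieved. The following Java sketch (class and method names are made up) shows that get() reports the failure as a checked ExecutionException, while join() reports it as an unchecked CompletionException. In both cases the original exception is available through getCause().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ExceptionWrappingSketch {
    // A future whose supplier always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("original-exc-msg");
        });
    }

    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + " <- " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + " <- " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());  // ExecutionException <- original-exc-msg
        System.out.println(viaJoin()); // CompletionException <- original-exc-msg
    }
}
```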
exceptionally: block that runs when an exception occurs
future
.exceptionally { ex: Throwable? -> fallbackMaker(ex) }
.thenAccept { x: String -> out.println(x) }
GT-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
out.println("supplyAsync")
"Jon Snow"
}
future
.exceptionally { ex: Throwable? -> fallbackMaker(ex) }
.thenAccept { x: String -> out.println(x) }
delay(100)
println()
}
private fun fallbackMaker(ex: Throwable?): String {
return "Fallback Result (${ex?.message})"
}
Command to reproduce:
gt.sandbox.checkout.commit db510bcf58d24150f27d \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T05:23:27.988318Z][elapsed-since-start: 42ms][tname:main/tid:1] Fallback Result (java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block)
BUILD SUCCESSFUL in 675ms
2 actionable tasks: 1 executed, 1 up-to-date
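A Java sketch of the same idea, using hypothetical names, shows that the exceptionally block is skipped entirely when the future succeeds. The failing future is completed by hand with completeExceptionally, so the callback receives the original exception directly. When the exception is thrown inside supplyAsync, as in the snapshot above, it arrives wrapped in a CompletionException, which is why the recorded message includes the exception class name.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    // Returns the future's value, or a fallback string if it failed.
    static String withFallback(CompletableFuture<String> future) {
        return future
                .exceptionally(ex -> "Fallback Result (" + ex.getMessage() + ")")
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("Jon Snow");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        System.out.println(withFallback(ok));     // Jon Snow
        System.out.println(withFallback(failed)); // Fallback Result (boom)
    }
}
```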
handle: Handle both success and failure in one callback.
future.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
} else {
return result.toUpperCase();
}
}).thenAccept(System.out::println);
GT-Sandbox-Snapshot
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println(x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 381b07b215873ab5a216 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:05:54.560567Z][elapsed-since-start: 41ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:05:54.579624Z][elapsed-since-start: 51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:05:54.579897Z][elapsed-since-start: 51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 901ms
2 actionable tasks: 2 executed
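The snapshot above only exercises the failure branch. As a rough Java sketch with made-up names, both branches of handle can be driven from the same function: exactly one of result and ex is non-null in this example.

```java
import java.util.concurrent.CompletableFuture;

class HandleSketch {
    // One callback covers success and failure.
    static String describe(CompletableFuture<String> future) {
        return future
                .handle((result, ex) -> ex != null
                        ? "Error: " + ex.getMessage()
                        : result.toUpperCase())
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("Jon Snow");
        // Completed by hand, so handle sees the original exception, not a wrapper.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        System.out.println(describe(ok));     // JON SNOW
        System.out.println(describe(failed)); // Error: boom
    }
}
```

Unlike exceptionally, handle always runs, so the stages chained after it see a normal value in both cases.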
Questions
CompletableFuture.supplyAsync with handle: which thread does thenAccept run on, and why does it change?
GT-Sandbox-Snapshot: thenAccept running on forkJoin
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
// WHEN I PRINT THIS LINE
// WITHIN THE FUTURE
// THEN thenAccept runs on ForkJoinPool.commonPool-worker-1
out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println("thenAccept: " + x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 136656e43e24fb056c68 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:08:33.435054Z][elapsed-since-start: 40ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:08:33.454568Z][elapsed-since-start: 50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:08:33.454811Z][elapsed-since-start: 50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 860ms
2 actionable tasks: 2 executed
GT-Sandbox-Snapshot: thenAccept runs on main thread
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture
val out = Out.standard()
fun main() = runBlocking {
val future = CompletableFuture.supplyAsync {
// WHEN I do NOT PRINT THIS LINE
// WITHIN THE FUTURE
// THEN thenAccept runs on main thread.
// out.println("supplyAsync")
throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
"Jon Snow"
}
future.handle { result: String, ex: Throwable? ->
if (ex != null) {
return@handle "Error handling: " + ex.message
} else {
return@handle result.uppercase(Locale.getDefault())
}
}.thenApply { x: String -> out.println("thenApply"); x + "!" }
.thenAccept { x: String -> out.println("thenAccept: " + x) }
delay(100)
println()
}
Command to reproduce:
gt.sandbox.checkout.commit 3e8b27335e729f4a0273 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"
Recorded output of command:
> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE
> Task :app:run
[2024-11-22T06:09:31.213443Z][elapsed-since-start: 44ms][tname:main/tid:1] thenApply
[2024-11-22T06:09:31.234221Z][elapsed-since-start: 55ms][tname:main/tid:1] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!
BUILD SUCCESSFUL in 855ms
2 actionable tasks: 2 executed
GPT-4o's Answer
The behavior of thenAccept running on different threads in your code depends on how tasks are scheduled and executed in CompletableFuture. Here's an explanation:
High-Level Explanation
- Thread Pool for supplyAsync:
  CompletableFuture.supplyAsync by default uses the ForkJoinPool.commonPool, unless you specify a custom Executor. This means any asynchronous task (like supplyAsync) is picked up by one of the threads in the common pool.
- Thread Propagation through Chaining:
  If a stage in the chain (like handle, thenApply, thenAccept) does not explicitly specify an executor, it may execute:
  - On the thread that completes the previous stage, if that stage is still incomplete when the next stage is registered.
  - On the calling thread, if the previous stage is already completed when the next stage is registered (i.e., the main thread in this case).
- Why thenAccept Runs on Different Threads:
  - When you print within supplyAsync, the print likely adds enough delay that the task is still running when the main thread registers handle, thenApply, and thenAccept. The worker thread that later completes the task then runs the chained stages too, so thenAccept executes on the ForkJoinPool.commonPool.
  - When you skip printing, the task fails almost immediately, so the future is likely already completed by the time the main thread registers the chained stages. The main thread then runs thenApply and thenAccept itself.
Recorded Output Analysis
Case 1: When supplyAsync Prints
- Thread Behavior:
  - supplyAsync: Runs on ForkJoinPool.commonPool-worker-1.
  - thenApply and thenAccept: Also run on the same worker thread, since the asynchronous computation was still active in the pool when thenAccept was registered.
- Reasoning:
  - The out.println("supplyAsync") call adds latency, keeping the computation on the worker thread longer.
Case 2: When supplyAsync Does Not Print
- Thread Behavior:
  - thenApply and thenAccept: Run on the main thread (main/tid:1).
- Reasoning:
  - Without the delay introduced by out.println, the supplyAsync task completes faster. By the time chained operations like thenApply and thenAccept are registered, the future is already complete, so they run synchronously on the thread that registers them (the main thread).
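The two cases can be reproduced without relying on timing. This is a Java sketch under my own assumptions (the class name, the method names, and the use of a CountDownLatch are not from the sandbox code): the latch holds the task open until the callback has been registered, so the "still running" case is forced rather than left to chance.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

class CallbackThreadSketch {
    // Case A: the future is already complete when thenApply is registered,
    // so the callback runs synchronously on the registering (calling) thread.
    static String threadWhenAlreadyComplete() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        return done.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // Case B: the future is still running when thenApply is registered,
    // so the callback runs on the thread that later completes the future.
    static String threadWhenStillRunning() {
        CountDownLatch registered = new CountDownLatch(1);
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
            try {
                registered.await(); // hold the task open until the callback is attached
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "x";
        });
        CompletableFuture<String> callbackThread =
                pending.thenApply(v -> Thread.currentThread().getName());
        registered.countDown(); // now let the task finish
        return callbackThread.join();
    }

    public static void main(String[] args) {
        System.out.println("caller:           " + Thread.currentThread().getName());
        System.out.println("already complete: " + threadWhenAlreadyComplete());
        System.out.println("still running:    " + threadWhenStillRunning());
    }
}
```

In the first case the callback thread is the caller itself. In the second it is whichever thread ran the supplier, typically a ForkJoinPool.commonPool worker.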
Takeaways
- Thread Dependence: The thread running thenAccept (or other chained stages) depends on:
  - Whether the previous stage had already completed when the stage was registered.
  - How and when subsequent stages are registered.
- Consistency with Custom Executor:
  If you want predictable thread execution, supply an Executor to the CompletableFuture methods:
  CompletableFuture.supplyAsync(task, customExecutor)
      .thenApplyAsync(transform, customExecutor)
      .thenAcceptAsync(action, customExecutor)
- ForkJoinPool Behavior: The ForkJoinPool.commonPool can sometimes reuse the same thread for subsequent tasks to optimize throughput, but it’s not guaranteed. Timing and task scheduling influence this behavior.
Solution for Controlling Execution Context
To ensure all stages consistently run on a specific thread pool:
val executor = Executors.newFixedThreadPool(4)
CompletableFuture.supplyAsync({
out.println("supplyAsync")
"Result"
}, executor).thenApplyAsync({
it + " Processed"
}, executor).thenAcceptAsync({
out.println(it)
}, executor)
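A Java variant of the same approach, sketched with assumed names (ExecutorSketch, the thread name custom-pool), records which thread ran each stage and shuts the pool down afterwards. The shutdown happens only after join() returns, because later stages are submitted to the executor as earlier ones complete, and a pool that is already shut down would reject them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorSketch {
    static List<String> stageThreads() {
        // Every thread in this pool gets the same (hypothetical) name.
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("custom-pool");
            return thread;
        });
        List<String> threads = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        threads.add(Thread.currentThread().getName());
                        return "Result";
                    }, executor)
                    .thenApplyAsync(value -> {
                        threads.add(Thread.currentThread().getName());
                        return value + " Processed";
                    }, executor)
                    .thenAcceptAsync(value ->
                            threads.add(Thread.currentThread().getName()), executor)
                    .join(); // wait for the whole chain before shutting down
        } finally {
            executor.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(stageThreads()); // [custom-pool, custom-pool, custom-pool]
    }
}
```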
Cancellation does NOT work well with CompletableFutures
CompletableFuture.cancel
CompletableFuture.cancel(boolean mayInterruptIfRunning): the mayInterruptIfRunning argument does nothing.
```java
/**
 * If not already completed, completes this CompletableFuture with
 * a {@link CancellationException}. Dependent CompletableFutures
 * that have not already completed will also complete
 * exceptionally, with a {@link CompletionException} caused by
 * this {@code CancellationException}.
 *
 * @param mayInterruptIfRunning this value has no effect in this
 * implementation because interrupts are not used to control
 * processing.
 *
 * @return {@code true} if this task is now cancelled
 */
public boolean cancel(boolean mayInterruptIfRunning) {
```
After cancel is called, Thread.sleep() in the thread running the cancelled task does NOT throw InterruptedException.
Code
package com.glassthought.sandbox
import gt.sandbox.util.output.Out
import gt.sandbox.util.output.impl.OutSettings
import java.util.concurrent.CompletableFuture
val out = Out.standard(outSettings = OutSettings(printColorPerThread = true))
fun main() {
// Create a CompletableFuture that simulates a long-running task
val future = CompletableFuture.supplyAsync {
out.println("supplyAsync: Task started")
try {
out.println("Starting 5 second sleep")
Thread.sleep(5000) // Simulate a long-running task
out.println("Slept for 5 seconds, will sleep for just 100ms more")
Thread.sleep(100)
out.println("supplyAsync: Task completed")
"Task completed"
} catch (e: InterruptedException) {
out.println("Task was interrupted")
throw RuntimeException("Task interrupted", e)
}
}
Thread.sleep(100)
// Simulate cancellation after 2 seconds
Thread {
out.println("Starting a new thread to cancel the future...")
Thread.sleep(2000) // Wait 2 seconds before canceling
out.println("Cancelling the future...")
val cancelled = future.cancel(true)
out.println("Future cancelled: $cancelled")
}.start()
// Attempt to retrieve the result (blocks until completed or cancelled)
try {
val result = future.get()
out.println("Future result: $result")
} catch (e: Exception) {
out.println("Exception occurred: ${e.message}")
}
Thread.sleep(6000)
out.println("Main thread ends")
}
Command to reproduce:
gt.sandbox.checkout.commit e04a4d99c46dad07bed4 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"
Recorded output of command:
[2024-11-22T16:45:52.824796Z][elapsed-since-start: 15ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync: Task started
[2024-11-22T16:45:52.852971Z][elapsed-since-start: 33ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Starting 5 second sleep
[2024-11-22T16:45:52.928483Z][elapsed-since-start: 108ms][tname:Thread-0/tid:21] Starting a new thread to cancel the future...
[2024-11-22T16:45:54.931070Z][elapsed-since-start: 2111ms][tname:Thread-0/tid:21] Cancelling the future...
[2024-11-22T16:45:54.935170Z][elapsed-since-start: 2115ms][tname:main/tid:1] Exception occurred: null
[2024-11-22T16:45:54.938928Z][elapsed-since-start: 2118ms][tname:Thread-0/tid:21] Future cancelled: true
[2024-11-22T16:45:57.857226Z][elapsed-since-start: 5037ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Slept for 5 seconds, will sleep for just 100ms more
[2024-11-22T16:45:57.958440Z][elapsed-since-start: 5138ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync: Task completed
[2024-11-22T16:46:00.940212Z][elapsed-since-start: 8120ms][tname:main/tid:1] Main thread ends
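For contrast, a plain Future obtained from ExecutorService.submit does interrupt the running thread on cancel(true). The sketch below is a different technique from the one in the snapshot, shown only as a comparison; the class, the helper names, and the 300 ms sleep are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {
    // Waits on a latch without leaking the checked InterruptedException.
    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // A task that sleeps briefly and records whether the sleep was interrupted.
    static Runnable sleeper(CountDownLatch started, CountDownLatch finished,
                            AtomicBoolean interrupted) {
        return () -> {
            started.countDown();
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            finished.countDown();
        };
    }

    // CompletableFuture.cancel(true): the future is cancelled, the task keeps running.
    static boolean completableFutureCancelInterrupts() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CompletableFuture<Void> future =
                CompletableFuture.runAsync(sleeper(started, finished, interrupted));
        await(started);
        future.cancel(true);
        await(finished);
        return interrupted.get();
    }

    // Future from ExecutorService.submit: cancel(true) interrupts the worker thread.
    static boolean executorFutureCancelInterrupts() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(sleeper(started, finished, interrupted));
            await(started);
            future.cancel(true);
            await(finished);
            return interrupted.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completableFutureCancelInterrupts()); // false
        System.out.println(executorFutureCancelInterrupts());    // true
    }
}
```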
Methods to Explore
| Method | Description |
|---|---|
| supplyAsync() | Run a task asynchronously and return a result. |
| runAsync() | Run a task asynchronously without returning a result. |
| thenApply() | Transform the result of the task. |
| thenAccept() | Consume the result of the task without returning anything. |
| thenRun() | Run some task after the future completes (ignores the result). |
| exceptionally() | Handle exceptions and provide fallback logic. |
| join() | Block and wait for the result (throws unchecked exceptions). |
| allOf() | Combine multiple futures (all must complete). |
| anyOf() | Combine multiple futures (first one to complete is used). |
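The two combinators at the end of the table are the least obvious to use, so here is a short Java sketch with hypothetical names. allOf returns a CompletableFuture<Void>, so the values are read back from the input futures once it completes. anyOf returns a CompletableFuture<Object> holding the first result.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CombineSketch {
    // Waits for both futures, then collects their values in a fixed order.
    static List<String> all() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "b");
        return CompletableFuture.allOf(a, b)
                .thenApply(ignored -> Stream.of(a, b)
                        .map(CompletableFuture::join) // safe: both are complete here
                        .collect(Collectors.toList()))
                .join();
    }

    // Completes as soon as any input completes; the other input is never completed here.
    static Object any() {
        CompletableFuture<String> never = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        return CompletableFuture.anyOf(never, fast).join();
    }

    public static void main(String[] args) {
        System.out.println(all()); // [a, b]
        System.out.println(any()); // fast
    }
}
```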