CompletableFuture

Overview of CompletableFuture in Java

CompletableFuture lives in the java.util.concurrent package and was introduced in Java 8. It is a promise-like construct for asynchronous programming: it lets you write non-blocking, readable code for computations that complete at some point in the future.

Life cycle

stateDiagram-v2
    [*] --> NotStarted: CompletableFuture created
    NotStarted --> Running: Task submitted
    Running --> Completed: Task finishes successfully
    Running --> ExceptionallyCompleted: Exception occurs
    Running --> Canceled: Future canceled
    Completed --> [*]: Result available
    ExceptionallyCompleted --> [*]: Exception handled
    Canceled --> [*]: No result
    Completed: Result can be retrieved with get() or join()
    ExceptionallyCompleted: Exception can be handled with exceptionally() or handle()
    Canceled: Cancel task with cancel(true)

Key Concepts

  1. Promise of a Result: A CompletableFuture represents a computation that will be completed (successfully or with an exception) at some point.

  2. Non-blocking Execution: It allows asynchronous computation without blocking the current thread.

  3. Chaining: You can chain multiple tasks together using methods like .thenApply, .thenAccept, .thenCompose, etc.

  4. Handling Results: Retrieve the result by blocking with .get() or .join(), or handle it asynchronously with callbacks.

  5. Completion: A CompletableFuture can be completed programmatically using complete() or completeExceptionally().
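The chaining and completion concepts above can be sketched in a few lines of Java. This is a minimal illustration, not a canonical pattern: the class name `KeyConceptsSketch` and the helper `lengthOf` are hypothetical names made up for this example. It shows `thenApply` transforming a value, `thenCompose` flattening a stage that itself returns a future, and `completeExceptionally` finishing a manually created future with a failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class KeyConceptsSketch {

    // Hypothetical async lookup that itself returns a future.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(() -> s.length());
    }

    // thenApply maps the value; thenCompose chains a stage that returns another future.
    static int chainedLength(String input) {
        return CompletableFuture.supplyAsync(() -> input.trim())
                .thenApply(String::toUpperCase)
                .thenCompose(KeyConceptsSketch::lengthOf)
                .join();
    }

    // Completes a future by hand with a failure and returns the message join() surfaces.
    static String manualFailureMessage() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.completeExceptionally(new IllegalStateException("boom"));
        try {
            return promise.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(chainedLength("  hello  "));
        System.out.println(manualFailureMessage());
    }
}
```

With `thenApply` instead of `thenCompose`, the last stage would have the nested type `CompletableFuture<CompletableFuture<Integer>>`.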

Creating and Using CompletableFuture

Creating a CompletableFuture

Manually created future:
val future = CompletableFuture<String>()
// Completing it programmatically
future.complete("Result-Val")

println("Some other work")

println(future.get()) 

Output:

Some other work
Result-Val

GT-snapshot

Command to reproduce:

gt.sandbox.checkout.commit b581f798c55d205c6e91 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
Some other work
Result-Val

BUILD SUCCESSFUL in 874ms
2 actionable tasks: 2 executed

Asynchronous task:
val future = CompletableFuture.supplyAsync {
  out.println("Starting to sleep")
  Thread.sleep(1000)
  out.println("Finished sleeping")
  "Hello, CompletableFuture!"
}

out.println("Hello, from Main thread")
out.println(future.get()) // Output: Hello, CompletableFuture!

GT-Snapshot

Command to reproduce:

gt.sandbox.checkout.commit 9dd9ad6c4750da8e7b93 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:


[2024-11-22T04:33:14.166261Z][elapsed-since-start:   42ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Starting to sleep
[2024-11-22T04:33:14.166259Z][elapsed-since-start:   42ms][tname:main/tid:1] Hello, from Main thread
[2024-11-22T04:33:15.193044Z][elapsed-since-start: 1058ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Finished sleeping
[2024-11-22T04:33:15.194765Z][elapsed-since-start: 1059ms][tname:main/tid:1] Hello, CompletableFuture!

Non-Blocking Execution

To avoid blocking, use asynchronous callbacks:

  • thenApply: Transform the result.

    future.thenApply(result -> result.toUpperCase())
          .thenAccept(System.out::println);  // Output: HELLO, COMPLETABLEFUTURE!
    
  • thenAccept: Consume the result (no transformation).

    future.thenAccept(result -> System.out.println("Result: " + result));
    
  • thenRun: Run a task after completion (doesn’t consume the result).

    future.thenRun(() -> System.out.println("Computation finished!"));
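As a rough, self-contained sketch of the three callbacks above: each call returns a new stage, and only `thenApply` carries a value forward. The class name `CallbackSketch` is hypothetical, and the source future is created already completed purely so that the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CallbackSketch {

    // Registers thenApply, thenAccept and thenRun and records what each callback saw.
    static List<String> runCallbacks(String value) {
        List<String> log = new ArrayList<>(); // only touched by the calling thread in this sketch
        CompletableFuture<String> source = CompletableFuture.completedFuture(value);

        CompletableFuture<String> upper = source.thenApply(String::toUpperCase);          // value in, value out
        CompletableFuture<Void> accepted = upper.thenAccept(s -> log.add("accept:" + s)); // value in, nothing out
        CompletableFuture<Void> done = accepted.thenRun(() -> log.add("run"));            // sees no value at all

        done.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runCallbacks("hello")); // prints [accept:HELLO, run]
    }
}
```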
    

thenApply does not modify Future

  • thenApply does NOT modify the CompletableFuture it is called on.
  • thenApply creates a new CompletableFuture.

GT-snapshot

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    out.println("supplyAsync")
    "Jon Snow"
  }

  out.println("thenApply().get(): " + future.thenApply {
    out.println("thenApply")
    "Hello: $it"
  }.get())

  out.println("""future.get(): [${future.get()}]""") // Output: future.get(): [Jon Snow]
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 1c4b9d2596b1f10289ca \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

[2024-11-22T04:46:49.146281Z][elapsed-since-start:   43ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T04:46:49.165936Z][elapsed-since-start:   52ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T04:46:49.166359Z][elapsed-since-start:   53ms][tname:main/tid:1] thenApply().get(): Hello: Jon Snow
[2024-11-22T04:46:49.166505Z][elapsed-since-start:   53ms][tname:main/tid:1] future.get(): [Jon Snow]

Key Points About thenApply

  1. thenApply Creates a New Future:

    • When you call thenApply, a new CompletableFuture is created. This new future holds the transformed result of the computation once the original future completes.
  2. The Original Future Remains Unchanged:

    • The result of the original CompletableFuture (future.get() in your code) is unaffected by the thenApply call. It still holds the value "Jon Snow".
  3. Accessing the Transformed Result:

    • To retrieve the transformed result (e.g., "Hello: Jon Snow" in your example), you need to call .get() or .join() on the CompletableFuture returned by thenApply.

Exception Handling

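As expected, calling get() on a CompletableFuture that completed with an exception throws. The original exception is not thrown directly: get() wraps it in an ExecutionException, whose message is the original exception's toString().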
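A small sketch of how the failure surfaces, assuming a task that throws an `IllegalArgumentException`. The class and method names are hypothetical. `get()` reports the failure as a checked `ExecutionException`, `join()` as an unchecked `CompletionException`, and in both cases the original exception is available through `getCause()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class ExceptionWrappingSketch {

    // A future whose task always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("original message");
        });
    }

    private static String describe(Throwable wrapper) {
        Throwable cause = wrapper.getCause();
        return wrapper.getClass().getSimpleName() + " caused by "
                + cause.getClass().getSimpleName() + ": " + cause.getMessage();
    }

    // get() wraps the failure in a checked ExecutionException.
    static String viaGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return describe(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() wraps the failure in an unchecked CompletionException.
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return describe(e);
        }
    }

    // The wrapper's own message is the cause's toString().
    static String messageFromGet() {
        try {
            return failing().get();
        } catch (ExecutionException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
        System.out.println(messageFromGet());
    }
}
```

This is why a caught exception's `message` can read like `java.lang.RuntimeException: ...` rather than the bare original text.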

GT-Sandbox-Snapshot

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  try {
    future.get()
  } catch (e: Exception) {
    out.println("Caught exception: ${e.message}")
  }

  println("")
}

Command to reproduce:

gt.sandbox.checkout.commit 0daaef350f0970bcddb7 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[2024-11-22T06:16:43.526687Z][elapsed-since-start:   41ms][tname:main/tid:1] Caught exception: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block

exceptionally: block that runs when an exception occurs

  future
    .exceptionally { ex: Throwable? -> fallbackMaker(ex) }
    .thenAccept { x: String -> out.println(x) }

GT-Snapshot

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    out.println("supplyAsync")
    "Jon Snow"
  }

  future
    .exceptionally { ex: Throwable? -> fallbackMaker(ex) }
    .thenAccept { x: String -> out.println(x) }

  delay(100)
  println()
}

private fun fallbackMaker(ex: Throwable?): String {
  return "Fallback Result (${ex?.message})"
}

Command to reproduce:

gt.sandbox.checkout.commit db510bcf58d24150f27d \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T05:23:27.988318Z][elapsed-since-start:   42ms][tname:main/tid:1] Fallback Result (java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block)


BUILD SUCCESSFUL in 675ms
2 actionable tasks: 1 executed, 1 up-to-date

handle: Handle both success and failure in one callback.

future.handle((result, ex) -> {
    if (ex != null) {
        return "Error: " + ex.getMessage();
    } else {
        return result.toUpperCase();
    }
}).thenAccept(System.out::println);

GT-Sandbox-Snapshot

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println(x) }

  
  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 381b07b215873ab5a216 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:05:54.560567Z][elapsed-since-start:   41ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:05:54.579624Z][elapsed-since-start:   51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:05:54.579897Z][elapsed-since-start:   51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 901ms
2 actionable tasks: 2 executed

Questions

With CompletableFuture.supplyAsync followed by handle, which thread does thenAccept run on, and WHY?

GT-Sandbox-Snapshot: thenAccept running on forkJoin

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    // WHEN I PRINT THIS LINE
    //  WITHIN THE FUTURE
    // THEN thenAccept runs on ForkJoinPool.commonPool-worker-1
    out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println("thenAccept: " + x) }


  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 136656e43e24fb056c68 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:08:33.435054Z][elapsed-since-start:   40ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:08:33.454568Z][elapsed-since-start:   50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:08:33.454811Z][elapsed-since-start:   50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 860ms
2 actionable tasks: 2 executed

GT-Sandbox-Snapshot: thenAccept runs on main thread

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    // WHEN I do NOT PRINT THIS LINE
    //  WITHIN THE FUTURE
    // THEN thenAccept runs on main thread.
    // out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println("thenAccept: " + x) }


  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 3e8b27335e729f4a0273 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:09:31.213443Z][elapsed-since-start:   44ms][tname:main/tid:1] thenApply
[2024-11-22T06:09:31.234221Z][elapsed-since-start:   55ms][tname:main/tid:1] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 855ms
2 actionable tasks: 2 executed

GPT-4o's Answer

The behavior of thenAccept running on different threads in your code depends on how tasks are scheduled and executed in CompletableFuture. Here's an explanation:


High-Level Explanation

  1. Thread Pool for supplyAsync:
    CompletableFuture.supplyAsync by default uses the ForkJoinPool.commonPool, unless you specify a custom Executor. This means any asynchronous task (like supplyAsync) is picked up by one of the threads in the common pool.

  2. Thread Propagation through Chaining:
    If a stage in the chain (like handle, thenApply, thenAccept) does not explicitly specify an executor, it may execute:

    • On the thread that completes the previous stage, if that stage is still incomplete when the dependent stage is registered.
    • On the calling thread, if the previous stage is already complete when the dependent stage is registered (the main thread in this case).
  3. Why thenAccept Runs on Different Threads:

    • When you print within supplyAsync, the print likely adds enough delay that the task is still running when main registers handle, thenApply, and thenAccept. The worker thread that later completes the future then runs them, so they execute on ForkJoinPool.commonPool-worker-1.
    • When you skip printing, the task has likely already failed by the time main registers the stages, so each stage runs immediately on the registering thread, i.e. main.

Recorded Output Analysis

Case 1: When supplyAsync Prints

  • Thread Behavior:

    • supplyAsync: Runs on ForkJoinPool.commonPool-worker-1.
    • thenApply and thenAccept: Also run on the same worker thread since the asynchronous computation was still active in the pool when thenAccept was registered.
  • Reasoning:

    • Adding the out.println("supplyAsync") adds latency, keeping the computation on the worker thread longer.

Case 2: When supplyAsync Does Not Print

  • Thread Behavior:

    • thenApply and thenAccept: Run on the main thread (main/tid:1).
  • Reasoning:

    • Without the delay introduced by out.println, the supplyAsync task completes faster. By the time handle, thenApply, and thenAccept are registered, the future has already completed, so each stage runs immediately on the thread that registers it (the main thread).
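The two cases can be reproduced without relying on timing. The following is a sketch under stated assumptions: the class name `StageThreadSketch` and the thread name `completer-thread` are made up for this example, and manual completion stands in for the supplyAsync task so that the ordering of "register" and "complete" is fixed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class StageThreadSketch {

    // Stage registered AFTER the source completed: it runs at once on the registering thread.
    static String threadWhenAlreadyComplete() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> source = CompletableFuture.completedFuture("done");
        source.thenAccept(v -> seen.set(Thread.currentThread().getName())).join();
        return seen.get();
    }

    // Stage registered BEFORE the source completed: it runs on the thread that completes the source.
    static String threadWhenCompletedLater() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Void> stage =
                source.thenAccept(v -> seen.set(Thread.currentThread().getName()));

        // The callback is already registered when this thread completes the source.
        Thread completer = new Thread(() -> source.complete("done"), "completer-thread");
        completer.start();

        stage.join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(threadWhenAlreadyComplete()); // name of the calling thread
        System.out.println(threadWhenCompletedLater());  // completer-thread
    }
}
```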

Takeaways

  1. Thread Dependence: The thread running thenAccept (or other chained stages) depends on:

    • Whether the previous stage had already completed when the dependent stage was registered.
    • Which thread completes the previous stage, if it had not.
  2. Consistency with Custom Executor:
    If you want predictable thread execution, supply an Executor to CompletableFuture methods:

    CompletableFuture.supplyAsync(task, customExecutor)
        .thenApplyAsync(transform, customExecutor)
        .thenAcceptAsync(action, customExecutor)
    
  3. ForkJoinPool Behavior: Non-async stages (thenApply, thenAccept, ...) are not resubmitted to ForkJoinPool.commonPool. They run on whichever thread completes the previous stage, or on the registering thread if that stage is already complete. Which of the two happens depends on timing, so it is not guaranteed.


Solution for Controlling Execution Context

To ensure all stages consistently run on a specific thread pool:

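val executor = Executors.newFixedThreadPool(4)
CompletableFuture.supplyAsync({
    out.println("supplyAsync")
    "Result"
}, executor).thenApplyAsync({
    it + " Processed"
}, executor).thenAcceptAsync({
    out.println(it)
}, executor).join() // wait for the whole chain before shutting the pool down
executor.shutdown() // fixed-pool threads are non-daemon and would otherwise keep the JVM alive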

Cancellation does NOT work well with CompletableFutures

CompletableFuture.cancel

CompletableFuture.cancel(boolean mayInterruptIfRunning). mayInterruptIfRunning: does nothing.

[Cancellation does NOT work well with CompletableFutures](thorg://notes/u7ox6ra9bc6qdn3zoh7plfm)

```java
/**
 * If not already completed, completes this CompletableFuture with
 * a {@link CancellationException}. Dependent CompletableFutures
 * that have not already completed will also complete
 * exceptionally, with a {@link CompletionException} caused by
 * this {@code CancellationException}.
 *
 * @param mayInterruptIfRunning this value has no effect in this
 * implementation because interrupts are not used to control
 * processing.
 *
 * @return {@code true} if this task is now cancelled
 */
public boolean cancel(boolean mayInterruptIfRunning) {
```

After cancel is called, Thread.sleep() in the thread running the cancelled task does NOT throw InterruptedException; the task keeps running to completion.

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import gt.sandbox.util.output.impl.OutSettings
import java.util.concurrent.CompletableFuture

val out = Out.standard(outSettings = OutSettings(printColorPerThread = true))


fun main() {
  // Create a CompletableFuture that simulates a long-running task
  val future = CompletableFuture.supplyAsync {
    out.println("supplyAsync: Task started")
    try {
      out.println("Starting 5 second sleep")
      Thread.sleep(5000) // Simulate a long-running task
      out.println("Slept for 5 seconds, will sleep for just 100ms more")
      Thread.sleep(100)
      out.println("supplyAsync: Task completed")
      "Task completed"
    } catch (e: InterruptedException) {
      out.println("Task was interrupted")
      throw RuntimeException("Task interrupted", e)
    }
  }

  Thread.sleep(100)
  // Simulate cancellation after 2 seconds
  Thread {
    out.println("Starting a new thread to cancel the future...")
    Thread.sleep(2000) // Wait 2 seconds before canceling
    out.println("Cancelling the future...")

    val cancelled = future.cancel(true)

    out.println("Future cancelled: $cancelled")
  }.start()

  // Attempt to retrieve the result (blocks until completed or cancelled)
  try {
    val result = future.get()
    out.println("Future result: $result")
  } catch (e: Exception) {
    out.println("Exception occurred: ${e.message}")
  }

  Thread.sleep(6000)
  out.println("Main thread ends")
}

Command to reproduce:

gt.sandbox.checkout.commit e04a4d99c46dad07bed4 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[2024-11-22T16:45:52.824796Z][elapsed-since-start:   15ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync: Task started
[2024-11-22T16:45:52.852971Z][elapsed-since-start:   33ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Starting 5 second sleep
[2024-11-22T16:45:52.928483Z][elapsed-since-start:  108ms][tname:Thread-0/tid:21] Starting a new thread to cancel the future...
[2024-11-22T16:45:54.931070Z][elapsed-since-start: 2111ms][tname:Thread-0/tid:21] Cancelling the future...
[2024-11-22T16:45:54.935170Z][elapsed-since-start: 2115ms][tname:main/tid:1] Exception occurred: null
[2024-11-22T16:45:54.938928Z][elapsed-since-start: 2118ms][tname:Thread-0/tid:21] Future cancelled: true
[2024-11-22T16:45:57.857226Z][elapsed-since-start: 5037ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Slept for 5 seconds, will sleep for just 100ms more
[2024-11-22T16:45:57.958440Z][elapsed-since-start: 5138ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync: Task completed
[2024-11-22T16:46:00.940212Z][elapsed-since-start: 8120ms][tname:main/tid:1] Main thread ends
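The same behaviour can be checked without sleeps. The sketch below uses latches instead of timing; the class name `CancelSketch` and the shape of the report string are hypothetical. It covers three points from the Javadoc quoted above: the cancelled future fails with a `CancellationException` (whose message is null, hence "Exception occurred: null" in the recorded output), a dependent stage fails with a `CompletionException` caused by it, and the thread running the task is never interrupted.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelSketch {

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String cancelReport() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean workerInterrupted = new AtomicBoolean(false);

        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                release.await(); // would throw if cancel(true) interrupted this thread
            } catch (InterruptedException e) {
                workerInterrupted.set(true);
            }
            finished.countDown();
            return "task result"; // ignored: the future is already cancelled by then
        });
        CompletableFuture<String> dependent = task.thenApply(String::toUpperCase);

        awaitQuietly(started);                 // make sure the task is really running
        boolean cancelled = task.cancel(true); // the flag has no effect

        String taskFailure = "none";
        try {
            task.join();
        } catch (CancellationException e) {
            taskFailure = e.getClass().getSimpleName();
        }

        String dependentCause = "none";
        try {
            dependent.join();
        } catch (CompletionException e) {
            dependentCause = e.getCause().getClass().getSimpleName();
        }

        release.countDown();    // let the still-running task finish
        awaitQuietly(finished);

        return "cancelled=" + cancelled + ", task=" + taskFailure
                + ", dependentCause=" + dependentCause
                + ", workerInterrupted=" + workerInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelReport());
    }
}
```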


Methods to Explore

| Method | Description |
|---|---|
| supplyAsync() | Run a task asynchronously and return a result. |
| runAsync() | Run a task asynchronously without returning a result. |
| thenApply() | Transform the result of the task. |
| thenAccept() | Consume the result of the task without returning anything. |
| thenRun() | Run some task after the future completes (ignores the result). |
| exceptionally() | Handle exceptions and provide fallback logic. |
| join() | Block and wait for the result (throws unchecked exceptions). |
| allOf() | Combine multiple futures (all must complete). |
| anyOf() | Combine multiple futures (first one to complete is used). |
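Three of the methods listed above (runAsync, allOf, anyOf) are not demonstrated elsewhere on this page. The following is a minimal sketch; `CombineSketch`, `awaitAll`, `firstOf` and `demo` are hypothetical names introduced only for this example. Note that `allOf` returns a `CompletableFuture<Void>`, so the individual results have to be read back from the original futures.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class CombineSketch {

    // Waits for every future, then collects the results in the original order.
    static List<Integer> awaitAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // all are complete here, so join() does not block
                        .collect(Collectors.toList()))
                .join();
    }

    // Returns the result of whichever future completes first (typed as Object by anyOf).
    static Object firstOf(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    static String demo() {
        // runAsync: a side effect with no result.
        AtomicInteger sideEffects = new AtomicInteger();
        CompletableFuture.runAsync(sideEffects::incrementAndGet).join();

        List<Integer> all = awaitAll(Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3)));

        // One future never completes, the other is already done.
        Object any = firstOf(new CompletableFuture<String>(),
                CompletableFuture.completedFuture("fast"));

        return "all=" + all + ", any=" + any + ", sideEffects=" + sideEffects.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints all=[1, 2, 3], any=fast, sideEffects=1
    }
}
```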

