handle

handle: Handle both success and failure in one callback.

future.handle((result, ex) -> {
    if (ex != null) {
        return "Error: " + ex.getMessage();
    } else {
        return result.toUpperCase();
    }
}).thenAccept(System.out::println);
GT-Sandbox-Snapshot

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println(x) }

  
  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 381b07b215873ab5a216 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:05:54.560567Z][elapsed-since-start:   41ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:05:54.579624Z][elapsed-since-start:   51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:05:54.579897Z][elapsed-since-start:   51ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 901ms
2 actionable tasks: 2 executed

Questions

CompletableFuture.supplyAsync AND handle AND which thread thenAccept runs on. WHY?

GT-Sandbox-Snapshot: thenAccept running on forkJoin

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    // WHEN I PRINT THIS LINE
    //  WITHIN THE FEATURE
    // THEN thenAccept runs on ForkJoinPool.commonPool-worker-1
    out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println("thenAccept: " + x) }


  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 136656e43e24fb056c68 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:08:33.435054Z][elapsed-since-start:   40ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] supplyAsync
[2024-11-22T06:08:33.454568Z][elapsed-since-start:   50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenApply
[2024-11-22T06:08:33.454811Z][elapsed-since-start:   50ms][tname:ForkJoinPool.commonPool-worker-1/tid:20] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 860ms
2 actionable tasks: 2 executed
GT-Sandbox-Snapshot: thenAccept runs on main thread

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.*
import java.util.concurrent.CompletableFuture

val out = Out.standard()

fun main() = runBlocking {
  val future = CompletableFuture.supplyAsync {
    // WHEN I do NOT PRINT THIS LINE
    //  WITHIN THE FEATURE
    // THEN thenAccept runs on main thread.
    // out.println("supplyAsync")

    throw RuntimeException("original-exc-msg-from-supplyAsync-future-block")
    "Jon Snow"
  }

  future.handle { result: String, ex: Throwable? ->
    if (ex != null) {
      return@handle "Error handling: " + ex.message
    } else {
      return@handle result.uppercase(Locale.getDefault())
    }
  }.thenApply { x: String -> out.println("thenApply"); x + "!" }
    .thenAccept { x: String -> out.println("thenAccept: " + x) }


  delay(100)
  println()
}

Command to reproduce:

gt.sandbox.checkout.commit 3e8b27335e729f4a0273 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-22T06:09:31.213443Z][elapsed-since-start:   44ms][tname:main/tid:1] thenApply
[2024-11-22T06:09:31.234221Z][elapsed-since-start:   55ms][tname:main/tid:1] thenAccept: Error handling: java.lang.RuntimeException: original-exc-msg-from-supplyAsync-future-block!


BUILD SUCCESSFUL in 855ms
2 actionable tasks: 2 executed
GPT's 4o Answer

The behavior of thenAccept running on different threads in your code depends on how tasks are scheduled and executed in CompletableFuture. Here's an explanation:


High-Level Explanation

  1. Thread Pool for supplyAsync:
    CompletableFuture.supplyAsync by default uses the ForkJoinPool.commonPool, unless you specify a custom Executor. This means any asynchronous task (like supplyAsync) is picked up by one of the threads in the common pool.

  2. Thread Propagation through Chaining:
    If a stage in the chain (like handle, thenApply, thenAccept) does not explicitly specify an executor, it may execute:

    • On the thread completing the previous stage if it happens to be free.
    • On the calling thread if the task is already completed when the next stage is invoked (i.e., the main thread in this case).
  3. Why thenAccept Runs on Different Threads:

    • When you print within supplyAsync, it likely adds enough delay for the task to fully complete asynchronously before thenAccept is registered. As a result, thenAccept executes on the ForkJoinPool.commonPool.
    • When you skip printing, the completion happens quickly enough that the main thread can take over execution of subsequent stages like thenApply and thenAccept.

Recorded Output Analysis

Case 1: When supplyAsync Prints

  • Thread Behavior:

    • supplyAsync: Runs on ForkJoinPool.commonPool-worker-1.
    • thenApply and thenAccept: Also run on the same worker thread since the asynchronous computation was still active in the pool when thenAccept was registered.
  • Reasoning:

    • Adding the out.println("supplyAsync") adds latency, keeping the computation on the worker thread longer.

Case 2: When supplyAsync Does Not Print

  • Thread Behavior:

    • thenApply and thenAccept: Run on the main thread (main/tid:1).
  • Reasoning:

    • Without the delay introduced by out.println, the supplyAsync task completes faster. When the chained operations like thenApply and thenAccept are registered, they are picked up by the thread initiating the chain (the main thread), since the completion occurs synchronously.

Takeaways

  1. Thread Dependence: The thread running thenAccept (or other chained stages) depends on:

    • Whether the task completed asynchronously or synchronously.
    • How and when subsequent stages are registered.
  2. Consistency with Custom Executor:
    If you want predictable thread execution, supply an Executor to CompletableFuture methods:

    CompletableFuture.supplyAsync(task, customExecutor)
        .thenApplyAsync(transform, customExecutor)
        .thenAcceptAsync(action, customExecutor)
    
  3. ForkJoinPool Behavior: The ForkJoinPool.commonPool can sometimes reuse the same thread for subsequent tasks to optimize throughput, but it’s not guaranteed. Timing and task scheduling influence this behavior.


Solution for Controlling Execution Context

To ensure all stages consistently run on a specific thread pool:

val executor = Executors.newFixedThreadPool(4)
CompletableFuture.supplyAsync({
    out.println("supplyAsync")
    "Result"
}, executor).thenApplyAsync({
    it + " Processed"
}, executor).thenAcceptAsync({
    out.println(it)
}, executor)


Children
  1. q-on-which-thread

Backlinks