Mutex

Observations:

Works with coroutines from a single scope

Code

package com.glassthought.sandbox

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock


class RaceConditionInducer {
  private var value = 0
  private val mutex = Mutex() // Mutex for coroutine-safe access to 'value'

  // Suspend function to use coroutine-friendly synchronization
  suspend fun incrementValue() {
    mutex.withLock {
      val storedValue = value

      // Coroutine-friendly delay, instead of Thread.sleep
      // [delay](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html)
      delay(10)

      println("Incrementing value from storedValue=$storedValue to ${storedValue + 1}")
      value = storedValue + 1
    }
  }

  fun getValue() = value
}

fun main() = runBlocking {
  val raceConditionInducer = RaceConditionInducer()

  // Launch coroutines instead of creating threads
  coroutineScope {
    val jobs = (1..10).map {
      launch { raceConditionInducer.incrementValue() }
    }
    jobs.forEach { it.join() } // Wait for all coroutines to complete
  }

  println("Value: ${raceConditionInducer.getValue()}")
}

Command to reproduce:

gt.sandbox.checkout.commit caf1172a6d743df8695a \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:processResources NO-SOURCE
> Task :app:compileKotlin
> Task :app:compileJava NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
Incrementing value from storedValue=0 to 1
Incrementing value from storedValue=1 to 2
Incrementing value from storedValue=2 to 3
Incrementing value from storedValue=3 to 4
Incrementing value from storedValue=4 to 5
Incrementing value from storedValue=5 to 6
Incrementing value from storedValue=6 to 7
Incrementing value from storedValue=7 to 8
Incrementing value from storedValue=8 to 9
Incrementing value from storedValue=9 to 10
Value: 10

BUILD SUCCESSFUL in 1s
2 actionable tasks: 2 executed

Works across different Dispatchers

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val out = Out.standard()

class RaceConditionInducer {
  private var value = 0
  private val mutex = Mutex() // Mutex for coroutine-safe access to 'value'

  // Suspend function to use coroutine-friendly synchronization
  suspend fun incrementValue() {
    mutex.withLock {
      val storedValue = value

      // Coroutine-friendly delay, instead of Thread.sleep
      // [delay](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html)
      delay(100)
      out.println("Incrementing value from $storedValue to ${storedValue + 1}")

      value = storedValue + 1
    }
  }

  fun getValue() = value
}

fun main() = runBlocking {
  val raceConditionInducer = RaceConditionInducer()

  val dispatchers = listOf(
    Dispatchers.Default,
    Dispatchers.IO,
    newFixedThreadPoolContext(3, "CustomThreadPool")
  )

  val jobs = dispatchers.mapIndexed { index, dispatcher ->
    launch(dispatcher) {
      out.println("Starting scope $index on dispatcher $dispatcher")
      repeat(5) {
        raceConditionInducer.incrementValue()
      }
      println("Finished scope $index on dispatcher $dispatcher")
    }
  }

  jobs.forEach { it.join() }

  println("Final Value: ${raceConditionInducer.getValue()}")
}

Command to reproduce:

gt.sandbox.checkout.commit 68a8d22276538db17d0a \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-09T22:21:26.479211Z][elapsed-since-start:   63ms][tname:CustomThreadPool-1/tid:23] Starting scope 2 on dispatcher java.util.concurrent.ScheduledThreadPoolExecutor@244038d0[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[2024-11-09T22:21:26.479210Z][elapsed-since-start:   63ms][tname:DefaultDispatcher-worker-2/tid:21] Starting scope 1 on dispatcher Dispatchers.IO
[2024-11-09T22:21:26.479209Z][elapsed-since-start:   63ms][tname:DefaultDispatcher-worker-1/tid:20] Starting scope 0 on dispatcher Dispatchers.Default
[2024-11-09T22:21:26.602896Z][elapsed-since-start:  180ms][tname:DefaultDispatcher-worker-2/tid:21] Incrementing value from 0 to 1
[2024-11-09T22:21:26.709300Z][elapsed-since-start:  287ms][tname:DefaultDispatcher-worker-2/tid:21] Incrementing value from 1 to 2
[2024-11-09T22:21:26.812024Z][elapsed-since-start:  390ms][tname:CustomThreadPool-2/tid:25] Incrementing value from 2 to 3
[2024-11-09T22:21:26.917401Z][elapsed-since-start:  495ms][tname:DefaultDispatcher-worker-2/tid:21] Incrementing value from 3 to 4
[2024-11-09T22:21:27.023142Z][elapsed-since-start:  601ms][tname:DefaultDispatcher-worker-1/tid:20] Incrementing value from 4 to 5
[2024-11-09T22:21:27.129185Z][elapsed-since-start:  707ms][tname:CustomThreadPool-1/tid:23] Incrementing value from 5 to 6
[2024-11-09T22:21:27.231417Z][elapsed-since-start:  809ms][tname:DefaultDispatcher-worker-1/tid:20] Incrementing value from 6 to 7
[2024-11-09T22:21:27.337921Z][elapsed-since-start:  916ms][tname:DefaultDispatcher-worker-3/tid:22] Incrementing value from 7 to 8
[2024-11-09T22:21:27.444125Z][elapsed-since-start: 1022ms][tname:CustomThreadPool-3/tid:26] Incrementing value from 8 to 9
[2024-11-09T22:21:27.547886Z][elapsed-since-start: 1126ms][tname:DefaultDispatcher-worker-3/tid:22] Incrementing value from 9 to 10
[2024-11-09T22:21:27.653903Z][elapsed-since-start: 1232ms][tname:DefaultDispatcher-worker-1/tid:20] Incrementing value from 10 to 11
[2024-11-09T22:21:27.757974Z][elapsed-since-start: 1336ms][tname:CustomThreadPool-2/tid:25] Incrementing value from 11 to 12
[2024-11-09T22:21:27.860972Z][elapsed-since-start: 1439ms][tname:DefaultDispatcher-worker-1/tid:20] Incrementing value from 12 to 13
Finished scope 0 on dispatcher Dispatchers.Default
[2024-11-09T22:21:27.966097Z][elapsed-since-start: 1544ms][tname:DefaultDispatcher-worker-1/tid:20] Incrementing value from 13 to 14
Finished scope 1 on dispatcher Dispatchers.IO
[2024-11-09T22:21:28.071978Z][elapsed-since-start: 1650ms][tname:CustomThreadPool-3/tid:26] Incrementing value from 14 to 15
Finished scope 2 on dispatcher java.util.concurrent.ScheduledThreadPoolExecutor@244038d0[Running, pool size = 3, active threads = 1, queued tasks = 0, completed tasks = 10]
Final Value: 15

BUILD SUCCESSFUL in 2s
2 actionable tasks: 1 executed, 1 up-to-date

Notes

Fair: FIFO order on acquisitions

The mutex created is fair: lock is granted in first come, first served order. - kotlin doc

GT-Sandbox-Snapshot:

Code

package com.glassthought.sandbox

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import gt.sandbox.util.output.out

fun main() = runBlocking {
  val mutex = Mutex()
  val expectedOrder = listOf("Job 1", "Job 2", "Job 3")
  val results = mutableListOf<String>()

  var counterTimesRan = 0
  while (true) {
    results.clear()

    val mutexAcquired = CompletableDeferred<Unit>()
    val job1 = launch {
      out.info("Job 1: Attempting to acquire the mutex...")
      mutex.withLock {
        out.infoGreen("Job 1: Acquired the mutex!")
        mutexAcquired.complete(Unit)
        results.add("Job 1")
        delay(200) // Simulate long work
        out.infoGreen("Job 1: Releasing the mutex.")
      }
    }
    mutexAcquired.await()

    val job2AboutToWaitOnMutex = CompletableDeferred<Unit>()
    val job2 = launch {
      out.info("Job 2: Attempting to acquire the mutex...")
      job2AboutToWaitOnMutex.complete(Unit)
      mutex.withLock {
        out.infoBlue("Job 2: Acquired the mutex!")
        results.add("Job 2")
        out.infoBlue("Job 2: Releasing the mutex.")
      }
    }
    job2AboutToWaitOnMutex.await()
    delay(10)

    val job3 = launch {
      out.info("Job 3: Attempting to acquire the mutex...")
      mutex.withLock {
        out.infoRed("Job 3: Acquired the mutex!")
        results.add("Job 3")
        out.infoRed("Job 3: Releasing the mutex.")
      }
    }

    joinAll(job1, job2, job3)

    if (results != expectedOrder) {
      out.infoRed("Incorrect order detected: $results")
      break
    } else {
      out.infoGreen("Correct order: $results, (run #${++counterTimesRan})")
    }
  }
}

Command to reproduce:

gt.sandbox.checkout.commit a38a00b120ad0ba936ea \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "_gt_no_op_for_snapshotting_with_printout"

Recorded output of command:

No command ran. Just snapshot.

Mutex is not designed for system-thread synchronization, but it does synchronize threads

kotlinx.coroutines.sync.Mutex:

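  • Not intended for thread synchronization: Mutex.lock() suspends the calling coroutine rather than blocking a thread.
  • Does make sure only a single "logical execution" (coroutine) is inside the withLock block at any time.
    • A coroutine can suspend inside withLock and resume on a different thread, but the withLock block is still executed by a single logical execution.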
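A minimal sketch of the last bullet, assuming kotlinx.coroutines is on the classpath. Each coroutine takes the Mutex on Dispatchers.Default, then hops to a separate single-thread dispatcher (its thread is named hop-thread here purely for illustration) while still holding the lock. probeMutexAcrossThreads is a hypothetical helper, not a library function; it records how many coroutines were inside withLock at once and which threads ran the critical section.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: `workers` coroutines take the same Mutex on Dispatchers.Default,
// then hop to a separate single-thread dispatcher while still holding the lock.
// Returns the highest number of coroutines seen inside withLock at the same time,
// plus the names of every thread that ran code inside the critical section.
fun probeMutexAcrossThreads(workers: Int = 20): Pair<Int, Set<String>> {
    val hopDispatcher = Executors
        .newSingleThreadExecutor { r -> Thread(r, "hop-thread").apply { isDaemon = true } }
        .asCoroutineDispatcher()

    // `use` closes the executor-backed dispatcher when we are done.
    return hopDispatcher.use { hop ->
        runBlocking {
            val mutex = Mutex()
            val inside = AtomicInteger(0)
            val maxInside = AtomicInteger(0)
            val threadsSeen = ConcurrentHashMap.newKeySet<String>()

            List(workers) {
                launch(Dispatchers.Default) {
                    mutex.withLock {
                        val now = inside.incrementAndGet()
                        maxInside.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                        threadsSeen.add(Thread.currentThread().name)

                        // Suspend inside the critical section and resume on another thread.
                        withContext(hop) {
                            threadsSeen.add(Thread.currentThread().name)
                            delay(5)
                        }

                        inside.decrementAndGet()
                    }
                }
            }.joinAll()

            maxInside.get() to threadsSeen.toSet()
        }
    }
}

fun main() {
    val (maxInside, threads) = probeMutexAcrossThreads()
    println("Max coroutines inside withLock at once: $maxInside") // prints 1
    println("Threads that ran code inside withLock: $threads")
}
```

A dedicated executor is used for the hop because the Dispatchers.IO documentation notes that switching from Default to IO typically does not move to another thread, which would hide the switch. If you check thread names, compare with startsWith, since coroutine debug mode appends " @coroutine#N" to them.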

Mutual exclusion solution to the problem is to protect all modifications of the shared state with a critical section that is never executed concurrently. In a blocking world you'd typically use synchronized or ReentrantLock for that. Coroutine's alternative is called Mutex. It has lock and unlock functions to delimit a critical section. The key difference is that Mutex.lock() is a suspending function. It does not block a thread.

There is also withLock extension function that conveniently represents mutex.lock(); try { ... } finally { mutex.unlock() } pattern: - kotlin doc
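The quoted passage makes two claims: withLock is the lock(); try { ... } finally { unlock() } pattern, and lock() suspends rather than blocks. A minimal sketch of both, assuming kotlinx.coroutines is on the classpath. withLockSketch and demoSuspendingLock are hypothetical names; the real withLock additionally accepts an optional owner argument. Everything runs on the single thread owned by runBlocking, so if waiting on the lock blocked that thread, the bystander coroutine could not run before the holder unlocks.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

// Simplified stand-in for withLock: exactly the lock/try/finally/unlock pattern.
// (Hypothetical; the library version also takes an `owner` and declares a contract.)
suspend inline fun <T> Mutex.withLockSketch(action: () -> T): T {
    lock()
    try {
        return action()
    } finally {
        unlock()
    }
}

// All coroutines below share the one thread that runBlocking owns.
fun demoSuspendingLock(): List<String> = runBlocking {
    val mutex = Mutex()
    val events = mutableListOf<String>()

    val holder = launch {
        mutex.withLockSketch {
            events.add("holder: locked")
            delay(50) // keep holding the lock while suspended
            events.add("holder: unlocking")
        }
    }
    yield() // let holder acquire the lock first

    val waiter = launch {
        events.add("waiter: waiting")
        mutex.withLockSketch { events.add("waiter: locked") }
    }

    // Runs while waiter is suspended inside lock(): the shared thread is not blocked.
    val bystander = launch { events.add("bystander: ran on the same thread") }

    joinAll(holder, waiter, bystander)
    events
}

fun main() {
    demoSuspendingLock().forEach { println(it) }
}
```

In the returned events, "bystander: ran on the same thread" comes before "holder: unlocking", even though waiter is parked in lock() at that point.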

Experimentally blocks a thread

In the following example, Mutex ends up synchronizing the counter across threads.

Stack Overflow question on this, with an explanation

Example where Mutex appears to block system level threads

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import com.glassthought.sandbox.util.out.impl.OutSettings
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.concurrent.thread

val mutex = Mutex()

var counter = 0

suspend fun test() {
  mutex.withLock {
    counter = counter + 1
  }
}

val out = Out.standard(OutSettings(printCoroutineName = false))
val TIMES_TO_REPEAT = 100000

suspend fun main(args: Array<String>) {

  out.info("Starting on main thread")

  val t1 = thread {
    runBlocking {
      incrementInALoop("thread-1")
    }
  }

  val t2 = thread {
    runBlocking {
      incrementInALoop("thread-2")
    }
  }

  t1.join()
  t2.join()

  printResults()
}

private suspend fun incrementInALoop(threadName: String) {
  out.info("Starting execution on $threadName")
  repeat(TIMES_TO_REPEAT) {
    test()
  }
  out.info("Finished execution on $threadName")
}

private suspend fun printResults() {
  out.info("Counter : $counter")
  val expected = TIMES_TO_REPEAT * 2
  out.info("Expected: $expected")
  if (expected == counter) {
    out.printGreen("All accounted!")
    out.println("")
  } else {
    out.printRed("NOT all accounted!")
    out.println("")
  }
}

Command to reproduce:

gt.sandbox.checkout.commit badef9ec1f9c2ffed319 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   22ms][🥇/tname:main/tid:1] Starting on main thread
[elapsed:   57ms][⓶/tname:Thread-1/tid:21] Starting execution on thread-2
[elapsed:   57ms][⓷/tname:Thread-0/tid:20] Starting execution on thread-1
[elapsed:  422ms][⓷/tname:Thread-0/tid:20] Finished execution on thread-1
[elapsed:  423ms][⓶/tname:Thread-1/tid:21] Finished execution on thread-2
[elapsed:  425ms][🥇/tname:main/tid:1] Counter : 200000
[elapsed:  425ms][🥇/tname:main/tid:1] Expected: 200000
All accounted!

The same example without the Mutex shows counter synchronization issues: concurrent increments are lost.

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import com.glassthought.sandbox.util.out.impl.OutSettings
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.concurrent.thread

val mutex = Mutex()

var counter = 0

suspend fun test() {
  // mutex.withLock {
    counter = counter + 1
  // }
}

val out = Out.standard(OutSettings(printCoroutineName = false))
val TIMES_TO_REPEAT = 100000

suspend fun main(args: Array<String>) {

  out.info("Starting on main thread")

  val t1 = thread {
    runBlocking {
      incrementInALoop("thread-1")
    }
  }

  val t2 = thread {
    runBlocking {
      incrementInALoop("thread-2")
    }
  }

  t1.join()
  t2.join()

  printResults()
}

private suspend fun incrementInALoop(threadName: String) {
  out.info("Starting execution on $threadName")
  repeat(TIMES_TO_REPEAT) {
    test()
  }
  out.info("Finished execution on $threadName")
}

private suspend fun printResults() {
  out.info("Counter : $counter")
  val expected = TIMES_TO_REPEAT * 2
  out.info("Expected: $expected")
  if (expected == counter) {
    out.printGreen("All accounted!")
    out.println("")
  } else {
    out.printRed("NOT all accounted!")
    out.println("")
  }
}

Command to reproduce:

gt.sandbox.checkout.commit 098034c6e697d9903df1 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   22ms][🥇/tname:main/tid:1] Starting on main thread
[elapsed:   49ms][⓶/tname:Thread-1/tid:21] Starting execution on thread-2
[elapsed:   49ms][⓷/tname:Thread-0/tid:20] Starting execution on thread-1
[elapsed:   58ms][⓷/tname:Thread-0/tid:20] Finished execution on thread-1
[elapsed:   59ms][⓶/tname:Thread-1/tid:21] Finished execution on thread-2
[elapsed:   60ms][🥇/tname:main/tid:1] Counter : 159744
[elapsed:   60ms][🥇/tname:main/tid:1] Expected: 200000
NOT all accounted!

How Expensive?

On an M2 Pro Mac, the overhead of taking an uncontended Mutex (a single coroutine, so no suspension) is about 20-30 nanoseconds. That means you can take the Mutex:

  • About 40K times per millisecond.
  • About 40M times per second.

GT-Sandbox-Snapshot: Time taking Mutex vs not taking Mutex

Code

package com.glassthought.sandbox

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureNanoTime

fun main() = runBlocking {
  val iterations = 100_000_000

  // Operation without mutex
  var counterWithoutMutex = 0
  val timeWithoutMutex = measureNanoTime {
    repeat(iterations) {
      counterWithoutMutex++
    }
  }

  println("Counter without mutex: $counterWithoutMutex")
  println("Time taken without mutex: $timeWithoutMutex ns")

  // Operation with mutex
  val mutex = Mutex()
  var counterWithMutex = 0
  val timeWithMutex = measureNanoTime {
    repeat(iterations) {
      mutex.withLock {
        counterWithMutex++
      }
    }
  }

  println("Counter with mutex: $counterWithMutex")
  println("Time taken with mutex: $timeWithMutex ns")

  // Calculate overhead per mutex operation
  val overheadPerOperation = (timeWithMutex - timeWithoutMutex) / iterations
  println("Overhead of Mutex.withLock{} per operation: $overheadPerOperation ns")

  // In relation to millisecond how many times can we take mutex in one millisecond
  // There are 1000 microseconds in one millisecond
  // There are 1,000,000 nanoseconds in one millisecond
  val timesPerMillisecond = 1_000_000 / overheadPerOperation
  println("In relation to millisecond how many times can we take mutex in one millisecond: $timesPerMillisecond")
}

Command to reproduce:

gt.sandbox.checkout.commit ef2e717a8bc9b3ebb7cd \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

Counter without mutex: 100000000
Time taken without mutex: 5828000 ns
Counter with mutex: 100000000
Time taken with mutex: 2240589458 ns
Overhead of Mutex.withLock{} per operation: 22 ns
In relation to millisecond how many times can we take mutex in one millisecond: 45454
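The throughput figures are just the time budget divided by the per-operation overhead t. With the recorded t = 22 ns (the snapshot uses integer division), with 25 ns as a round midpoint, and with the 20-30 ns range:

```latex
\[
  \left\lfloor \frac{10^{6}\,\text{ns}}{22\,\text{ns}} \right\rfloor = 45{,}454,
  \qquad
  \frac{10^{6}\,\text{ns}}{25\,\text{ns}} = 4 \times 10^{4}\ \text{per ms},
  \qquad
  \frac{10^{9}\,\text{ns}}{25\,\text{ns}} = 4 \times 10^{7}\ \text{per s}
\]
\[
  \frac{10^{6}\,\text{ns}}{30\,\text{ns}} \approx 33{,}333
  \;\le\; N_{\text{ms}} \;\le\;
  \frac{10^{6}\,\text{ns}}{20\,\text{ns}} = 50{,}000
\]
```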

Caution:

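Not reentrant: susceptible to deadlocks from the same coroutine.

If the coroutine that already holds the lock tries to acquire it again, it suspends forever (a deadlock), because the lock is not reentrant. The example below wraps both acquisitions in withTimeoutOrNull, so the deadlock surfaces as timeout errors instead of a hang.

Code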

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock


val out = Out.standard()

class AttemptToGrabLockFromSameCoroutine {
  private var value = 0
  private val mutex = Mutex() // Mutex for coroutine-safe access to 'value'

  // Suspend function to use coroutine-friendly synchronization with timeout
  suspend fun outerCall() {
    out.println("outerCall outside of mutex")

    // Attempt to acquire the lock with a timeout
    val success = withTimeoutOrNull(1000L) { // 1000ms timeout
      mutex.withLock {
        out.println("outerCall INSIDE of mutex")
        innerCall()
      }
    }

    if (success == null) {
      out.println("ERROR: outerCall could not acquire the lock within timeout")
    }
  }

  private suspend fun innerCall() {
    out.println("innerCall outside of mutex")

    // Attempt to acquire the lock with a timeout
    val success = withTimeoutOrNull(1000L) { // 1000ms timeout
      mutex.withLock {
        out.println("innerCall INSIDE of mutex")
        value++
      }
    }

    if (success == null) {
      out.println("ERROR: innerCall could not acquire the lock within timeout")
    }
  }

  fun getValue() = value
}

fun main() = runBlocking {
  val attemptToGrabLockFromSameCoroutine = AttemptToGrabLockFromSameCoroutine()

  // Launch coroutines instead of creating threads
  coroutineScope {
    val jobs = (1..2).map {
      launch { attemptToGrabLockFromSameCoroutine.outerCall() }
    }
    jobs.forEach { it.join() } // Wait for all coroutines to complete
  }

  println("Value: ${attemptToGrabLockFromSameCoroutine.getValue()}")
}

Command to reproduce:

gt.sandbox.checkout.commit fec6776d79e44c8ebe56 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run"

Recorded output of command:

> Task :app:checkKotlinGradlePluginConfigurationErrors SKIPPED
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources NO-SOURCE
> Task :app:classes UP-TO-DATE

> Task :app:run
[2024-11-17T01:11:38.506307Z][elapsed-since-start:   54ms][tname:main/tid:1] outerCall outside of mutex
[2024-11-17T01:11:38.530549Z][elapsed-since-start:   66ms][tname:main/tid:1] outerCall INSIDE of mutex
[2024-11-17T01:11:38.530880Z][elapsed-since-start:   66ms][tname:main/tid:1] innerCall outside of mutex
[2024-11-17T01:11:38.532629Z][elapsed-since-start:   68ms][tname:main/tid:1] outerCall outside of mutex
[2024-11-17T01:11:39.536510Z][elapsed-since-start: 1072ms][tname:main/tid:1] ERROR: innerCall could not acquire the lock within timeout
[2024-11-17T01:11:39.538559Z][elapsed-since-start: 1074ms][tname:main/tid:1] ERROR: outerCall could not acquire the lock within timeout
[2024-11-17T01:11:39.539809Z][elapsed-since-start: 1075ms][tname:main/tid:1] ERROR: outerCall could not acquire the lock within timeout
Value: 0

BUILD SUCCESSFUL in 1s
2 actionable tasks: 1 executed, 1 up-to-date
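One common way around the non-reentrancy (a sketch, not the only option) is to take the lock once at the public entry point and have inner helpers assume it is already held. LockOnceAtTheBoundary and runTwoCallers are hypothetical names for this illustration; kotlinx.coroutines is assumed on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical rework of AttemptToGrabLockFromSameCoroutine.
class LockOnceAtTheBoundary {
    private var value = 0
    private val mutex = Mutex()

    // Public entry point: the only place that takes the lock.
    suspend fun outerCall() {
        mutex.withLock { innerCallLocked() }
    }

    // Caller must already hold `mutex`; this helper never locks on its own.
    private fun innerCallLocked() {
        // Weak sanity check: it only proves that *someone* holds the lock.
        check(mutex.isLocked) { "innerCallLocked() requires the mutex to be held" }
        value++
    }

    suspend fun getValue(): Int = mutex.withLock { value }
}

fun runTwoCallers(): Int = runBlocking {
    val target = LockOnceAtTheBoundary()
    List(2) { launch { target.outerCall() } }.joinAll()
    target.getValue()
}

fun main() {
    println("Value: ${runTwoCallers()}") // prints "Value: 2"
}
```

If restructuring is not practical, the owner parameter of lock/withLock is worth knowing: according to the Mutex.lock documentation, locking again with the same non-null owner throws IllegalStateException instead of suspending, which turns the silent deadlock above into a visible error.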

Examples

lock() try/finally unlock
    // Acquire the lock to ensure only one coroutine initializes the instance.
    mutex.lock()

    try {
      // Second check after acquiring the lock in case the instance was initialized
      // while waiting on the lock.
      if (singletonFileChangeEventEmitter == null) {
        // Create the singleton instance as it doesn't exist yet.
        singletonFileChangeEventEmitter = FileChangeEventEmitterImpl(
          fileWatcher = fileWatcher,
          eventSystem = eventSystem,
        )
      }
    } finally {
      // Always release the lock to avoid deadlocks.
      mutex.unlock()
    }

Preferred: mutex.withLock {}
  // Acquire the lock to ensure only one coroutine initializes the instance.
  mutex.withLock {
    // Second check after acquiring the lock in case the instance was initialized
    // while waiting on the lock.
    if (singletonFileChangeEventEmitter == null) {
      // Create the singleton instance as it doesn't exist yet.
      singletonFileChangeEventEmitter = FileChangeEventEmitterImpl(
        fileWatcher = fileWatcher,
        eventSystem = eventSystem,
      )
    }
  }
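The snippets above are fragments (singletonFileChangeEventEmitter, fileWatcher and eventSystem come from surrounding code). A self-contained sketch of the same double-checked initialization, with hypothetical ExpensiveEmitter and EmitterHolder standing in for the real types and kotlinx.coroutines assumed on the classpath. The @Volatile on the field is an addition of this sketch: the first check reads the field without holding the Mutex, so without it a reader is not guaranteed to see the instance published inside withLock.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for FileChangeEventEmitterImpl; it only counts constructions.
class ExpensiveEmitter {
    init {
        constructions.incrementAndGet()
    }

    companion object {
        val constructions = AtomicInteger(0)
    }
}

// Hypothetical holder mirroring the singleton logic of the snippets above.
object EmitterHolder {
    private val mutex = Mutex()

    // @Volatile: the first check reads this field without holding the Mutex.
    @Volatile
    private var instance: ExpensiveEmitter? = null

    suspend fun get(): ExpensiveEmitter {
        // First check, no lock: fast path once the instance exists.
        instance?.let { return it }

        return mutex.withLock {
            // Second check: another coroutine may have created it while we waited.
            instance ?: ExpensiveEmitter().also { instance = it }
        }
    }
}

// Calls EmitterHolder.get() from many coroutines on Dispatchers.Default at once
// and returns the distinct instances they received.
fun initializeConcurrently(callers: Int = 100): Set<ExpensiveEmitter> = runBlocking {
    List(callers) { async(Dispatchers.Default) { EmitterHolder.get() } }
        .awaitAll()
        .toSet()
}

fun main() {
    val emitters = initializeConcurrently()
    println("Distinct instances: ${emitters.size}") // prints "Distinct instances: 1"
    println("Constructions: ${ExpensiveEmitter.constructions.get()}") // prints "Constructions: 1"
}
```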

Children
  1. Mutex is fair (fifo)
  2. Mutex is non-reentrant
  3. Mutex: How expensive is it to take?
  4. not-designed-for-system-threads but does work with them
