not-designed-for-system-threads but does work with them

kotlinx.coroutines.sync.Mutex:

  • Not intended for thread synchronization.
  • Does make sure a single "logical execution" can be within withLock block.
    • CoRoutine can suspend and switch to different thread. But the withLock block will be executed by a single logical execution.

Mutual exclusion solution to the problem is to protect all modifications of the shared state with a critical section that is never executed concurrently. In a blocking world you'd typically use synchronized or ReentrantLock for that. Coroutine's alternative is called Mutex. It has lock and unlock functions to delimit a critical section. The key difference is that Mutex.lock() is a suspending function. It does not block a thread.

There is also withLock extension function that conveniently represents mutex.lock(); try { ... } finally { mutex.unlock() } pattern: - kotlin doc

Experimentally blocks a thread

In the following example, Mutex ends up synchronizing the counter across threads.

Stack overflow Question on this with explanation

Example where Mutex appears to block system level threads

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import com.glassthought.sandbox.util.out.impl.OutSettings
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.concurrent.thread

val mutex = Mutex()

var counter = 0

suspend fun test() {
  mutex.withLock {
    counter = counter + 1
  }
}

val out = Out.standard(OutSettings(printCoroutineName = false))
val TIMES_TO_REPEAT = 100000

suspend fun main(args: Array<String>) {

  out.info("Starting on main thread")

  val t1 = thread {
    runBlocking {
      incrementInALoop("thread-1")
    }
  }

  val t2 = thread {
    runBlocking {
      incrementInALoop("thread-2")
    }
  }

  t1.join()
  t2.join()

  printResults()
}

private suspend fun incrementInALoop(threadName: String) {
  out.info("Starting execution on $threadName")
  repeat(TIMES_TO_REPEAT) {
    test()
  }
  out.info("Finished execution on $threadName")
}

private suspend fun printResults() {
  out.info("Counter : $counter")
  val expected = TIMES_TO_REPEAT * 2
  out.info("Expected: $expected")
  if (expected == counter) {
    out.printGreen("All accounted!")
    out.println("")
  } else {
    out.printRed("NOT all accounted!")
    out.println("")
  }
}

Command to reproduce:

gt.sandbox.checkout.commit badef9ec1f9c2ffed319 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   22ms][šŸ„‡/tname:main/tid:1] Starting on main thread
[elapsed:   57ms][ā“¶/tname:Thread-1/tid:21] Starting execution on thread-2
[elapsed:   57ms][ā“·/tname:Thread-0/tid:20] Starting execution on thread-1
[elapsed:  422ms][ā“·/tname:Thread-0/tid:20] Finished execution on thread-1
[elapsed:  423ms][ā“¶/tname:Thread-1/tid:21] Finished execution on thread-2
[elapsed:  425ms][šŸ„‡/tname:main/tid:1] Counter : 200000
[elapsed:  425ms][šŸ„‡/tname:main/tid:1] Expected: 200000
All accounted!
The same example without using Mutex shows counter synchronization issues.

Code

package com.glassthought.sandbox

import gt.sandbox.util.output.Out
import com.glassthought.sandbox.util.out.impl.OutSettings
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.concurrent.thread

val mutex = Mutex()

var counter = 0

suspend fun test() {
  // mutex.withLock {
    counter = counter + 1
  // }
}

val out = Out.standard(OutSettings(printCoroutineName = false))
val TIMES_TO_REPEAT = 100000

suspend fun main(args: Array<String>) {

  out.info("Starting on main thread")

  val t1 = thread {
    runBlocking {
      incrementInALoop("thread-1")
    }
  }

  val t2 = thread {
    runBlocking {
      incrementInALoop("thread-2")
    }
  }

  t1.join()
  t2.join()

  printResults()
}

private suspend fun incrementInALoop(threadName: String) {
  out.info("Starting execution on $threadName")
  repeat(TIMES_TO_REPEAT) {
    test()
  }
  out.info("Finished execution on $threadName")
}

private suspend fun printResults() {
  out.info("Counter : $counter")
  val expected = TIMES_TO_REPEAT * 2
  out.info("Expected: $expected")
  if (expected == counter) {
    out.printGreen("All accounted!")
    out.println("")
  } else {
    out.printRed("NOT all accounted!")
    out.println("")
  }
}

Command to reproduce:

gt.sandbox.checkout.commit 098034c6e697d9903df1 \
&& cd "${GT_SANDBOX_REPO}" \
&& cmd.run.announce "./gradlew run --quiet"

Recorded output of command:

[elapsed:   22ms][šŸ„‡/tname:main/tid:1] Starting on main thread
[elapsed:   49ms][ā“¶/tname:Thread-1/tid:21] Starting execution on thread-2
[elapsed:   49ms][ā“·/tname:Thread-0/tid:20] Starting execution on thread-1
[elapsed:   58ms][ā“·/tname:Thread-0/tid:20] Finished execution on thread-1
[elapsed:   59ms][ā“¶/tname:Thread-1/tid:21] Finished execution on thread-2
[elapsed:   60ms][šŸ„‡/tname:main/tid:1] Counter : 159744
[elapsed:   60ms][šŸ„‡/tname:main/tid:1] Expected: 200000
NOT all accounted!


Children
  1. Mutex experimentally-blocks-thread

Backlinks