Threading and Concurrency Primitives Guide

Safe concurrent programming with OS threads, atomic types, and synchronization primitives.

Overview

Turmeric provides 1:1 OS threads via C11 <threads.h> plus thread-safe abstractions (Arc<T>, Mutex<T>, Atomic<T>) for safe concurrent programming. Integration with Turmeric's ownership model (ref<T>, borrow checking) ensures memory safety.

Higher-level primitives -- channels, futures, task groups, thread pools, semaphores -- are implemented in stdlib without FFI.

Thread Model

Creating Threads

;; Spawn a new thread; thread returns a handle, not the result
(def worker
  (thread
    (fn []
      (println "Hello from thread!")
      42)))

;; Block until the thread completes and get its return value
(println (thread-join worker))  ; prints 42

Properties

Thread-Local Storage

Each thread has its own stack and thread-local variables:

;; Declare a thread-local
(thread-local my-tls 42)

;; Get current value (only accessible within this thread)
(thread-local-get my-tls)  ; => 42

;; Set current value
(thread-local-set my-tls 100)

Shared Ownership: Arc

Arc<T> (atomic reference counting) enables shared ownership across threads:

;; Create a shared value
(def shared (arc (make-counter 0)))

;; Clone the arc (increments reference count)
(thread
  (fn []
    (let [my-copy (arc-clone shared)]
      (modify-counter my-copy))))

;; Original and clones all point to same value
(println (counter-value shared))

Properties

When to Use Arc

Mutual Exclusion: Mutex

Mutex<T> (mutual exclusion) protects mutable shared state:

;; Create a mutex
(def counter (mutex 0))

;; Acquire lock, modify, and release (automatically via defer)
(with-lock counter
  (fn [value]
    (let [new-val (+ value 1)]
      new-val)))

;; Check current value (requires lock)
(let [val (with-lock counter (fn [v] v))]
  (println val))

Properties

Read-Write Locks: RwLock

For read-heavy workloads, use RwLock<T>:

;; Multiple readers or one writer
(def data (rw-lock (vec 1 2 3)))

;; Read lock (multiple threads can hold simultaneously)
(read-lock data
  (fn [vec] (println vec)))

;; Write lock (exclusive)
(write-lock data
  (fn [vec] (set-vec vec 0 42)))

Atomic Types: Atomic

For simple types, Atomic<T> provides lock-free atomic operations:

;; Atomic integer
(def counter (atomic 0))

;; Atomic load
(println (atomic-load counter))  ; => 0

;; Atomic store
(atomic-store counter 42)

;; Atomic compare-and-swap
(atomic-cas counter 42 100)  ; success if value was 42, set to 100

;; Atomic add/sub
(atomic-add counter 5)  ; atomically add 5

Types Supported

When to Use Atomic

Futures and Promises

Future<T> and Promise<T> decouple the producer and consumer of an asynchronous result. Both share a single heap-allocated cell containing a mutex, condvar, and value/error slot.

;; Create a promise/future pair (same underlying cell)
(def p (promise-new))
(def f p)

;; Producer thread fulfills the promise
(thread (fn [] (promise-fulfill p 42)))

;; Consumer blocks on the future
(def result (future-get f))
(println (ok? result))     ; => true
(println (ok-val result))  ; => 42
(promise-free p)

Core API

Function                Notes
promise-new             Returns an unsettled FutureCell*
promise-fulfill [p v]   Settles ok with value v; aborts if already settled
promise-fail [p e]      Settles with error code e; aborts if already settled
promise-free [p]        Frees the shared cell
future-get [f]          Blocks until settled; returns a heap Result*
future-done? [f]        Non-blocking settled check
future-cancel [f]       Settles with exn = -2 if not yet settled
future-cancelled? [f]   Returns true if cancelled (exn == -2)
future-free [f]         Frees the shared cell
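
The shared-cell design described above (one mutex, one condvar, one value/error slot) can be illustrated with a minimal Python sketch. This is not Turmeric's implementation; the class and method names are hypothetical, and raising an exception stands in for the abort the guide mentions on double settlement.

```python
import threading

CANCELLED = -2  # error code the guide uses for cancelled futures


class FutureCell:
    """Hypothetical shared cell: a lock, a condition variable, a value/error slot."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._settled = False
        self._value = None
        self._error = None

    def _settle(self, value, error):
        # Caller must hold the lock.
        self._settled = True
        self._value, self._error = value, error
        self._cond.notify_all()  # wake every thread blocked in get()

    def fulfill(self, value):
        with self._cond:
            if self._settled:
                raise RuntimeError("already settled")  # stand-in for abort
            self._settle(value, None)

    def fail(self, error):
        with self._cond:
            if self._settled:
                raise RuntimeError("already settled")  # stand-in for abort
            self._settle(None, error)

    def cancel(self):
        with self._cond:
            if not self._settled:  # cancelling a settled cell does nothing
                self._settle(None, CANCELLED)

    def done(self):
        with self._cond:
            return self._settled

    def cancelled(self):
        with self._cond:
            return self._settled and self._error == CANCELLED

    def get(self):
        with self._cond:
            while not self._settled:  # loop guards against spurious wakeups
                self._cond.wait()
            if self._error is None:
                return ("ok", self._value)
            return ("err", self._error)


cell = FutureCell()
producer = threading.Thread(target=cell.fulfill, args=(42,))
producer.start()
result = cell.get()  # blocks until the producer settles the cell
producer.join()
```

Because the promise and the future are two names for one cell, a sketch like this needs only one object; the same reasoning suggests freeing the cell once, through either handle, not both.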

Pre-Settled Futures

(def f (future-of 99))       ; immediately fulfilled
(future-done? f)             ; => true

(def e (future-error-of 7))  ; immediately rejected

Combinators

;; Map a function over the fulfilled value
(def f2 (future-map f (fn [v] (* v 2))))

;; Flat-map: fn must return a new future
(def f3 (future-then f (fn [v] (future-of (+ v 1)))))

Multi-Combinators

Function                    Behaviour
future-race [fa fb]         First to settle wins
future-all2 [fa fb]         Both must succeed; result carries fa's value
future-any2 [fa fb]         First to fulfill wins; both rejecting yields fb's error
future-join [fa fb]         Both must succeed; result is a Tuple2 pair
future-race-n [futures n]   Variadic race over a pointer array
future-all-n [futures n]    All must succeed
future-any-n [futures n]    First fulfillment wins

;; Join two futures into a pair
(def tup-future (future-join fa fb))
(def tup-result (future-get tup-future))
(when (ok? tup-result)
  (let [tup (ok-val tup-result)]
    (println (tuple-first tup))
    (println (tuple-second tup))
    (tuple-free tup)))

Timeouts

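;; Race a computation against a 5000 ms deadline
(def timed (future-with-timeout task-future 5000))
(def result (future-get timed))
;; future-get returns a Result, so test it with ok? rather than future-cancelled?
(if (ok? result)
  (println (ok-val result))
  (println "timed out or failed"))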

;; Stand-alone timeout future (rejects with exn = -1 after ms)
(def t (future-timeout 1000))

Synchronization Primitives

Channels

Turmeric provides two channel types backed by the same ring-buffer layout:

Synchronous Channel (Chan)

(def ch (chan-new 8))

;; Producer thread
(thread
  (fn []
    (chan-send ch 1)
    (chan-send ch 2)
    (chan-send ch 3)))

;; Consumer
(println (chan-recv ch))  ; => 1
(println (chan-recv ch))  ; => 2
(println (chan-recv ch))  ; => 3
(chan-free ch)

Function    Signature               Notes
chan-new    [cap :int] :ptr<void>   Allocates ring-buffer with given capacity
chan-send   [ch val :int] :nil      Blocks when full
chan-recv   [ch] :int               Blocks when empty
chan-free   [ch] :nil               Destroys mutex/condvar and frees buffer
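
To make the "blocks when full / blocks when empty" behaviour concrete, here is a minimal Python sketch of a bounded channel over a ring buffer guarded by one lock and two condition variables. It only illustrates the general technique; the class name and field layout are assumptions, not Turmeric's actual layout.

```python
import threading


class Chan:
    """Hypothetical bounded channel over a fixed-size ring buffer."""

    def __init__(self, cap):
        self._buf = [None] * cap
        self._cap = cap
        self._head = 0   # index of the next item to receive
        self._count = 0  # number of items currently buffered
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)

    def send(self, val):
        with self._lock:
            while self._count == self._cap:  # buffer full: block the sender
                self._not_full.wait()
            tail = (self._head + self._count) % self._cap
            self._buf[tail] = val
            self._count += 1
            self._not_empty.notify()  # one receiver can now proceed

    def recv(self):
        with self._lock:
            while self._count == 0:  # buffer empty: block the receiver
                self._not_empty.wait()
            val = self._buf[self._head]
            self._head = (self._head + 1) % self._cap
            self._count -= 1
            self._not_full.notify()  # one sender can now proceed
            return val


def produce(ch):
    for i in (1, 2, 3):
        ch.send(i)


ch = Chan(2)  # capacity below the item count, so the sender may have to block
sender = threading.Thread(target=produce, args=(ch,))
sender.start()
received = [ch.recv() for _ in range(3)]
sender.join()
```

Items come out in the order they went in because the head index only ever advances, wrapping at the capacity.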

Async Buffered Channel (AsyncChan)

(def ch (async-chan-new 16))

(async-chan-send ch 99)

;; Non-blocking send (returns false if full)
(if (async-chan-try-send ch 100) ...)

;; Non-blocking recv (returns INT64_MIN if empty)
(let [v (async-chan-try-recv ch)] ...)

(println (async-chan-count ch))  ; current item count
(async-chan-free ch)

Function              Signature               Notes
async-chan-new        [cap :int] :ptr<void>
async-chan-send       [ch val :int] :nil      Blocks when full
async-chan-recv       [ch] :int               Blocks when empty
async-chan-try-send   [ch val :int] :bool     Returns false if full; never blocks
async-chan-try-recv   [ch] :int               Returns INT64_MIN if empty; never blocks
async-chan-count      [ch] :int               Current item count (locked)
async-chan-free       [ch] :nil
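
The non-blocking calls can be sketched in Python as follows. This is an illustration under assumptions: the class name is hypothetical and a deque stands in for the ring buffer. It shows the drain-until-sentinel loop that the INT64_MIN convention implies.

```python
import threading
from collections import deque

INT64_MIN = -(2 ** 63)  # sentinel the guide documents for "channel empty"


class AsyncChanSketch:
    """Hypothetical buffered channel exposing only the non-blocking calls."""

    def __init__(self, cap):
        self._items = deque()
        self._cap = cap
        self._lock = threading.Lock()

    def try_send(self, val):
        with self._lock:
            if len(self._items) == self._cap:
                return False  # full: report failure instead of blocking
            self._items.append(val)
            return True

    def try_recv(self):
        with self._lock:
            if not self._items:
                return INT64_MIN  # empty: return the sentinel instead of blocking
            return self._items.popleft()

    def count(self):
        with self._lock:
            return len(self._items)


ch = AsyncChanSketch(2)
accepted = [ch.try_send(v) for v in (10, 20, 30)]  # third send finds it full

drained = []
while True:
    v = ch.try_recv()
    if v == INT64_MIN:  # stop at the "empty" sentinel
        break
    drained.append(v)
```

One consequence of an in-band sentinel is that INT64_MIN itself cannot be sent as a payload and told apart from "empty" on the receiving side.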

Multi-Channel Select

select waits on multiple channel operations and executes the first one ready.

(def ch-a (chan-new 4))
(def ch-b (chan-new 4))

;; Poll with a default arm (never blocks)
(let [[idx val] (select
                  (ch-a :recv)
                  (ch-b :recv)
                  (:default :nothing))]
  (cond
    (= idx 0) (println (str "from ch-a: " val))
    (= idx 1) (println (str "from ch-b: " val))
    :else     (println "nothing ready")))

;; Send-or-drop
(select
  (ch-a :send 99)
  (:default (println "ch-a full, dropping")))

Returns (index value) where index is the 0-based clause position (or -1 for :default) and value is the received value, true for :send, or the :default expression result.

When multiple clauses are simultaneously ready, select picks one uniformly at random using an xorshift32 PRNG so that no single channel is systematically favoured. When no clause is ready and no :default is present, select blocks on all channels concurrently and wakes as soon as any one of them becomes ready.
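
The random tie-break can be sketched in Python. xorshift32 below is Marsaglia's standard 13/17/5 variant; whether Turmeric uses these exact shift constants, and how it seeds the generator, is an assumption. pick_ready is a hypothetical helper that chooses among the clause indices that are ready.

```python
MASK32 = 0xFFFFFFFF


def xorshift32(state):
    """One step of xorshift32 with shifts 13, 17, 5; state must be non-zero."""
    state ^= (state << 13) & MASK32
    state ^= state >> 17
    state ^= (state << 5) & MASK32
    return state


def pick_ready(ready, state):
    """Choose one entry of `ready` (indices of clauses that can proceed now).

    Returns (chosen_index, new_state). The index is -1 when nothing is ready,
    matching the index the guide reports for :default.
    """
    if not ready:
        return -1, state
    state = xorshift32(state)
    return ready[state % len(ready)], state


# Both clauses ready: repeated selects spread across the two indices.
state = 1
picks = []
for _ in range(1000):
    idx, state = pick_ready([0, 1], state)
    picks.append(idx)
```

Reducing the PRNG output with a modulo is very slightly biased when the number of ready clauses does not divide 2^32 - 1, which is negligible for a handful of clauses.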

Condition Variables

Block until a condition is signaled:

;; Named m and cv so they do not shadow the mutex constructor or the cond form
(def m (mutex 0))
(def cv (condition-variable))

;; Thread A: wait for signal
(with-lock m
  (fn [_]
    (condition-wait cv m)
    (println "woken!")))

;; Thread B: signal the condition
(condition-signal cv)
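
A bare wait can miss a signal that was sent before the waiter started, and condition variables in general may wake spuriously. The usual remedy is to pair the condvar with a flag and re-check it in a loop. The Python sketch below shows that pattern with the standard threading module; it illustrates the general technique and is not Turmeric API.

```python
import threading

state = {"ready": False}  # the predicate the waiter actually cares about
cond = threading.Condition()
log = []


def waiter():
    with cond:
        while not state["ready"]:  # re-check after every wakeup
            cond.wait()
        log.append("woken")


def signaller():
    with cond:
        state["ready"] = True  # change the state under the lock...
        cond.notify()          # ...then signal


t_b = threading.Thread(target=signaller)
t_a = threading.Thread(target=waiter)
t_b.start()  # signalling first is safe because the flag records it
t_a.start()
t_a.join()
t_b.join()
```

Either start order works: if the signal arrives first, the waiter sees the flag already set and never blocks.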

Semaphore

A counting semaphore implemented with mutex + condvar (portable; POSIX unnamed semaphores, i.e. a sem_t initialised with sem_init, are unavailable on macOS).

;; Binary semaphore (mutex-like)
(def s (sem-new 1))

;; Limit concurrency to 3 parallel workers
(def sem (sem-new 3))

(thread
  (fn []
    (sem-acquire sem)
    (do-work)
    (sem-release sem)))

Function              Notes
sem-new [initial]     initial=0 starts locked; initial=1 is a binary semaphore
sem-acquire [s]       Decrements; blocks when count is 0
sem-release [s]       Increments and wakes one blocked acquirer
sem-free [s]
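
The mutex + condvar construction can be sketched in a few lines of Python. The class name is hypothetical and this is an illustration of the technique, not Turmeric's source. The demo caps eight workers at three concurrent permits and records the peak concurrency.

```python
import threading
import time


class Sem:
    """Hypothetical counting semaphore built from one lock and one condvar."""

    def __init__(self, initial):
        self._count = initial
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            while self._count == 0:  # no permits left: block
                self._cond.wait()
            self._count -= 1

    def release(self):
        with self._cond:
            self._count += 1
            self._cond.notify()  # wake one blocked acquirer


sem = Sem(3)
stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()


def worker():
    sem.acquire()
    try:
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.01)  # stand-in for do-work
        with stats_lock:
            stats["active"] -= 1
    finally:
        sem.release()  # release even if the work raises


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Releasing in a finally block keeps a failing worker from leaking its permit, a concern that applies equally to the sem-acquire / sem-release pairing above.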

One-Time Initialization: Once

once-call guarantees an initializer runs exactly once no matter how many threads call it concurrently.

(def flag (once-flag-new))

;; Safe to call from any number of threads
(once-call flag init-resource)

(once-flag-free flag)

Function                    Notes
once-flag-new               Allocates a pthread_once_t on the heap
once-call [flag init-fn]    Calls init-fn at most once across all threads
once-flag-free [flag]
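
The run-once guarantee can be illustrated with a lock and a done flag. The Python sketch below is a simplified stand-in for what pthread_once provides; the names are hypothetical.

```python
import threading


class OnceFlag:
    """Hypothetical once-flag: a lock plus a 'done' marker."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = False


def once_call(flag, init_fn):
    # Callers that lose the race block on the lock until init_fn has returned,
    # so every caller observes the initialised state afterwards.
    with flag._lock:
        if not flag._done:
            init_fn()
            flag._done = True


calls = []
flag = OnceFlag()


def init_resource():
    calls.append("init")


threads = [threading.Thread(target=once_call, args=(flag, init_resource))
           for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All sixteen threads call once_call, but the initializer body runs a single time.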

Thread Pools

WorkQueue

A thread-safe FIFO used internally by both pool variants; also available directly for custom worker patterns. A sentinel value INT64_MIN returned by work-queue-pop signals that the queue has been closed.

;; Unbounded queue (grows dynamically)
(def q (work-queue-new))

;; Bounded queue (push blocks when full)
(def bq (work-queue-new-bounded 64))

(work-queue-push q 42)
(let [v (work-queue-pop q)] ...)  ; blocks until item available

;; Shutdown: wake all blocked producers and consumers
(work-queue-close q)
(work-queue-free q)

Function                        Notes
work-queue-new                  Unbounded; doubles capacity on growth
work-queue-new-bounded [cap]    Fixed ring-buffer; push blocks when full
work-queue-push [q v]           Blocks on bounded queue when full; no-op after close
work-queue-pop [q]              Blocks until item available; returns INT64_MIN after close
work-queue-close [q]            Broadcasts to all blocked threads
work-queue-free [q]
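
The close protocol is easiest to see in a worker loop. The Python sketch below models a closable FIFO with the same sentinel convention. Two things are assumptions: the names, and the choice that pop hands out items still queued at close time before it starts returning the sentinel (the guide does not say whether pending items are drained).

```python
import threading
from collections import deque

INT64_MIN = -(2 ** 63)  # sentinel meaning "queue closed"


class WorkQueue:
    """Hypothetical closable FIFO; cap=None means unbounded."""

    def __init__(self, cap=None):
        self._items = deque()
        self._cap = cap
        self._closed = False
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._not_full = threading.Condition(self._lock)

    def push(self, v):
        with self._lock:
            while (not self._closed and self._cap is not None
                   and len(self._items) >= self._cap):
                self._not_full.wait()
            if self._closed:
                return  # no-op after close
            self._items.append(v)
            self._not_empty.notify()

    def pop(self):
        with self._lock:
            while not self._items and not self._closed:
                self._not_empty.wait()
            if self._items:  # assumption: drain pending items first
                v = self._items.popleft()
                self._not_full.notify()
                return v
            return INT64_MIN  # closed and empty

    def close(self):
        with self._lock:
            self._closed = True
            self._not_empty.notify_all()  # wake blocked consumers
            self._not_full.notify_all()   # wake blocked producers


q = WorkQueue()
seen = []


def consumer():
    while True:
        v = q.pop()
        if v == INT64_MIN:  # queue closed: leave the loop
            break
        seen.append(v)


t = threading.Thread(target=consumer)
t.start()
for v in (1, 2, 3):
    q.push(v)
q.close()
t.join()
```

The broadcast in close is what lets every blocked worker observe the shutdown; a single notify would leave the rest asleep.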

Fixed Thread Pool

A pool of n worker threads. Tasks are C function pointers; results are delivered via Future (see the Futures section).

(def tp (thread-pool-new 4))

;; Submit returns a Future fulfilled with the task's return value
(def fut (thread-pool-submit tp my-work-fn my-arg))
(def result (future-get fut))
(println (ok-val result))

(thread-pool-shutdown tp)  ; closes queue and joins all workers
(thread-pool-free tp)      ; must be called after shutdown

Function                                   Notes
thread-pool-new [n]                        Spawns n worker threads immediately
thread-pool-submit [tp task-fn task-arg]   Returns a FutureCell*
thread-pool-shutdown [tp]                  Closes queue, joins workers
thread-pool-free [tp]                      Call after shutdown

Auto-Scaling Thread Pool

Starts with min-threads workers and grows up to max-threads when all current workers are busy.

(def dtp (thread-pool-new-dynamic 2 8))

(def fut (thread-pool-dynamic-submit dtp my-work-fn nil))
(def result (future-get fut))

(thread-pool-dynamic-shutdown dtp)
(thread-pool-dynamic-free dtp)

Function                                           Notes
thread-pool-new-dynamic [min max]                  Spawns min workers; scales to max
thread-pool-dynamic-submit [tp task-fn task-arg]   Spawns a new worker if idle count is 0 and below max
thread-pool-dynamic-shutdown [tp]
thread-pool-dynamic-free [tp]                      Call after shutdown
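
The scaling rule (spawn a worker only when the idle count is 0 and the pool is below max) can be sketched in Python. This is an illustration under assumptions: the class and its methods are hypothetical, and Python's concurrent.futures.Future stands in for Turmeric's Future.

```python
import queue
import threading
import time
from concurrent.futures import Future


class DynamicPool:
    """Hypothetical auto-scaling pool: starts at min, grows on demand to max."""

    def __init__(self, min_threads, max_threads):
        self._tasks = queue.Queue()
        self._lock = threading.Lock()
        self._max = max_threads
        self._idle = 0
        self._workers = []
        with self._lock:
            for _ in range(min_threads):
                self._spawn_locked()

    def _spawn_locked(self):
        # Caller holds self._lock.
        t = threading.Thread(target=self._run)
        self._workers.append(t)
        t.start()

    def _run(self):
        while True:
            with self._lock:
                self._idle += 1
            task = self._tasks.get()  # blocks until work or a stop marker
            with self._lock:
                self._idle -= 1
            if task is None:  # stop marker from shutdown()
                return
            fn, arg, fut = task
            try:
                fut.set_result(fn(arg))
            except Exception as exc:
                fut.set_exception(exc)

    def submit(self, fn, arg):
        fut = Future()
        with self._lock:
            # Grow only when nobody is idle and the pool is below its maximum.
            if self._idle == 0 and len(self._workers) < self._max:
                self._spawn_locked()
        self._tasks.put((fn, arg, fut))
        return fut

    def worker_count(self):
        with self._lock:
            return len(self._workers)

    def shutdown(self):
        with self._lock:
            workers = list(self._workers)
        for _ in workers:
            self._tasks.put(None)  # one stop marker per worker
        for t in workers:
            t.join()


def slow_square(n):
    time.sleep(0.01)  # keep workers busy so the pool has a reason to grow
    return n * n


pool = DynamicPool(2, 4)
futures = [pool.submit(slow_square, i) for i in range(8)]
results = [f.result() for f in futures]
workers_used = pool.worker_count()
pool.shutdown()
```

How far the pool grows depends on timing, but the worker count always stays between the configured minimum and maximum.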

Structured Concurrency: TaskGroup

TaskGroup provides structured concurrency: spawn a group of fibers, then wait for or cancel all of them together. When a task panics, the group is automatically cancelled.

Cancellation is cooperative -- tasks must periodically check task-group-should-exit? or fiber-cancelled? and exit if set.

Basic Usage

;; Manual lifecycle
(def g (task-group-new))

(task-group-spawn g
  (fn []
    (println "worker A")
    (task-group-task-done g)))

(task-group-spawn g
  (fn []
    (println "worker B")
    (task-group-task-done g)))

(task-group-wait g)
(task-group-free g)

;; Preferred: task-group-with macro (calls wait automatically)
(def g (task-group-new))
(task-group-with g
  (task-group-spawn g my-fiber-a)
  (task-group-spawn g my-fiber-b))
(task-group-free g)

Cooperative Cancellation

(def g (task-group-new))

(task-group-spawn g
  (fn []
    (while (not (task-group-should-exit? g))
      (do-work))
    (task-group-task-done g)))

;; Cancel from another thread or the parent
(task-group-cancel g)
(task-group-wait g)
(task-group-free g)

Timeouts

;; Auto-cancel after 5 seconds
(def g (task-group-new))
(task-group-with-timeout g 5000
  (task-group-spawn g long-running-task))
(task-group-free g)

Lifecycle API

Function                         Notes
task-group-new                   Allocates an empty group
task-group-spawn [group f]       Increments task count; returns fiber handle
task-group-task-done [group]     Each spawned task must call this on exit
task-group-join [group handle]   Wait for one specific fiber
task-group-wait [group]          Block until all tasks complete
task-group-done? [group]         Non-blocking group completion check
task-handle-done? [handle]       Non-blocking per-fiber check
task-group-free [group]

Cancellation API

Function                                       Notes
task-group-cancel [group]                      Manual cancel; reason = 0
task-group-cancel-with-reason [group reason]   0=manual 1=panic 2=timeout 3=error
task-group-cancel-panic [group]                Convenience: reason = 1
task-group-cancel-timeout [group]              Convenience: reason = 2
task-group-cancel-error [group]                Convenience: reason = 3
task-group-cancel-reason [group]               Returns the reason code
task-group-cancelled? [group]                  Check group-level cancel flag
fiber-cancelled?                               Check thread-local cancel flag for current fiber
task-group-should-exit? [group]                Combines both checks (preferred in task bodies)
fiber-should-exit?                             Alias for fiber-cancelled?
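
Cooperative cancellation with a reason code can be sketched in Python using a shared flag. Threads stand in for fibers here and the class is hypothetical. Keeping the first reason when cancel is called more than once is an assumption of this sketch, not documented Turmeric behaviour.

```python
import threading
import time

REASON_MANUAL, REASON_PANIC = 0, 1  # codes from the cancellation API


class TaskGroupSketch:
    """Hypothetical task group: a cancel flag, a reason, and a list of tasks."""

    def __init__(self):
        self._cancel = threading.Event()
        self._lock = threading.Lock()
        self._threads = []
        self.reason = None

    def spawn(self, fn):
        def body():
            try:
                fn(self)
            except Exception:
                self.cancel(REASON_PANIC)  # a failing task cancels its siblings

        t = threading.Thread(target=body)
        self._threads.append(t)
        t.start()

    def cancel(self, reason=REASON_MANUAL):
        with self._lock:
            if not self._cancel.is_set():  # assumption: first cancel wins
                self.reason = reason
                self._cancel.set()

    def should_exit(self):
        return self._cancel.is_set()

    def wait(self):
        for t in self._threads:
            t.join()


def polite_worker(group):
    while not group.should_exit():  # cooperative check between units of work
        time.sleep(0.001)


def failing_worker(group):
    raise RuntimeError("boom")


# Manual cancellation from the parent.
g1 = TaskGroupSketch()
g1.spawn(polite_worker)
g1.cancel()
g1.wait()

# A failure in one task cancels the group, which stops the polite worker.
g2 = TaskGroupSketch()
g2.spawn(polite_worker)
g2.spawn(failing_worker)
g2.wait()
```

Nothing here interrupts a task forcibly: a worker that never checks the flag would keep running, which is why the guide asks task bodies to poll.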

Macros

Macro                                         Notes
task-group-with [group & body]                Runs body then calls task-group-wait
task-group-with-timeout [group ms & body]     Auto-cancels after ms milliseconds
task-group-with-cancellation [group & body]   Skips body if already cancelled

Async Integration

;; Spawn an async computation into a task group; get a Future back
(def g (task-group-new))
(def fut (task-group-spawn-async g my-thunk))
(def result (future-get fut))
(task-group-free g)

;; Macro form
(task-group-async g my-thunk)

Panic propagation is automatic: if a fiber spawned into a group panics, the group is cancelled with reason 1 (panic). No extra API is needed.

Safety Guarantees

Send and Sync Traits

Marker traits control what types can be safely shared:

;; These are Send (safe to move to threads)
int, bool, string, (Pair a b) [Send a, Send b]

;; These are Sync (safe to share via &)
int, bool, Mutex<T> [T : Sync]

;; NOT Sync (require Mutex for shared access)
Rc<T> (thread-local ref counting)
ref<T> (single-thread ownership)

Most library types implement these traits automatically based on their fields.

Borrow Checking Across Threads

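Turmeric's borrow checker rejects closures that carry a borrowed reference across a thread boundary; clone the value or wrap it in an Arc instead: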

;; ERROR: cannot move borrowed reference to thread
(let [x 42]
  (thread
    (fn []
      (println x))))  ; x is borrowed; can't move across boundary

;; OK: clone or use Arc
(let [x (arc 42)]
  (thread
    (fn []
      (println (arc-deref x)))))

Common Patterns

Producer-Consumer

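(def ch (chan-new 16))

;; chan-send carries ints, so mark the end of the stream with an int sentinel.
;; -1 is an arbitrary choice that assumes real items are never negative.
(def done-marker -1)

;; Producer
(thread
  (fn []
    (for-each items
      (fn [item] (chan-send ch item)))
    (chan-send ch done-marker)))

;; Consumer
(let loop []
  (let [v (chan-recv ch)]
    (when (not= v done-marker)
      (process v)
      (loop))))

(chan-free ch)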

Thread-Safe Counter

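(def counter (mutex 0))

;; Keep the thread handles so they can be joined
(def workers
  (map (range 10)
    (fn [i]
      (thread
        (fn []
          (with-lock counter
            (fn [n]
              (+ n 1))))))))

;; Join every worker before reading; otherwise some increments may still be pending
(for-each workers (fn [t] (thread-join t)))

(println (with-lock counter (fn [n] n)))  ; => 10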

Barrier

(def barrier (barrier-new 3))

(for-each (range 3)
  (fn [i]
    (thread
      (fn []
        (println (str "Thread " i " starting"))
        (barrier-wait barrier)
        (println (str "Thread " i " done"))))))

Structured Concurrency with TaskGroup

;; Spawn N workers; cancel all if one fails or times out
(def g (task-group-new))
(task-group-with-timeout g 10000
  (for-each tasks
    (fn [task]
      (task-group-spawn g
        (fn []
          (when (not (task-group-should-exit? g))
            (run-task task))
          (task-group-task-done g))))))
(task-group-free g)

Fan-Out with Thread Pool and Futures

(def tp (thread-pool-new 4))

;; Submit all tasks and collect futures
(def futures
  (map items (fn [item]
    (thread-pool-submit tp process-item item))))

;; Await all results
(for-each futures
  (fn [fut]
    (let [r (future-get fut)]
      (if (ok? r)
        (collect (ok-val r))
        (log-error (err-val r))))))

(thread-pool-shutdown tp)
(thread-pool-free tp)

Web REPL (WASM) Threading Constraints

The web REPL runs Turmeric inside a WebAssembly module compiled with Emscripten pthreads. The threading model differs from native tur run in a few ways:

Worker pool

The WASM build uses -sPTHREAD_POOL_SIZE_STRICT=0, which means the Worker pool grows lazily on demand. There is no hard cap on concurrent threads. The first time a thread is created beyond the current pool size there is additional latency while the browser spawns a new Worker; subsequent reuse of that Worker is fast.

These Worker-pool considerations do not apply to native builds, which create POSIX threads directly.

Eval Worker

All code evaluation in the browser REPL runs inside a dedicated eval Worker (eval-worker.js). This is necessary because Atomics.wait (which Emscripten uses for pthread_cond_wait) is prohibited on the browser's main thread. The eval Worker can block freely on channel operations and select without freezing the tab.

No pthread_cancel

Emscripten does not implement pthread_cancel. Turmeric's cooperative cancellation design (cancel flag + condvar check) is unaffected by this -- it does not call pthread_cancel.

Cross-origin isolation

The site must serve Cross-Origin-Opener-Policy: same-origin and Cross-Origin-Embedder-Policy: require-corp on every response for SharedArrayBuffer (required by Emscripten pthreads) to be available in the browser. Self-hosted deployments must set these headers; the hosted turmeric-lang.com site does this via the Cloudflare Worker.
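
For local testing of a self-hosted deployment, the two headers can be added with a few lines of Python's standard http.server. This is a development sketch only: the handler name, the make_server helper, and the port are illustrative and not part of Turmeric's tooling.

```python
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer

# The two headers browsers require before exposing SharedArrayBuffer.
ISOLATION_HEADERS = {
    "Cross-Origin-Opener-Policy": "same-origin",
    "Cross-Origin-Embedder-Policy": "require-corp",
}


class IsolatedHandler(SimpleHTTPRequestHandler):
    """Static-file handler that adds both isolation headers to every response."""

    def end_headers(self):
        for name, value in ISOLATION_HEADERS.items():
            self.send_header(name, value)
        super().end_headers()


def make_server(port=8000):
    """Build (but do not start) a server for the current directory."""
    return ThreadingHTTPServer(("127.0.0.1", port), IsolatedHandler)
```

Calling make_server().serve_forever() from the directory containing the built site serves it with both headers. In the browser console, self.crossOriginIsolated should then report true.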

Optimization level

The threaded WASM build uses -O2 rather than -O3. Older wasm-opt (Binaryen) versions can mishandle the shared-memory semantics required by pthreads at -O3. Once the threaded build has been smoke-tested at -O3 and confirmed clean, the flag will be restored.


See Also