Safe concurrent programming with OS threads, atomic types, and synchronization primitives.
Turmeric provides 1:1 OS threads via C11 <threads.h> plus thread-safe abstractions (Arc<T>, Mutex<T>, Atomic<T>) for safe concurrent programming. Integration with Turmeric's ownership model (ref<T>, borrow checking) ensures memory safety.
Higher-level primitives -- channels, futures, task groups, thread pools, semaphores -- are implemented in stdlib without FFI.
;; Spawn a new thread
(def result
(thread
(fn []
(println "Hello from thread!")
42)))
;; Block until thread completes and get result
(println (thread-join result)) ; prints 42
Each thread has its own stack and thread-local variables:
;; Declare a thread-local
(thread-local my-tls 42)
;; Get current value (only accessible within this thread)
(thread-local-get my-tls) ; => 42
;; Set current value
(thread-local-set my-tls 100)
Arc<T> (atomic reference counting) enables shared ownership across threads:
;; Create a shared value
(def shared (arc (make-counter 0)))
;; Clone the arc (increments reference count)
(thread
(fn []
(let [my-copy (arc-clone shared)]
(modify-counter my-copy))))
;; Original and clones all point to same value
(println (counter-value shared))
- arc-clone increments the count; arc drop decrements it atomically.
- Arc<T> gives shared read-only access. For mutable shared state, wrap the value in Mutex<T>.

Mutex<T> (mutual exclusion) protects mutable shared state:
;; Create a mutex
(def counter (mutex 0))
;; Acquire lock, modify, and release (automatically via defer)
(with-lock counter
(fn [value]
(let [new-val (+ value 1)]
new-val)))
;; Check current value (requires lock)
(let [val (with-lock counter (fn [v] v))]
(println val))
- The lock is released automatically (via defer).
- with-lock blocks until the lock is available.

For read-heavy workloads, use RwLock<T>:
;; Multiple readers or one writer
(def data (rw-lock (vec 1 2 3)))
;; Read lock (multiple threads can hold simultaneously)
(read-lock data
(fn [vec] (println vec)))
;; Write lock (exclusive)
(write-lock data
(fn [vec] (set-vec vec 0 42)))
For simple types, Atomic<T> provides lock-free atomic operations:
;; Atomic integer
(def counter (atomic 0))
;; Atomic load
(println (atomic-load counter)) ; => 0
;; Atomic store
(atomic-store counter 42)
;; Atomic compare-and-swap
(atomic-cas counter 42 100) ; success if value was 42, set to 100
;; Atomic add/sub
(atomic-add counter 5) ; atomically add 5
Supported types: int, bool, usize, and pointer types.

Future<T> and Promise<T> decouple the producer and consumer of an
asynchronous result. Both share a single heap-allocated cell containing a
mutex, condvar, and value/error slot.
;; Create a promise/future pair (same underlying cell)
(def p (promise-new))
(def f p)
;; Producer thread fulfills the promise
(thread (fn [] (promise-fulfill p 42)))
;; Consumer blocks on the future
(def result (future-get f))
(println (ok? result)) ; => true
(println (ok-val result)) ; => 42
(promise-free p)
| Function | Notes |
|---|---|
| promise-new | Returns an unsettled FutureCell* |
| promise-fulfill [p v] | Settles ok with value v; aborts if already settled |
| promise-fail [p e] | Settles with error code e; aborts if already settled |
| promise-free [p] | Frees the shared cell |
| future-get [f] | Blocks until settled; returns a heap Result* |
| future-done? [f] | Non-blocking settled check |
| future-cancel [f] | Settles with exn = -2 if not yet settled |
| future-cancelled? [f] | Returns true if cancelled (exn == -2) |
| future-free [f] | Frees the shared cell |
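The shared cell described above (a mutex, a condvar, and a value/error slot) can be modelled outside Turmeric to make the settle-once and blocking-get behaviour concrete. The following is a minimal Python sketch, not Turmeric's implementation: the class name, method names, and the (ok, payload) return shape are hypothetical, and the model raises an exception where the table says the real primitives abort.

```python
import threading

CANCELLED = -2  # exn code the table above lists for cancellation


class FutureCell:
    """Hypothetical model of the cell shared by a promise and its future."""

    def __init__(self):
        self._cond = threading.Condition()  # mutex + condvar in one object
        self._settled = False
        self._ok = False
        self._payload = None  # value on success, error code otherwise

    def _settle(self, ok, payload):
        with self._cond:
            if self._settled:
                # The documented primitives abort here; the model raises.
                raise RuntimeError("cell already settled")
            self._settled, self._ok, self._payload = True, ok, payload
            self._cond.notify_all()  # wake every blocked consumer

    def fulfill(self, value):
        self._settle(True, value)

    def fail(self, exn):
        self._settle(False, exn)

    def cancel(self):
        """Settle with CANCELLED unless already settled; report whether it did."""
        with self._cond:
            if self._settled:
                return False
            self._settled, self._ok, self._payload = True, False, CANCELLED
            self._cond.notify_all()
            return True

    def done(self):
        with self._cond:
            return self._settled

    def get(self):
        """Block until settled; return (ok, payload)."""
        with self._cond:
            self._cond.wait_for(lambda: self._settled)
            return self._ok, self._payload
```

Because producer and consumer hold the same object, a single free is enough in the real API, which matches the example above calling only promise-free.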
(def f (future-of 99)) ; immediately fulfilled
(future-done? f) ; => true
(def e (future-error-of 7)) ; immediately rejected
;; Map a function over the fulfilled value
(def f2 (future-map f (fn [v] (* v 2))))
;; Flat-map: fn must return a new future
(def f3 (future-then f (fn [v] (future-of (+ v 1)))))
| Function | Behaviour |
|---|---|
| future-race [fa fb] | First to settle wins |
| future-all2 [fa fb] | Both must succeed; result carries fa's value |
| future-any2 [fa fb] | First to fulfill wins; both rejecting yields fb's error |
| future-join [fa fb] | Both must succeed; result is a Tuple2 pair |
| future-race-n [futures n] | Variadic race over a pointer array |
| future-all-n [futures n] | All must succeed |
| future-any-n [futures n] | First fulfillment wins |
;; Join two futures into a pair
(def tup-future (future-join fa fb))
(def tup-result (future-get tup-future))
(when (ok? tup-result)
(let [tup (ok-val tup-result)]
(println (tuple-first tup))
(println (tuple-second tup))
(tuple-free tup)))
;; Race a computation against a deadline
(def timed (future-with-timeout task-future 5000))
(def result (future-get timed))
;; future-cancelled? takes the future, not the Result returned by future-get
(if (future-cancelled? timed)
  (println "timed out")
  (println (ok-val result)))
;; Stand-alone timeout future (rejects with exn = -1 after ms)
(def t (future-timeout 1000))
Turmeric provides two channel types backed by the same ring-buffer layout:
- Chan -- synchronous, blocking send/recv only.
- AsyncChan -- buffered, blocking by default, with non-blocking try variants.

(def ch (chan-new 8))
;; Producer thread
(thread
(fn []
(chan-send ch 1)
(chan-send ch 2)
(chan-send ch 3)))
;; Consumer
(println (chan-recv ch)) ; => 1
(println (chan-recv ch)) ; => 2
(println (chan-recv ch)) ; => 3
(chan-free ch)
| Function | Signature | Notes |
|---|---|---|
| chan-new | [cap :int] :ptr<void> | Allocates ring-buffer with given capacity |
| chan-send | [ch val :int] :nil | Blocks when full |
| chan-recv | [ch] :int | Blocks when empty |
| chan-free | [ch] :nil | Destroys mutex/condvar and frees buffer |
(def ch (async-chan-new 16))
(async-chan-send ch 99)
;; Non-blocking send (returns false if full)
(if (async-chan-try-send ch 100) ...)
;; Non-blocking recv (returns INT64_MIN if empty)
(let [v (async-chan-try-recv ch)] ...)
(println (async-chan-count ch)) ; current item count
(async-chan-free ch)
| Function | Signature | Notes |
|---|---|---|
| async-chan-new | [cap :int] :ptr<void> | |
| async-chan-send | [ch val :int] :nil | Blocks when full |
| async-chan-recv | [ch] :int | Blocks when empty |
| async-chan-try-send | [ch val :int] :bool | Returns false if full; never blocks |
| async-chan-try-recv | [ch] :int | Returns INT64_MIN if empty; never blocks |
| async-chan-count | [ch] :int | Current item count (locked) |
| async-chan-free | [ch] :nil | |
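To illustrate how one ring-buffer layout can serve both the blocking and the try variants, here is a small Python model. It is a sketch under assumptions, not Turmeric's source: the class and method names are hypothetical, and the use of one lock with two condition variables is a common design choice rather than something the text above confirms.

```python
import threading

INT64_MIN = -(2 ** 63)  # sentinel the table above documents for "empty"


class RingChan:
    """Hypothetical bounded channel: fixed ring buffer, one lock, two condvars."""

    def __init__(self, cap):
        self._buf = [0] * cap
        self._cap = cap
        self._head = 0   # index of the next item to receive
        self._count = 0  # number of items currently buffered
        lock = threading.Lock()
        self._not_full = threading.Condition(lock)
        self._not_empty = threading.Condition(lock)

    def _put(self, val):  # caller holds the lock
        self._buf[(self._head + self._count) % self._cap] = val
        self._count += 1
        self._not_empty.notify()

    def _take(self):  # caller holds the lock
        val = self._buf[self._head]
        self._head = (self._head + 1) % self._cap
        self._count -= 1
        self._not_full.notify()
        return val

    def send(self, val):
        with self._not_full:
            self._not_full.wait_for(lambda: self._count < self._cap)
            self._put(val)

    def recv(self):
        with self._not_empty:
            self._not_empty.wait_for(lambda: self._count > 0)
            return self._take()

    def try_send(self, val):
        with self._not_full:
            if self._count == self._cap:
                return False  # full: report failure instead of blocking
            self._put(val)
            return True

    def try_recv(self):
        with self._not_empty:
            if self._count == 0:
                return INT64_MIN  # empty: return the sentinel instead of blocking
            return self._take()

    def count(self):
        with self._not_full:
            return self._count
```

One consequence of the sentinel design is that a caller of the try-recv variant cannot tell an empty channel apart from a stored INT64_MIN, so that value is best kept out of the payload domain.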
select waits on multiple channel operations and executes the first one ready.
(def ch-a (chan-new 4))
(def ch-b (chan-new 4))
;; Poll with a default arm (never blocks)
(let [[idx val] (select
(ch-a :recv)
(ch-b :recv)
(:default :nothing))]
(cond
(= idx 0) (println (str "from ch-a: " val))
(= idx 1) (println (str "from ch-b: " val))
:else (println "nothing ready")))
;; Send-or-drop
(select
(ch-a :send 99)
(:default (println "ch-a full, dropping")))
select returns (index value), where index is the 0-based clause position (or -1
for :default) and value is the received value for :recv, true for :send, or the
:default expression result.
When multiple clauses are simultaneously ready, select picks one uniformly at
random using an xorshift32 PRNG so that no single channel is systematically
favoured. When no clause is ready and no :default is present, select blocks
on all channels concurrently and wakes as soon as any one of them becomes ready.
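The random tie-break can be sketched in a few lines. This Python model assumes the common xorshift32 shift triple (13, 17, 5) and a modulo reduction; the text above names the generator but not these details, and the function names are hypothetical.

```python
MASK32 = 0xFFFFFFFF


def xorshift32(state):
    """One step of a 32-bit xorshift generator (state must be non-zero)."""
    state ^= (state << 13) & MASK32
    state ^= state >> 17
    state ^= (state << 5) & MASK32
    return state


def pick_ready(ready, state):
    """Choose one clause index from the non-empty list `ready`.

    Returns (chosen_index, new_state) so the caller can carry the PRNG
    state forward to the next select call.
    """
    state = xorshift32(state)
    return ready[state % len(ready)], state
```

With seed 1 the first generator output is 270369, so `pick_ready([0, 1, 2], 1)` selects clause 0. The modulo step carries a very small bias whenever the number of ready clauses does not divide 2^32, which is usually acceptable for fairness between channels.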
Block until a condition is signaled:
;; Named cv so the cond macro is not shadowed
(def cv (condition-variable))
;; Mutex paired with the condition variable
(def m (mutex 0))
;; Thread A: wait for signal
(with-lock m
  (fn [_]
    (condition-wait cv m)
    (println "woken!")))
;; Thread B: signal the condition
(condition-signal cv)
A counting semaphore implemented with mutex + condvar for portability (unnamed
POSIX semaphores created with sem_init are not supported on macOS).
;; Binary semaphore (mutex-like)
(def s (sem-new 1))
;; Limit concurrency to 3 parallel workers
(def sem (sem-new 3))
(thread
(fn []
(sem-acquire sem)
(do-work)
(sem-release sem)))
| Function | Notes |
|---|---|
| sem-new [initial] | initial=0 starts locked; initial=1 is a binary semaphore |
| sem-acquire [s] | Decrements; blocks when count is 0 |
| sem-release [s] | Increments and wakes one blocked acquirer |
| sem-free [s] | |
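The mutex + condvar construction is short enough to show in full. The following Python sketch models the acquire and release rows of the table; the class and helper names are hypothetical and this is not Turmeric's code.

```python
import threading


class CountingSemaphore:
    """Hypothetical model of a counting semaphore built from a mutex and a condvar."""

    def __init__(self, initial):
        self._count = initial
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            # wait_for re-checks the predicate, which guards against spurious wakeups.
            self._cond.wait_for(lambda: self._count > 0)
            self._count -= 1

    def release(self):
        with self._cond:
            self._count += 1
            self._cond.notify()  # wake one blocked acquirer


def peak_concurrency(workers, limit):
    """Run `workers` threads behind a semaphore and return the peak number inside."""
    sem = CountingSemaphore(limit)
    guard = threading.Lock()
    state = {"active": 0, "peak": 0}

    def work():
        sem.acquire()
        with guard:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        with guard:
            state["active"] -= 1
        sem.release()

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]
```

Calling `peak_concurrency(10, 3)` never reports more than 3, mirroring the "limit concurrency to 3 parallel workers" example above.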
once-call guarantees an initializer runs exactly once no matter how many
threads call it concurrently.
(def flag (once-flag-new))
;; Safe to call from any number of threads
(once-call flag init-resource)
(once-flag-free flag)
| Function | Notes |
|---|---|
| once-flag-new | Allocates a pthread_once_t on the heap |
| once-call [flag init-fn] | Calls init-fn at most once across all threads |
| once-flag-free [flag] | |
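The exactly-once guarantee can be modelled with a lock and a flag. This Python sketch is an illustration only (the table says the real flag is a pthread_once_t); the names are hypothetical. It holds the lock while the initializer runs, so concurrent callers do not return until initialization has finished.

```python
import threading


class OnceFlag:
    """Hypothetical stand-in for the heap-allocated once flag."""

    def __init__(self):
        self.lock = threading.Lock()
        self.done = False


def once_call(flag, init_fn):
    """Run init_fn the first time only; later callers wait, then skip it."""
    with flag.lock:
        if not flag.done:
            init_fn()
            flag.done = True


def count_initializations(callers):
    """Race `callers` threads on one flag and count how often init ran."""
    runs = []
    flag = OnceFlag()
    threads = [
        threading.Thread(target=once_call, args=(flag, lambda: runs.append(1)))
        for _ in range(callers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(runs)
```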
A thread-safe FIFO used internally by both pool variants; also available
directly for custom worker patterns. A sentinel value INT64_MIN returned by
work-queue-pop signals that the queue has been closed.
;; Unbounded queue (grows dynamically)
(def q (work-queue-new))
;; Bounded queue (push blocks when full)
(def bq (work-queue-new-bounded 64))
(work-queue-push q 42)
(let [v (work-queue-pop q)] ...) ; blocks until item available
;; Shutdown: wake all blocked producers and consumers
(work-queue-close q)
(work-queue-free q)
| Function | Notes |
|---|---|
| work-queue-new | Unbounded; doubles capacity on growth |
| work-queue-new-bounded [cap] | Fixed ring-buffer; push blocks when full |
| work-queue-push [q v] | Blocks on bounded queue when full; no-op after close |
| work-queue-pop [q] | Blocks until item available; returns INT64_MIN after close |
| work-queue-close [q] | Broadcasts to all blocked threads |
| work-queue-free [q] | |
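A worker loop built on the close sentinel might look like the Python model below. It is a sketch with hypothetical names. One behaviour is an assumption the text does not state: this model lets pop drain items that were queued before close, and only then returns the sentinel.

```python
import threading
from collections import deque

INT64_MIN = -(2 ** 63)  # sentinel returned by pop once the queue is closed


class WorkQueue:
    """Hypothetical FIFO with optional capacity and a close() broadcast."""

    def __init__(self, cap=None):
        self._items = deque()
        self._cap = cap  # None models the unbounded variant
        self._closed = False
        self._cond = threading.Condition()

    def _has_room(self):
        return self._cap is None or len(self._items) < self._cap

    def push(self, v):
        with self._cond:
            self._cond.wait_for(lambda: self._closed or self._has_room())
            if self._closed:
                return  # no-op after close
            self._items.append(v)
            self._cond.notify_all()

    def pop(self):
        with self._cond:
            self._cond.wait_for(lambda: self._closed or len(self._items) > 0)
            if self._items:
                v = self._items.popleft()
                self._cond.notify_all()  # a bounded producer may be waiting
                return v
            return INT64_MIN

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake blocked producers and consumers


def drain(queue, out):
    """Worker loop: process items until the close sentinel arrives."""
    while True:
        item = queue.pop()
        if item == INT64_MIN:
            return
        out.append(item * 2)
```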
A pool of n worker threads. Tasks are C function pointers; results are
delivered via Future (see the Futures section).
(def tp (thread-pool-new 4))
;; Submit returns a Future fulfilled with the task's return value
(def fut (thread-pool-submit tp my-work-fn my-arg))
(def result (future-get fut))
(println (ok-val result))
(thread-pool-shutdown tp) ; closes queue and joins all workers
(thread-pool-free tp) ; must be called after shutdown
| Function | Notes |
|---|---|
| thread-pool-new [n] | Spawns n worker threads immediately |
| thread-pool-submit [tp task-fn task-arg] | Returns a FutureCell* |
| thread-pool-shutdown [tp] | Closes queue, joins workers |
| thread-pool-free [tp] | Call after shutdown |
The dynamic thread pool starts with min-threads workers and grows up to
max-threads when all current workers are busy.
(def dtp (thread-pool-new-dynamic 2 8))
(def fut (thread-pool-dynamic-submit dtp my-work-fn nil))
(def result (future-get fut))
(thread-pool-dynamic-shutdown dtp)
(thread-pool-dynamic-free dtp)
| Function | Notes |
|---|---|
| thread-pool-new-dynamic [min max] | Spawns min workers; scales to max |
| thread-pool-dynamic-submit [tp task-fn task-arg] | Spawns a new worker if idle count is 0 and below max |
| thread-pool-dynamic-shutdown [tp] | |
| thread-pool-dynamic-free [tp] | Call after shutdown |
TaskGroup provides structured concurrency: spawn a group of fibers, then
wait for or cancel all of them together. When a task panics, the group is
automatically cancelled.
Cancellation is cooperative -- tasks must periodically check
task-group-should-exit? or fiber-cancelled? and exit if set.
;; Manual lifecycle
(def g (task-group-new))
(task-group-spawn g
(fn []
(println "worker A")
(task-group-task-done g)))
(task-group-spawn g
(fn []
(println "worker B")
(task-group-task-done g)))
(task-group-wait g)
(task-group-free g)
;; Preferred: task-group-with macro (calls wait automatically)
(def g (task-group-new))
(task-group-with g
(task-group-spawn g my-fiber-a)
(task-group-spawn g my-fiber-b))
(task-group-free g)
(def g (task-group-new))
(task-group-spawn g
(fn []
(while (not (task-group-should-exit? g))
(do-work))
(task-group-task-done g)))
;; Cancel from another thread or the parent
(task-group-cancel g)
(task-group-wait g)
(task-group-free g)
;; Auto-cancel after 5 seconds
(def g (task-group-new))
(task-group-with-timeout g 5000
(task-group-spawn g long-running-task))
(task-group-free g)
| Function | Notes |
|---|---|
| task-group-new | Allocates an empty group |
| task-group-spawn [group f] | Increments task count; returns fiber handle |
| task-group-task-done [group] | Each spawned task must call this on exit |
| task-group-join [group handle] | Wait for one specific fiber |
| task-group-wait [group] | Block until all tasks complete |
| task-group-done? [group] | Non-blocking group completion check |
| task-handle-done? [handle] | Non-blocking per-fiber check |
| task-group-free [group] | |

| Function | Notes |
|---|---|
| task-group-cancel [group] | Manual cancel; reason = 0 |
| task-group-cancel-with-reason [group reason] | 0=manual 1=panic 2=timeout 3=error |
| task-group-cancel-panic [group] | Convenience: reason = 1 |
| task-group-cancel-timeout [group] | Convenience: reason = 2 |
| task-group-cancel-error [group] | Convenience: reason = 3 |
| task-group-cancel-reason [group] | Returns the reason code |
| task-group-cancelled? [group] | Check group-level cancel flag |
| fiber-cancelled? | Check thread-local cancel flag for current fiber |
| task-group-should-exit? [group] | Combines both checks (preferred in task bodies) |
| fiber-should-exit? | Alias for fiber-cancelled? |

| Macro | Notes |
|---|---|
| task-group-with [group & body] | Runs body then calls task-group-wait |
| task-group-with-timeout [group ms & body] | Auto-cancels after ms milliseconds |
| task-group-with-cancellation [group & body] | Skips body if already cancelled |
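Cooperative cancellation, reason codes, and the wait-with-timeout behaviour can be modelled compactly. The Python sketch below is a simplified illustration, not Turmeric's implementation. It uses threads rather than fibers, it signals task completion automatically instead of requiring an explicit task-done call, and it assumes the first cancel reason wins. All names are hypothetical.

```python
import threading

MANUAL, PANIC, TIMEOUT, ERROR = 0, 1, 2, 3  # reason codes from the table above


class TaskGroup:
    """Hypothetical model of a task group built on threads, not fibers."""

    def __init__(self):
        self._cond = threading.Condition()  # default lock is re-entrant
        self._pending = 0
        self._cancelled = False
        self._reason = None

    def spawn(self, fn):
        with self._cond:
            self._pending += 1

        def run():
            try:
                fn(self)
            except Exception:
                self.cancel(PANIC)  # a failing task cancels its siblings
            finally:
                self._task_done()

        worker = threading.Thread(target=run)
        worker.start()
        return worker

    def _task_done(self):
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()

    def cancel(self, reason=MANUAL):
        with self._cond:
            if not self._cancelled:  # assumption: the first reason wins
                self._cancelled, self._reason = True, reason

    def should_exit(self):
        with self._cond:
            return self._cancelled

    def reason(self):
        with self._cond:
            return self._reason

    def wait(self, timeout=None):
        """Block until all tasks finish; on timeout, cancel and keep waiting."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._pending == 0, timeout):
                self.cancel(TIMEOUT)
                self._cond.wait_for(lambda: self._pending == 0)


def spin(group):
    """Cooperative task: loops until the group asks it to stop."""
    while not group.should_exit():
        pass
```

A task that never polls `should_exit` would keep `wait` blocked indefinitely after the timeout fires, which is the practical meaning of "cancellation is cooperative".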
;; Spawn an async computation into a task group; get a Future back
(def g (task-group-new))
(def fut (task-group-spawn-async g my-thunk))
(def result (future-get fut))
(task-group-free g)
;; Macro form
(task-group-async g my-thunk)
Panic propagation is automatic: if a fiber spawned into a group panics, the group is cancelled with reason 1 (panic). No extra API is needed.
Marker traits control what types can be safely shared:
- Send -- Type can be moved across thread boundaries. If T : Send, Arc<T> can be cloned and sent to another thread.
- Sync -- Type can be safely shared via &T in multiple threads. If T : Sync, multiple threads can hold &T simultaneously without a Mutex.

;; These are Send (safe to move to threads)
int, bool, string, (Pair a b) [Send a, Send b]
;; These are Sync (safe to share via &)
int, bool, Mutex<T> [T : Sync]
;; NOT Sync (require Mutex for shared access)
Rc<T> (thread-local ref counting)
ref<T> (single-thread ownership)
Most library types implement these traits automatically based on their fields.
Turmeric's borrow checker enforces:
;; ERROR: cannot move borrowed reference to thread
(let [x 42]
(thread
(fn []
(println x)))) ; x is borrowed; can't move across boundary
;; OK: clone or use Arc
(let [x (arc 42)]
(thread
(fn []
(println (arc-deref x)))))
(def ch (chan-new 16))
;; Producer
(thread
(fn []
(for-each items
(fn [item] (chan-send ch item)))
(chan-send ch :done)))
;; Consumer
(let loop []
(let [v (chan-recv ch)]
(when (not= v :done)
(process v)
(loop))))
(chan-free ch)
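(def counter (mutex 0))
;; Spawn 10 threads, each incrementing the counter once
(def workers
  (map (range 10)
    (fn [i]
      (thread
        (fn []
          (with-lock counter
            (fn [n]
              (+ n 1))))))))
;; Join every worker before reading; otherwise the final count is racy
(for-each workers (fn [t] (thread-join t)))
(println (with-lock counter (fn [n] n))) ; => 10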
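The shared-counter pattern relies on with-lock storing the callback's return value as the new protected value and also returning it. That behaviour is inferred from the examples in this document rather than stated outright, so the Python model below should be read as an assumption; the class and function names are hypothetical.

```python
import threading


class MutexCell:
    """Hypothetical model of a mutex that owns the value it protects."""

    def __init__(self, value):
        self._lock = threading.Lock()
        self._value = value

    def with_lock(self, fn):
        # Assumed semantics: fn receives the current value, and whatever it
        # returns becomes the new value and the result of the call.
        with self._lock:
            self._value = fn(self._value)
            return self._value


def count_to(n):
    """Increment a shared counter from n threads and return the final value."""
    counter = MutexCell(0)
    workers = [
        threading.Thread(target=counter.with_lock, args=(lambda v: v + 1,))
        for _ in range(n)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # read only after every increment has finished
    return counter.with_lock(lambda v: v)
```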
(def barrier (barrier-new 3))
(for-each (range 3)
(fn [i]
(thread
(fn []
(println (str "Thread " i " starting"))
(barrier-wait barrier)
(println (str "Thread " i " done"))))))
;; Spawn N workers; cancel all if one fails or times out
(def g (task-group-new))
(task-group-with-timeout g 10000
(for-each tasks
(fn [task]
(task-group-spawn g
(fn []
(when (not (task-group-should-exit? g))
(run-task task))
(task-group-task-done g))))))
(task-group-free g)
(def tp (thread-pool-new 4))
;; Submit all tasks and collect futures
(def futures
(map items (fn [item]
(thread-pool-submit tp process-item item))))
;; Await all results
(for-each futures
(fn [fut]
(let [r (future-get fut)]
(if (ok? r)
(collect (ok-val r))
(log-error (err-val r))))))
(thread-pool-shutdown tp)
(thread-pool-free tp)
The web REPL runs Turmeric inside a WebAssembly module compiled with
Emscripten pthreads. The threading model differs from native tur run in a
few ways:
The WASM build uses -sPTHREAD_POOL_SIZE_STRICT=0, which means the Worker
pool grows lazily on demand. There is no hard cap on concurrent threads.
The first time a thread is created beyond the current pool size there is
additional latency while the browser spawns a new Worker; subsequent reuse
of that Worker is fast.
This Worker-spawn latency is specific to the WASM build; native builds use unrestricted POSIX threads.
All code evaluation in the browser REPL runs inside a dedicated eval Worker
(eval-worker.js). This is necessary because Atomics.wait (which
Emscripten uses for pthread_cond_wait) is prohibited on the browser's main
thread. The eval Worker can block freely on channel operations and select
without freezing the tab.
Emscripten does not implement pthread_cancel. Turmeric's cooperative
cancellation design (cancel flag + condvar check) is unaffected by this --
it does not call pthread_cancel.
The site must serve Cross-Origin-Opener-Policy: same-origin and
Cross-Origin-Embedder-Policy: require-corp on every response for
SharedArrayBuffer (required by Emscripten pthreads) to be available in
the browser. Self-hosted deployments must set these headers; the hosted
turmeric-lang.com site does this via the Cloudflare Worker.
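For a self-hosted or local deployment, the two headers can be added by any static file server. The following Python sketch uses only the standard library; it is one possible setup for local testing, not the configuration used by the hosted site, and the handler name, function name, and port are hypothetical.

```python
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer


class IsolatedHandler(SimpleHTTPRequestHandler):
    """Static file handler that adds both cross-origin isolation headers."""

    def end_headers(self):
        # end_headers runs for every response, including error pages.
        self.send_header("Cross-Origin-Opener-Policy", "same-origin")
        self.send_header("Cross-Origin-Embedder-Policy", "require-corp")
        super().end_headers()


def make_server(port=8000):
    """Build a server for the current directory, bound to localhost only."""
    return ThreadingHTTPServer(("127.0.0.1", port), IsolatedHandler)
```

Running `make_server().serve_forever()` from the directory that holds the WASM build serves it with both headers set, which is what allows the browser to expose SharedArrayBuffer.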
The threaded WASM build uses -O2 rather than -O3. Older wasm-opt
(Binaryen) versions can mishandle the shared-memory semantics required by
pthreads at -O3. Once the threaded build has been smoke-tested at -O3
and confirmed clean, the flag will be restored.