Concurrency & Actors
Saga runs on the BEAM, which means concurrency is built on isolated processes
that communicate through message passing. There is no shared memory between
processes and no data races by construction. Saga wraps the BEAM's actor model
in effects, so spawning processes and sending messages use the same ! syntax
as any other effect operation.
The Actor Model
A BEAM process is a lightweight, isolated unit of execution with its own mailbox. Processes communicate by sending immutable messages to each other. This is fundamentally different from thread-based concurrency: there are no locks, no mutexes, and no shared state.
Saga exposes this through two effects:
Process: spawning processes, sending messages, and exiting processesActor msg: receiving messages in the current process's typed mailbox
Both are handled by the beam_actor handler, which maps directly to BEAM
primitives.
import Std.Actor (Process, Actor, beam_actor)Spawning and Sending
spawn! takes a function and runs it in a new process. It returns a Pid msg,
a process identifier typed by the message type the process accepts:
type Msg = Hello String | Goodbye
fun greeter : Unit -> Unit needs {Actor Msg}
greeter () = receive {
Hello name -> {
dbg $"Hello, {name}!"
greeter ()
}
Goodbye -> dbg "Bye!"
}
main () = {
let pid = spawn! (fun () -> greeter ())
send! pid (Hello "Alice")
send! pid (Hello "Bob")
send! pid Goodbye
} with beam_actorspawn! returns a Pid Msg because the greeter function uses
Actor Msg. The type system ensures you can only send Msg values to
this pid. Sending a value of the wrong type is a compile error.
Receiving Messages
receive is a keyword expression (not an effect operation) that suspends the
current process until a matching message arrives. It uses pattern matching
syntax, just like case:
fun counter : Int -> Unit needs {Actor CounterMsg, Process}
counter count = receive {
Increment n -> counter (count + n)
GetCount caller -> {
send! caller count
counter count
}
Stop -> ()
}The process blocks at receive until a message in its mailbox matches one of
the patterns. Unmatched messages stay in the mailbox for future receive calls.
This is selective receive, the same mechanism as Erlang's receive.
The receive block is fully typed. The compiler builds a union of the process's
message type (from Actor msg) and any system messages (like Down from
monitors). If your actor uses Actor CounterMsg and you have a monitor active,
the receive block accepts CounterMsg variants and Down/Exit system
messages, and nothing else. Unlike case, exhaustiveness checking is disabled
for receive: you can match on a subset of the message type, and unmatched
messages simply stay in the mailbox for a future receive call.
Timeouts
Add an after clause to avoid waiting forever:
receive {
Response data -> handle data
after 5000 -> dbg "timed out"
}The timeout is in milliseconds. If no matching message arrives within the
timeout, the after branch runs instead.
Getting Your Own Pid
self! returns the current process's pid:
fun request_count : Pid CounterMsg -> Int needs {Process, Actor Int}
request_count counter_pid = {
send! counter_pid (GetCount (self! ()))
receive {
n -> n
}
}The type of self! () is Pid msg, where msg matches the current process's
Actor msg effect. This means a process that uses Actor Int gets back a
Pid Int from self!.
A Complete Example
Here is a counter actor that handles increment, query, and stop messages:
import Std.Actor (Process, Actor, beam_actor)
type CounterMsg =
| Increment Int
| GetCount (Pid Int)
| Stop
fun counter : Int -> Unit needs {Process, Actor CounterMsg}
counter count = receive {
Increment n -> counter (count + n)
GetCount caller -> {
send! caller count
counter count
}
Stop -> ()
}
fun run_counter : Unit -> Unit needs {Process, Actor Int}
run_counter () = {
let pid = spawn! (fun () -> counter 0)
send! pid (Increment 5)
send! pid (Increment 3)
send! pid (GetCount (self! ()))
let result = receive {
n -> n
}
dbg $"count: {result}"
}
main () = {
run_counter ()
} with beam_actorMonitoring
Monitoring lets one process watch another and receive a notification when it
exits. The Monitor effect provides monitor! and demonitor!:
import Std.Actor (Process, Actor, Monitor, beam_actor)
fun watcher : Unit -> Unit needs {Process, Actor WorkerMsg, Monitor}
watcher () = {
let pid = spawn! (fun () -> worker ())
let _ref = monitor! pid
send! pid (Work 42)
send! pid Die
receive {
Down _pid reason -> dbg $"Worker exited: {reason}"
after 5000 -> dbg "Timed out waiting"
}
}
main () = {
watcher ()
} with beam_actorWhen a monitored process exits, a Down pid reason system message is delivered
to the monitoring process's mailbox. These system messages can be matched in
receive blocks alongside regular messages.
Linking
Link provides bidirectional crash propagation. If two processes are linked
and one crashes, the other crashes too:
import Std.Actor (Link)
fun start_worker : Unit -> Unit needs {Process, Actor msg, Link}
start_worker () = {
let pid = spawn! (fun () -> worker ())
link! pid
}Links are useful when two processes are co-dependent: if one can't function without the other, linking ensures they fail together rather than leaving one in a broken state.
Async / Await
For simple "run these things concurrently and collect the results" patterns,
Std.Async provides a higher-level API on top of actors:
import Std.Actor (beam_actor)
import Std.Async (Async, async_handler)
fun run : Unit -> List Int needs {Async}
run () = {
let f1 = async! (fun () -> 1)
let f2 = async! (fun () -> 2)
let f3 = async! (fun () -> 3)
Async.all [f1, f2, f3]
}
main () = {
let results = run () with {async_handler, beam_actor}
dbg results
}async! spawns a function in a new process and returns a Future a.
Async.all awaits all futures and collects the results into a list.
Timers
The Timer effect provides delays and scheduled messages:
import Std.Actor (Timer)
# Pause the current process
sleep! 1000
# Send a message to a pid after a delay
let ref = send_after! pid 5000 Timeout
# Cancel a pending timer
cancel_timer! refWiring It Up
All BEAM concurrency effects are handled by a single handler:
handler beam_actor for Process, Actor msg, Monitor, Link, TimerAttach it at your program's entry point:
main () = {
run_app ()
} with beam_actor