Concurrency & Actors

Saga runs on the BEAM, which means concurrency is built on isolated processes that communicate through message passing. There is no shared memory between processes and no data races by construction. Saga wraps the BEAM's actor model in effects, so spawning processes and sending messages use the same ! syntax as any other effect operation.

The Actor Model

A BEAM process is a lightweight, isolated unit of execution with its own mailbox. Processes communicate by sending immutable messages to each other. This is fundamentally different from thread-based concurrency: there are no locks, no mutexes, and no shared state.

Saga exposes this through two effects:

  • Process: spawning processes, sending messages, and exiting processes
  • Actor msg: receiving messages in the current process's typed mailbox

Both are handled by the beam_actor handler, which maps directly to BEAM primitives.

import Std.Actor (Process, Actor, beam_actor)

Spawning and Sending

spawn! takes a function and runs it in a new process. It returns a Pid msg, a process identifier typed by the message type the process accepts:

type Msg = Hello String | Goodbye

fun greeter : Unit -> Unit needs {Actor Msg}
greeter () = receive {
  Hello name -> {
    dbg $"Hello, {name}!"
    greeter ()
  }
  Goodbye -> dbg "Bye!"
}

main () = {
  let pid = spawn! (fun () -> greeter ())
  send! pid (Hello "Alice")
  send! pid (Hello "Bob")
  send! pid Goodbye
} with beam_actor

spawn! returns a Pid Msg because the greeter function uses Actor Msg. The type system ensures you can only send Msg values to this pid. Sending a value of the wrong type is a compile error.
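
As a small illustration of that last point, the fragment below contrasts an accepted send with a rejected one. It assumes the pid bound in main above; the compiler's actual error text is not reproduced because it is not given here:

# pid : Pid Msg, so this is accepted
send! pid (Hello "Carol")

# Rejected at compile time: 42 is an Int, not a Msg
send! pid 42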

Receiving Messages

receive is a keyword expression (not an effect operation) that suspends the current process until a matching message arrives. It uses pattern matching syntax, just like case:

fun counter : Int -> Unit needs {Actor CounterMsg, Process}
counter count = receive {
  Increment n -> counter (count + n)
  GetCount caller -> {
    send! caller count
    counter count
  }
  Stop -> ()
}

The process blocks at receive until a message in its mailbox matches one of the patterns. Messages that match no pattern stay in the mailbox, where a later receive can pick them up. This is selective receive, the same mechanism as Erlang's receive.

The receive block is fully typed. The compiler builds a union of the process's message type (from Actor msg) and any system messages that can arrive (such as Down from monitors). If your actor uses Actor CounterMsg and has a monitor active, the receive block accepts CounterMsg variants and the Down/Exit system messages, and nothing else. Unlike case, receive is not checked for exhaustiveness, so you can match on just a subset of the message type.
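
A minimal sketch of matching on a subset, using the CounterMsg variants from the counter above. The helper name wait_for_stop is hypothetical and not part of the library. It handles only Stop, so any Increment or GetCount messages that arrive first are left in the mailbox for a later receive:

fun wait_for_stop : Unit -> Unit needs {Actor CounterMsg}
wait_for_stop () = receive {
  Stop -> ()
}

With case, leaving out Increment and GetCount would be an exhaustiveness error. Here it is the intended way to wait for one particular message.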

Timeouts

Add an after clause to avoid waiting forever:

receive {
  Response data -> handle data
  after 5000 -> dbg "timed out"
}

The timeout is in milliseconds. If no matching message arrives within the timeout, the after branch runs instead.

Getting Your Own Pid

self! returns the current process's pid:

fun request_count : Pid CounterMsg -> Int needs {Process, Actor Int}
request_count counter_pid = {
  send! counter_pid (GetCount (self! ()))
  receive {
    n -> n
  }
}

The type of self! () is Pid msg, where msg matches the current process's Actor msg effect. This means a process that uses Actor Int gets back a Pid Int from self!.
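
Request/reply helpers like request_count are a natural place for the after clause from the Timeouts section, since the other process may never answer. The variant below is a sketch under assumptions: the name request_count_with_timeout, the one-second limit, and the fallback value 0 are illustrative choices, not part of the library:

# Returns the counter's reply, or the assumed fallback 0 if none arrives in time
fun request_count_with_timeout : Pid CounterMsg -> Int needs {Process, Actor Int}
request_count_with_timeout counter_pid = {
  send! counter_pid (GetCount (self! ()))
  receive {
    n -> n
    after 1000 -> 0
  }
}

A reply that arrives after the timeout is not lost. It stays in the mailbox and will match the next receive that accepts an Int, so a caller that retries should expect to see stale answers.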

A Complete Example

Here is a counter actor that handles increment, query, and stop messages:

import Std.Actor (Process, Actor, beam_actor)

type CounterMsg =
  | Increment Int
  | GetCount (Pid Int)
  | Stop

fun counter : Int -> Unit needs {Process, Actor CounterMsg}
counter count = receive {
  Increment n -> counter (count + n)
  GetCount caller -> {
    send! caller count
    counter count
  }
  Stop -> ()
}

fun run_counter : Unit -> Unit needs {Process, Actor Int}
run_counter () = {
  let pid = spawn! (fun () -> counter 0)
  send! pid (Increment 5)
  send! pid (Increment 3)
  send! pid (GetCount (self! ()))
  let result = receive {
    n -> n
  }
  # Stop the counter now that the result has arrived
  send! pid Stop
  dbg $"count: {result}"
}

main () = {
  run_counter ()
} with beam_actor

Monitoring

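Monitoring lets one process watch another and receive a notification when it exits. The Monitor effect provides monitor! and demonitor!. The example below assumes a worker function and a WorkerMsg type (with Work Int and Die variants) that are defined elsewhere: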

import Std.Actor (Process, Actor, Monitor, beam_actor)

fun watcher : Unit -> Unit needs {Process, Actor WorkerMsg, Monitor}
watcher () = {
  let pid = spawn! (fun () -> worker ())
  let _ref = monitor! pid

  send! pid (Work 42)
  send! pid Die

  receive {
    Down _pid reason -> dbg $"Worker exited: {reason}"
    after 5000 -> dbg "Timed out waiting"
  }
}

main () = {
  watcher ()
} with beam_actor

When a monitored process exits, a Down pid reason system message is delivered to the monitoring process's mailbox. These system messages can be matched in receive blocks alongside regular messages.
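
Because Down arrives through the same mailbox, a single receive can handle both kinds of message. The loop below is a hypothetical sketch (watch_loop is not a library function). It assumes WorkerMsg has the Work Int variant used above, and that listing Monitor in the signature is what allows Down to appear in the patterns:

fun watch_loop : Unit -> Unit needs {Process, Actor WorkerMsg, Monitor}
watch_loop () = receive {
  Work n -> {
    dbg $"got work: {n}"
    watch_loop ()
  }
  Down _pid reason -> dbg $"Worker exited: {reason}"
}

The loop keeps handling regular messages until the monitored process exits, then returns.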

Linking

Link provides bidirectional crash propagation. If two processes are linked and one crashes, the other crashes too:

import Std.Actor (Process, Actor, Link)

fun start_worker : Unit -> Unit needs {Process, Actor msg, Link}
start_worker () = {
  let pid = spawn! (fun () -> worker ())
  link! pid
}

Links are useful when two processes are co-dependent: if one can't function without the other, linking ensures they fail together rather than leaving one in a broken state.

Async / Await

For simple "run these things concurrently and collect the results" patterns, Std.Async provides a higher-level API on top of actors:

import Std.Actor (beam_actor)
import Std.Async (Async, async_handler)

fun run : Unit -> List Int needs {Async}
run () = {
  let f1 = async! (fun () -> 1)
  let f2 = async! (fun () -> 2)
  let f3 = async! (fun () -> 3)
  Async.all [f1, f2, f3]
}

main () = {
  let results = run () with {async_handler, beam_actor}
  dbg results
}

async! runs a function in a new process and returns a Future a, where a is the function's result type. Async.all awaits all of the futures and collects their results into a list.

Timers

The Timer effect provides delays and scheduled messages:

import Std.Actor (Timer)

# Pause the current process
sleep! 1000

# Send a message to a pid after a delay
let ref = send_after! pid 5000 Timeout

# Cancel a pending timer
cancel_timer! ref
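
Put together, these operations are enough for a periodic task. The sketch below is illustrative only: TickMsg and ticker are hypothetical names, and it assumes a process may pass its own pid (from self!) to send_after!. Each iteration schedules one Tick for itself, and a Halt message cancels the pending timer before the process finishes:

type TickMsg = Tick | Halt

fun ticker : Unit -> Unit needs {Process, Actor TickMsg, Timer}
ticker () = {
  # Schedule the next Tick for this process, one second from now
  let ref = send_after! (self! ()) 1000 Tick
  receive {
    Tick -> {
      dbg "tick"
      ticker ()
    }
    Halt -> {
      cancel_timer! ref
      dbg "ticker stopped"
    }
  }
}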

Wiring It Up

All of the Std.Actor effects are handled by a single handler:

handler beam_actor for Process, Actor msg, Monitor, Link, Timer

Attach it at your program's entry point (programs that use Async also attach async_handler, as shown in Async / Await):

main () = {
  run_app ()
} with beam_actor