Async Tasks

Saga's standard library provides an async/await API, but it required no special language support to build. The entire Async module is implemented using effects and the actor system:

module Std.Async

import Std.Actor (Process, Actor)

opaque type Future a = Future(Pid a)

pub effect Async {
  fun async : (f: Unit -> a) -> Future a
  fun await : (future: Future a) -> a
}

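pub handler async_handler for Async needs {Process, Actor a} {
  async f = {
    # Capture the caller's pid so the spawned process knows where to reply.
    let caller = self! ()
    let pid = spawn! (fun () -> {
      let result = f ()
      send! caller result
    })
    # Resume the caller right away with a Future wrapping the new pid.
    resume Future pid
  }
  # The future argument is unused: await takes the next message from the caller's mailbox.
  await _future = {
    receive {
      value -> resume value
    }
  }
}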

pub fun all : (futures: List (Future a)) -> List a needs {Async}
all futures = case futures {
  [] -> []
  f :: rest -> await! f :: all rest
}

That's the whole thing. async! spawns a BEAM process that runs the function and sends the result back to the caller's mailbox. await! receives that result. No new language features, just effects and actors.

Using it

async! spawns a task and returns a Future. The task starts running immediately, in parallel with the caller. await! blocks until the result is ready:

import Std.Actor (beam_actor)
import Std.Async (Async, async_handler)

fun run : Unit -> Int needs {Async}
run () = {
  let future = async! (fun () -> expensive_computation ())
  # ... do other work while the task runs ...
  await! future
}

To fire off multiple tasks in parallel and collect all results, use Async.all:

fun run_all : Unit -> List Int needs {Async}
run_all () = {
  let f1 = async! (fun () -> 1)
  let f2 = async! (fun () -> 2)
  let f3 = async! (fun () -> 3)
  Async.all [f1, f2, f3]
}

main () = {
  let results = run_all () with {async_handler, beam_actor}
  dbg results
}

Output:

[1, 2, 3]

Handling failures

Since effects compose, async tasks can use Fail the same way synchronous code does. Handle it inside the task to get back a Result:

fun fetch_user : Int -> User needs {Http, Fail}
fetch_user id = {
  let resp = get! (Request $"/api/users/{id}")
  if resp.status == 404 then fail! $"user {id} not found"
  else parse_user resp.body
}

fun fetch_users : List Int -> List (Result User String) needs {Async, Http}
fetch_users ids = {
  let futures = List.map (fun id ->
    async! (fun () -> fetch_user id with to_result)
  ) ids
  List.map (fun f -> await! f) futures
}

Each task runs in parallel and independently succeeds or fails.
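
The manual List.map over await! can likely be replaced with Async.all, since its signature accepts any List (Future a), including futures of Result values. A minimal sketch, assuming the same imports as the earlier examples; fetch_users_all is a hypothetical name used only for illustration, not part of Std.Async:

```saga
# Hypothetical variant of fetch_users that collects results with Async.all.
fun fetch_users_all : List Int -> List (Result User String) needs {Async, Http}
fetch_users_all ids = {
  # Each task handles Fail itself, so every future carries a Result.
  let futures = List.map (fun id ->
    async! (fun () -> fetch_user id with to_result)
  ) ids
  Async.all futures
}
```

Because Fail is handled inside each task, it does not appear in the needs clause of the function.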

You can also let failures propagate, like an unhandled promise rejection. Here fetch_both doesn't handle Fail itself, so it stays in its needs:

fun fetch_both : Unit -> (User, User) needs {Async, Http, Fail}
fetch_both () = {
  let f1 = async! (fun () -> fetch_user 1)
  let f2 = async! (fun () -> fetch_user 2)
  (await! f1, await! f2)
}

The caller decides how to handle the failure. Wrapping the whole thing in to_result means that if either fetch fails, the entire result is Err:

let result = fetch_both () with {async_handler, beam_actor, http, to_result}
# Ok (user1, user2) or Err "user 2 not found"

The difference is just where you place to_result: inside the task for per-task error handling, outside for fail-fast.
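
For comparison, here is a sketch of the call site for the per-task version. It assumes the same handlers as the fetch_both call above (async_handler, beam_actor, http); the id list is illustrative only:

```saga
main () = {
  # No to_result at this level: each task already turned Fail into a Result.
  let results = fetch_users [1, 2, 3] with {async_handler, beam_actor, http}
  # results holds one Result per task, each either Ok or Err.
  dbg results
}
```

A failed lookup shows up as an Err entry in the list rather than discarding the other results.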