Async Tasks
Saga's standard library provides an async/await API, but it required no
special language support to build. The entire Async module is implemented
using effects and the actor system:
module Std.Async
import Std.Actor (Process, Actor)
opaque type Future a = Future(Pid a)
pub effect Async {
fun async : (f: Unit -> a) -> Future a
fun await : (future: Future a) -> a
}
pub handler async_handler for Async needs {Process, Actor a} {
async f = {
let caller = self! ()
let pid = spawn! (fun () -> {
let result = f ()
send! caller result
})
resume Future pid
}
await _future = {
receive {
value -> resume value
}
}
}
pub fun all : (futures: List (Future a)) -> List a needs {Async}
all futures = case futures {
[] -> []
f :: rest -> await! f :: all rest
}That's the whole thing. async! spawns a BEAM process that runs the function
and sends the result back. await! receives the result. No new language
features, just effects and actors.
Using it
async! spawns a task and returns a Future. The task runs in parallel
immediately. await! blocks until the result is ready:
import Std.Actor (beam_actor)
import Std.Async (Async, async_handler)
fun run : Unit -> Int needs {Async}
run () = {
let future = async! (fun () -> expensive_computation ())
# ... do other work while the task runs ...
await! future
}To fire off multiple tasks in parallel and collect all results, use Async.all:
fun run_all : Unit -> List Int needs {Async}
run_all () = {
let f1 = async! (fun () -> 1)
let f2 = async! (fun () -> 2)
let f3 = async! (fun () -> 3)
Async.all [f1, f2, f3]
}
main () = {
let results = run_all () with {async_handler, beam_actor}
dbg results
}Output:
[1, 2, 3]
Handling failures
Since effects compose, async tasks can use Fail the same way synchronous code
does. Handle it inside the task to get back a Result:
fun fetch_user : Int -> User needs {Http, Fail}
fetch_user id = {
let resp = get! (Request $"/api/users/{id}")
if resp.status == 404 then fail! $"user {id} not found"
else parse_user resp.body
}
fun fetch_users : List Int -> List (Result User String) needs {Async, Http}
fetch_users ids = {
let futures = List.map (fun id ->
async! (fun () -> fetch_user id with to_result)
) ids
List.map (fun f -> await! f) futures
}Each task runs in parallel and independently succeeds or fails.
You can also let failures propagate, like an unhandled promise rejection.
Here fetch_both doesn't handle Fail itself, so it stays in its needs:
fun fetch_both : Unit -> (User, User) needs {Async, Http, Fail}
fetch_both () = {
let f1 = async! (fun () -> fetch_user 1)
let f2 = async! (fun () -> fetch_user 2)
(await! f1, await! f2)
}The caller decides how to handle the failure. Wrapping the whole thing in
to_result means if either fetch fails, the entire result is Err:
let result = fetch_both () with {async_handler, beam_actor, http, to_result}
# Ok (user1, user2) or Err "user 2 not found"The difference is just where you place to_result: inside the task for
per-task error handling, outside for fail-fast.