Skip to content

add the lwt_direct package, for direct-style control flow #1060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@
(react (>= 1.0.0))
(bisect_ppx :with-test)))

(package
(name lwt_direct)
(synopsis "Direct style control flow and `await` for Lwt")
(depends
(ocaml (>= 5.0))
base-unix
(lwt (>= 3.0.0))
(bisect_ppx :with-test)))

(package
(name lwt)
(synopsis "Promises and event-driven I/O")
Expand Down
34 changes: 34 additions & 0 deletions lwt_direct.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Direct style control flow and `await` for Lwt"
maintainer: [
"Raphaël Proust <[email protected]>" "Anton Bachin <[email protected]>"
]
authors: ["Jérôme Vouillon" "Jérémie Dimino"]
license: "MIT"
homepage: "https://github.com/ocsigen/lwt"
doc: "https://ocsigen.org/lwt"
bug-reports: "https://github.com/ocsigen/lwt/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "5.0"}
"base-unix"
"lwt" {>= "3.0.0"}
"bisect_ppx" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/ocsigen/lwt.git"
9 changes: 9 additions & 0 deletions src/direct/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(library
(public_name lwt_direct)
(synopsis "Direct style control flow and `await` for Lwt")
(enabled_if (>= %{ocaml_version} "5.0"))
(libraries lwt lwt.unix)
(instrumentation
(backend bisect_ppx)))


95 changes: 95 additions & 0 deletions src/direct/lwt_direct.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
module ED = Effect.Deep

type _ Effect.t +=
| Await : 'a Lwt.t -> 'a Effect.t
| Yield : unit Effect.t

(** Queue of microtasks that are ready *)
let tasks : (unit -> unit) Queue.t = Queue.create ()

let[@inline] push_task f : unit = Queue.push f tasks

let default_on_uncaught_exn exn bt =
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)

let run_all_tasks () : unit =
let n_processed = ref 0 in
let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
let t = Queue.pop tasks in
incr n_processed;
try t ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
default_on_uncaught_exn exn bt
done;
(* make sure we don't sleep forever if there's no lwt promise
ready but [tasks] contains ready tasks *)
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t)

let setup_hooks =
let already_done = ref false in
fun () ->
if not !already_done then (
already_done := true;
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
)

let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with
| Lwt.Return x -> x
| Lwt.Fail exn -> raise exn
| Lwt.Sleep -> Effect.perform (Await fut)

let yield () : unit = Effect.perform Yield

(** the main effect handler *)
let handler : _ ED.effect_handler =
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option =
function
| Yield ->
Some (fun k -> push_task (fun () -> ED.continue k ()))
| Await fut ->
Some
(fun k ->
Lwt.on_any fut
(fun res -> push_task (fun () -> ED.continue k res))
(fun exn -> push_task (fun () -> ED.discontinue k exn)))
| _ -> None
in
{ effc }

let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit =
let res = ref (Error (Failure "not resolved")) in
let run_f_and_set_res () =
(try
let r = f () in
res := Ok r
with exn -> res := Error exn);
Lwt.wakeup_result promise !res
in
ED.try_with run_f_and_set_res () handler

let run f : _ Lwt.t =
setup_hooks ();
let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt

let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit =
let run_f () : unit =
try
f ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
on_uncaught_exn exn bt
in
ED.try_with run_f () handler

let run_in_the_background ?(on_uncaught_exn=default_on_uncaught_exn) f : unit =
setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f)
33 changes: 33 additions & 0 deletions src/direct/lwt_direct.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
(** Direct style control flow for Lwt. *)

val run : (unit -> 'a) -> 'a Lwt.t
(** [run f] runs the function [f ()] in a task within
the [Lwt_unix] event loop. [f ()] can create [Lwt]
promises and use {!await} to wait for them. Like any promise
in Lwt, [f ()] can starve the event loop if it runs long computations
without yielding to the event loop.

When [f ()] terminates (successfully or not), the promise
[run f] is resolved with [f ()]'s result, or the exception
raised by [f ()]. *)

val run_in_the_background :
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) ->
(unit -> unit) ->
unit
(** [run_in_the_background f] is similar to [ignore (run f)].
The computation [f()] runs in the background in the event loop
and returns no result.
@param on_uncaught_exn if provided, this is called when [f()]
raises an exception. *)

val yield : unit -> unit
(** Yield to the event loop.
Can only be used inside {!run} or {!run_in_the_background}. *)

val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the
exception with which [prom] failed if it failed.
If [prom] is not resolved yet, [await prom] will suspend the
current task and resume it when [prom] is resolved.
Can only be used inside {!run} or {!run_in_the_background}. *)
Loading