diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 284919d57..584bccbb8 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -54,7 +54,7 @@ jobs: - run: opam install conf-libev if: ${{ matrix.libev == true }} - - run: opam install . --deps-only --with-test + - run: opam install --deps-only --with-test ./lwt.opam ./lwt_ppx.opam ./lwt_react.opam ./lwt_retry.opam - run: opam exec -- dune build diff --git a/CHANGES b/CHANGES index 76a6cdb80..2b8e0e40c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +===== dev ===== + +====== Additions ====== + + * Lwt_direct: using Lwt in direct style. (Simon Cruanes, #1060) + + ===== 5.9.0 ===== ====== Additions ====== diff --git a/dune-project b/dune-project index 5f0af6dfa..3befa8530 100644 --- a/dune-project +++ b/dune-project @@ -44,6 +44,16 @@ (react (>= 1.0.0)) (bisect_ppx :with-test))) +(package + (name lwt_direct) + (synopsis "Direct-style control-flow and `await` for Lwt") + (authors "Simon Cruanes") + (depends + (ocaml (>= 5.0)) + base-unix + (lwt (>= 6)) + (bisect_ppx :with-test))) + (package (name lwt) (synopsis "Promises and event-driven I/O") diff --git a/lwt_direct.opam b/lwt_direct.opam new file mode 100644 index 000000000..7db18c542 --- /dev/null +++ b/lwt_direct.opam @@ -0,0 +1,34 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Direct-style control-flow and `await` for Lwt" +maintainer: [ + "Raphaël Proust " "Anton Bachin " +] +authors: ["Simon Cruanes"] +license: "MIT" +homepage: "https://github.com/ocsigen/lwt" +doc: "https://ocsigen.org/lwt" +bug-reports: "https://github.com/ocsigen/lwt/issues" +depends: [ + "dune" {>= "2.7"} + "ocaml" {>= "5.0"} + "base-unix" + "lwt" {>= "6"} + "bisect_ppx" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo:
"git+https://github.com/ocsigen/lwt.git" diff --git a/src/core/lwt.ml b/src/core/lwt.ml index 257134c63..660819adc 100644 --- a/src/core/lwt.ml +++ b/src/core/lwt.ml @@ -730,6 +730,9 @@ sig val new_key : unit -> _ key val get : 'v key -> 'v option val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b + val get_from_storage : 'v key -> storage -> 'v option + val modify_storage : 'v key -> 'v option -> storage -> storage + val empty_storage : storage (* Internal interface *) val current_storage : storage ref @@ -773,28 +776,33 @@ struct next_key_id := id + 1; {id = id; value = None} - let current_storage = ref Storage_map.empty + (* generic storage *) + let empty_storage = Storage_map.empty - let get key = - if Storage_map.mem key.id !current_storage then begin - let refresh = Storage_map.find key.id !current_storage in + let get_from_storage key storage = + match Storage_map.find_opt key.id storage with + | Some refresh -> refresh (); let value = key.value in key.value <- None; value - end - else - None + | None -> None + + let modify_storage key value storage = + match value with + | Some _ -> + let refresh = fun () -> key.value <- value in + Storage_map.add key.id refresh storage + | None -> + Storage_map.remove key.id storage + + (* built-in storage: propagated by bind and such *) + let current_storage = ref empty_storage + + let get key = get_from_storage key !current_storage let with_value key value f = - let new_storage = - match value with - | Some _ -> - let refresh = fun () -> key.value <- value in - Storage_map.add key.id refresh !current_storage - | None -> - Storage_map.remove key.id !current_storage - in + let new_storage = modify_storage key value !current_storage in let saved_storage = !current_storage in current_storage := new_storage; @@ -3228,3 +3236,8 @@ struct let (let+) x f = map f x let (and+) = both end + +module Private = struct + type nonrec storage = storage + module Sequence_associated_storage = Sequence_associated_storage +end diff 
--git a/src/core/lwt.mli b/src/core/lwt.mli index 7598343d8..7ec5efb24 100644 --- a/src/core/lwt.mli +++ b/src/core/lwt.mli @@ -2061,3 +2061,14 @@ val backtrace_try_bind : val abandon_wakeups : unit -> unit val debug_state_is : 'a state -> 'a t -> bool t + +module Private : sig + type storage + + module Sequence_associated_storage : sig + val get_from_storage : 'a key -> storage -> 'a option + val modify_storage : 'a key -> 'a option -> storage -> storage + val empty_storage : storage + val current_storage : storage ref + end +end [@@alert trespassing "for internal use only, keep away"] diff --git a/src/direct/dune b/src/direct/dune new file mode 100644 index 000000000..9ea910ebc --- /dev/null +++ b/src/direct/dune @@ -0,0 +1,7 @@ +(library + (public_name lwt_direct) + (synopsis "Direct-style control-flow and `await` for Lwt") + (enabled_if (>= %{ocaml_version} "5.0")) + (libraries lwt lwt.unix) + (instrumentation + (backend bisect_ppx))) diff --git a/src/direct/lwt_direct.ml b/src/direct/lwt_direct.ml new file mode 100644 index 000000000..346b07fda --- /dev/null +++ b/src/direct/lwt_direct.ml @@ -0,0 +1,134 @@ +(* Direct-style wrapper for Lwt code + + The implementation of the direct-style wrapper relies on ocaml5's effect + system capturing continuations and adding them as a callback to some lwt + promises. *) + +(* part 1: tasks, getting the scheduler to call them *) + +let tasks : (unit -> unit) Queue.t = Queue.create () + +let[@inline] push_task f : unit = Queue.push f tasks + +let absolute_max_number_of_steps = + (* TODO 6.0: what's a good number here? should it be customisable? 
*) + 10_000 + +let run_all_tasks () : unit = + let n_processed = ref 0 in + let max_number_of_steps = min absolute_max_number_of_steps (2 * Queue.length tasks) in + while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do + let t = Queue.pop tasks in + incr n_processed; + try t () + with exn -> + (* TODO 6.0: change async_exception handler to accept a backtrace, pass it + here and at the other use site. *) + (* TODO 6.0: this and other try-with: respect exception-filter *) + !Lwt.async_exception_hook exn + done; + (* In the case where there are no promises ready for wakeup, the scheduler's + engine will pause until some IO completes. There might never be completed + IO, depending on the program structure and the state of the world. If this + happens and the queue is not empty, we add a [pause] so that the engine has + something to wakeup for so that the rest of the queue can be processed. *) + if not (Queue.is_empty tasks) && Lwt.paused_count () = 0 then ignore (Lwt.pause () : unit Lwt.t) + +let setup_hooks = + let already_done = ref false in + fun () -> + if not !already_done then ( + already_done := true; + (* TODO 6.0: assess whether we should have both hooks or just one (which + one). Tempted to say we should only have the enter hook. 
*) + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + ) + +(* part 2: effects, performing them *) + +type _ Effect.t += + | Await : 'a Lwt.t -> 'a Effect.t + | Yield : unit Effect.t + +let await (fut : 'a Lwt.t) : 'a = + match Lwt.state fut with + | Lwt.Return x -> x + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> Effect.perform (Await fut) + +let yield () : unit = Effect.perform Yield + +(* interlude: task-local storage helpers *) + +module Storage = struct + [@@@alert "-trespassing"] + module Lwt_storage= Lwt.Private.Sequence_associated_storage + [@@@alert "+trespassing"] + type 'a key = 'a Lwt.key + let new_key = Lwt.new_key + let get = Lwt.get + let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage) + let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage) + let reset_to_empty () = Lwt_storage.(current_storage := empty_storage) + let save_current () = !Lwt_storage.current_storage + let restore_current saved = Lwt_storage.current_storage := saved +end + +(* part 3: handling effects *) + +let handler : _ Effect.Deep.effect_handler = + let effc : type b. 
b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = + function + | Yield -> + Some (fun k -> + let storage = Storage.save_current () in + push_task (fun () -> + Storage.restore_current storage; + Effect.Deep.continue k ())) + | Await fut -> + Some + (fun k -> + let storage = Storage.save_current () in + Lwt.on_any fut + (fun res -> push_task (fun () -> + Storage.restore_current storage; Effect.Deep.continue k res)) + (fun exn -> push_task (fun () -> + Storage.restore_current storage; Effect.Deep.discontinue k exn))) + | _ -> None + in + { effc } + +(* part 4: putting it all together: running tasks *) + +let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = + let run_f_and_set_res () = + Storage.reset_to_empty(); + match f () with + | res -> Lwt.wakeup promise res + | exception exc -> Lwt.wakeup_exn promise exc + in + Effect.Deep.try_with run_f_and_set_res () handler + +let spawn f : _ Lwt.t = + setup_hooks (); + let lwt, resolve = Lwt.wait () in + push_task (run_inside_effect_handler_and_resolve_ resolve f); + lwt + +(* part 4 (encore): running a task in the background *) + +let run_inside_effect_handler_in_the_background_ f () : unit = + let run_f () : unit = + Storage.reset_to_empty(); + try + f () + with exn -> + !Lwt.async_exception_hook exn + in + Effect.Deep.try_with run_f () handler + +let spawn_in_the_background f : unit = + setup_hooks (); + push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/direct/lwt_direct.mli b/src/direct/lwt_direct.mli new file mode 100644 index 000000000..0cc4284e3 --- /dev/null +++ b/src/direct/lwt_direct.mli @@ -0,0 +1,92 @@ +(** Direct style control flow for Lwt. + + This module relies on OCaml 5's + {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. + Instead of chaining promises using {!Lwt.bind} and {!Lwt.map} + and other combinators, it becomes possible to start + lightweight "tasks" using [Lwt_direct.spawn (fun () -> ...)]. 
+ The body of such a task is written in direct-style code, + using OCaml's standard control flow structures such as loops, + higher-order functions, exception handlers, [match], etc. + + Interactions with the rest of Lwt can be done using [await], + for example: + + {[ + Lwt_direct.spawn (fun () -> + let continue = ref true in + while !continue do + match Lwt_io.read_line in_channel |> Lwt_direct.await with + | exception End_of_file -> continue := false + | line -> + let uppercase_line = String.uppercase_ascii line in + Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await + done) + ]} + + This code snippet contains a simple "task" that repeatedly reads + a line from a [Lwt_io] channel, uppercases it, and writes the + uppercase version to another channel. + + This task is itself a [unit Lwt.t], which is resolved when the function + returns. It is possible to use + {!Lwt_direct.spawn_in_the_background} to ignore the result and + let the task run in the background instead. + + *) + +val spawn : (unit -> 'a) -> 'a Lwt.t +(** [spawn f] runs the function [f ()] in a task within + the [Lwt_unix] event loop. [f ()] can create [Lwt] + promises and use {!await} to wait for them. Like any promise + in Lwt, [f ()] can starve the event loop if it runs long computations + without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise + [spawn f] is resolved with [f ()]'s result, or the exception + raised by [f ()]. *) + +val spawn_in_the_background : + (unit -> unit) -> + unit +(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. + The computation [f()] runs in the background in the event loop + and returns no result. + If [f()] raises an exception, {!Lwt.async_exception_hook} is called. *) + +val yield : unit -> unit +(** Yield to the event loop. + + Calling [yield] outside of {!spawn} or {!spawn_in_the_background} will raise an exception, + crash your program, or otherwise cause errors. It is a programming error to do so.
*) + +val await : 'a Lwt.t -> 'a +(** [await prom] returns the result of [prom], or re-raises the + exception with which [prom] failed if it failed. + If [prom] is not resolved yet, [await prom] will suspend the + current task and resume it when [prom] is resolved. + + Calling [await] outside of {!spawn} or {!spawn_in_the_background} will raise an exception, + crash your program, or otherwise cause errors. It is a programming error to do so. *) + +(** Local storage. + + This storage is the same as the one described with {!Lwt.key}, + except that it is usable from the inside of {!spawn} or + {!spawn_in_the_background}. + + Each task has its own storage, independent from other tasks or promises. *) +module Storage : sig + type 'a key = 'a Lwt.key + val new_key : unit -> 'a key + (** Alias to {!Lwt.new_key} *) + + val get : 'a key -> 'a option + (** Get the value associated with this key in local storage, or [None] *) + + val set : 'a key -> 'a -> unit + (** [set k v] sets the key to the value for the rest of the task.
*) + + val remove : 'a key -> unit + (** Remove the value associated with this key, if any *) +end diff --git a/test/direct/dune b/test/direct/dune new file mode 100644 index 000000000..fa6ef7d37 --- /dev/null +++ b/test/direct/dune @@ -0,0 +1,12 @@ + +(executable + (name main) + (enabled_if (>= %{ocaml_version} "5.0")) + (libraries lwt_direct lwt.unix lwttester)) + +(rule + (alias runtest) + (package lwt_direct) + (enabled_if (>= %{ocaml_version} "5.0")) + (action (run ./main.exe))) + diff --git a/test/direct/main.ml b/test/direct/main.ml new file mode 100644 index 000000000..5b9b13dba --- /dev/null +++ b/test/direct/main.ml @@ -0,0 +1,3 @@ + +Test.run "lwt_direct" + Test_lwt_direct.suites diff --git a/test/direct/test_lwt_direct.ml b/test/direct/test_lwt_direct.ml new file mode 100644 index 000000000..74cab01ef --- /dev/null +++ b/test/direct/test_lwt_direct.ml @@ -0,0 +1,223 @@ +open Test +open Lwt_direct +open Lwt.Syntax + +let main_tests = suite "main" [ + test "basic await" begin fun () -> + let fut = spawn @@ fun () -> + Lwt_unix.sleep 1e-6 |> await; + 42 + in + let+ res = fut in + res = 42 + end; + + test "await multiple values" begin fun () -> + let fut1 = let+ () = Lwt_unix.sleep 1e-6 in 1 in + let fut2 = let+ () = Lwt_unix.sleep 2e-6 in 2 in + let fut3 = let+ () = Lwt_unix.sleep 3e-6 in 3 in + + spawn @@ fun () -> + let x1 = fut1 |> await in + let x2 = fut2 |> await in + let x3 = fut3 |> await in + x1 = 1 && x2 = 2 && x3 = 3 + end; + + test "list.iter await" begin fun () -> + let items = List.init 101 (fun i -> Lwt.return i) in + spawn @@ fun () -> + let sum = ref 0 in + List.iter (fun fut -> sum := !sum + await fut) items; + !sum = 5050 + end; + + test "lwt_list.iter_p spawn" begin fun () -> + let items = List.init 101 (fun i -> i) in + let+ items = Lwt_list.map_p + (fun i -> spawn (fun () -> + for _ = 0 to i mod 5 do yield () done; + i + )) + items + in + List.fold_left (+) 0 items = 5050 + end; + + test "spawn in background" begin fun () -> + let 
stream, push = Lwt_stream.create_bounded 2 in + spawn_in_the_background (fun () -> + for i = 1 to 10 do + push#push i |> await + done; + push#close); + spawn @@ fun () -> + let continue = ref true in + let seen = ref [] in + + while !continue do + match Lwt_stream.get stream |> await with + | None -> continue := false + | Some x -> seen := x :: !seen + done; + List.rev !seen = [1;2;3;4;5;6;7;8;9;10] + end; + + test "list.iter await with yield" begin fun () -> + let items = List.init 101 (fun i -> Lwt.return i) in + spawn @@ fun () -> + let sum = ref 0 in + List.iter (fun fut -> yield(); sum := !sum + await fut) items; + !sum = 5050 + end; + + test "awaiting on failing promise" begin fun () -> + let fut: unit Lwt.t = let* () = Lwt.pause () in let* () = Lwt_unix.sleep 0.0001 in Lwt.fail Exit in + spawn @@ fun () -> + try await fut; false + with Exit -> true + end; + + test "spawn can fail" begin fun () -> + spawn @@ fun () -> + let sub: unit Lwt.t = spawn @@ fun () -> + Lwt_unix.sleep 0.00001 |> await; + raise Exit + in + try await sub; false + with Exit -> true + end; + + test "concurrent fib" begin fun () -> + let rec badfib n = + if n <= 2 then Lwt.return 1 + else + spawn begin fun () -> + let f1 = badfib (n-1) in + let f2 = badfib (n-2) in + await f1 + await f2 + end + in + spawn @@ fun () -> + let fib12 = badfib 12 in + let fib12 = await fib12 in + fib12 = 144 + end +] + +let storage_tests = suite "storage" [ + test "get set" begin fun () -> + let k1 = Storage.new_key () in + let k2 = Storage.new_key () in + spawn @@ fun () -> + assert (Storage.get k1 = None); + assert (Storage.get k2 = None); + Storage.set k1 42; + assert (Storage.get k1 = Some 42); + assert (Storage.get k2 = None); + Storage.set k2 true; + assert (Storage.get k1 = Some 42); + assert (Storage.get k2 = Some true); + Storage.remove k1; + assert (Storage.get k1 = None); + assert (Storage.get k2 = Some true); + true + end; + + test "storage across await" begin fun () -> + let k = Storage.new_key () 
in + + (* spawn another promise that touches storage *) + let run_promise_async () = + Lwt.async @@ fun () -> + Lwt.with_value k (Some "something else") @@ fun () -> + assert (Lwt.get k = Some "something else"); + Lwt.return_unit + in + + let run_promise () : unit Lwt.t = + Lwt.with_value k (Some "another one") @@ fun () -> + assert (Lwt.get k = Some "another one"); + Lwt.return_unit + in + + let one_task () = + run_promise_async(); + assert (Storage.get k = None); + Storage.set k "v1"; + assert (Storage.get k = Some "v1"); + run_promise () |> await; + assert (Storage.get k = Some "v1"); + Storage.remove k; + assert (Storage.get k = None); + yield(); + assert (Storage.get k = None); + run_promise () |> await; + assert (Storage.get k = None); + run_promise_async(); + yield(); + assert (Storage.get k = None); + Storage.set k "v2"; + assert (Storage.get k = Some "v2"); + run_promise_async(); + yield(); + run_promise () |> await; + assert (Storage.get k = Some "v2"); + in + + (* spawn multiple such tasks *) + let tasks = [ spawn one_task; spawn one_task; spawn one_task ] in + + spawn @@ fun () -> + List.iter await tasks; + true + end; +] + +let io_tests = suite "io" [ + test "read io" begin fun () -> + let str = "some\ninteresting\ntext string here!\n" in + let ic = Lwt_io.of_bytes ~mode:Input (Lwt_bytes.of_string str) in + spawn @@ fun () -> + let lines = ref [] in + while + try + yield (); + let line = Lwt_io.read_line ic |> await in + lines := line :: !lines; + true + with End_of_file -> false + do () + done; + List.rev !lines = ["some"; "interesting"; "text string here!"] + end; + + test "pipe" begin fun () -> + let ic, oc = Lwt_io.pipe() in + spawn_in_the_background (fun () -> + for i = 1 to 100 do + Lwt_io.write_line oc (string_of_int i) |> await; + Lwt_io.flush oc |> await + done; + Lwt_io.close oc |> await; + ); + + spawn @@ fun () -> + let sum = ref 0 in + let continue = ref true in + while !continue do + match Lwt_io.read_line ic |> await |> String.trim |> 
int_of_string with + | exception End_of_file -> continue := false + | i -> + sum := !sum + i + done; + Lwt_io.close ic |> await; + !sum = 5050 + end +] + +let suites = [ + main_tests; + storage_tests; + io_tests; +]