Skip to content

add the lwt_direct package, for direct-style control flow #1060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
- run: opam install conf-libev
if: ${{ matrix.libev == true }}

- run: opam install . --deps-only --with-test
- run: opam install --deps-only --with-test ./lwt.opam ./lwt_ppx.opam ./lwt_react.opam ./lwt_retry.opam

- run: opam exec -- dune build

Expand Down
7 changes: 7 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
===== dev =====

====== Additions ======

* Lwt_direct using Lwt in direct-style. (Simon Cruanes, #1060)


===== 5.9.0 =====

====== Additions ======
Expand Down
10 changes: 10 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@
(react (>= 1.0.0))
(bisect_ppx :with-test)))

(package
(name lwt_direct)
(synopsis "Direct-style control-flow and `await` for Lwt")
(authors "Simon Cruanes")
(depends
(ocaml (>= 5.0))
base-unix
(lwt (>= 6))
(bisect_ppx :with-test)))

(package
(name lwt)
(synopsis "Promises and event-driven I/O")
Expand Down
34 changes: 34 additions & 0 deletions lwt_direct.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Direct-style control-flow and `await` for Lwt"
maintainer: [
"Raphaël Proust <[email protected]>" "Anton Bachin <[email protected]>"
]
authors: ["Simon Cruanes"]
license: "MIT"
homepage: "https://github.com/ocsigen/lwt"
doc: "https://ocsigen.org/lwt"
bug-reports: "https://github.com/ocsigen/lwt/issues"
depends: [
"dune" {>= "2.7"}
"ocaml" {>= "5.0"}
"base-unix"
"lwt" {>= "6"}
"bisect_ppx" {with-test}
"odoc" {with-doc}
]
build: [
["dune" "subst"] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
]
dev-repo: "git+https://github.com/ocsigen/lwt.git"
43 changes: 28 additions & 15 deletions src/core/lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,9 @@ sig
val new_key : unit -> _ key
val get : 'v key -> 'v option
val with_value : 'v key -> 'v option -> (unit -> 'b) -> 'b
val get_from_storage : 'v key -> storage -> 'v option
val modify_storage : 'v key -> 'v option -> storage -> storage
val empty_storage : storage

(* Internal interface *)
val current_storage : storage ref
Expand Down Expand Up @@ -773,28 +776,33 @@ struct
next_key_id := id + 1;
{id = id; value = None}

let current_storage = ref Storage_map.empty
(* generic storage *)
let empty_storage = Storage_map.empty

let get key =
if Storage_map.mem key.id !current_storage then begin
let refresh = Storage_map.find key.id !current_storage in
let get_from_storage key storage =
match Storage_map.find_opt key.id storage with
| Some refresh ->
refresh ();
let value = key.value in
key.value <- None;
value
end
else
None
| None -> None

let modify_storage key value storage =
match value with
| Some _ ->
let refresh = fun () -> key.value <- value in
Storage_map.add key.id refresh storage
| None ->
Storage_map.remove key.id storage

(* built-in storage: propagated by bind and such *)
let current_storage = ref empty_storage

let get key = get_from_storage key !current_storage

let with_value key value f =
let new_storage =
match value with
| Some _ ->
let refresh = fun () -> key.value <- value in
Storage_map.add key.id refresh !current_storage
| None ->
Storage_map.remove key.id !current_storage
in
let new_storage = modify_storage key value !current_storage in

let saved_storage = !current_storage in
current_storage := new_storage;
Expand Down Expand Up @@ -3228,3 +3236,8 @@ struct
let (let+) x f = map f x
let (and+) = both
end

module Private = struct
type nonrec storage = storage
module Sequence_associated_storage = Sequence_associated_storage
end
11 changes: 11 additions & 0 deletions src/core/lwt.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2061,3 +2061,14 @@ val backtrace_try_bind :
val abandon_wakeups : unit -> unit

val debug_state_is : 'a state -> 'a t -> bool t

module Private : sig
type storage

module Sequence_associated_storage : sig
val get_from_storage : 'a key -> storage -> 'a option
val modify_storage : 'a key -> 'a option -> storage -> storage
val empty_storage : storage
val current_storage : storage ref
end
end [@@alert trespassing "for internal use only, keep away"]
7 changes: 7 additions & 0 deletions src/direct/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(library
(public_name lwt_direct)
(synopsis "Direct-style control-flow and `await` for Lwt")
(enabled_if (>= %{ocaml_version} "5.0"))
(libraries lwt lwt.unix)
(instrumentation
(backend bisect_ppx)))
134 changes: 134 additions & 0 deletions src/direct/lwt_direct.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
(* Direct-style wrapper for Lwt code

The implementation of the direct-style wrapper relies on ocaml5's effect
system capturing continuations and adding them as a callback to some lwt
promises. *)

(* part 1: tasks, getting the scheduler to call them *)

let tasks : (unit -> unit) Queue.t = Queue.create ()

let[@inline] push_task f : unit = Queue.push f tasks

let absolute_max_number_of_steps =
(* TODO 6.0: what's a good number here? should it be customisable? *)
10_000

let run_all_tasks () : unit =
let n_processed = ref 0 in
let max_number_of_steps = min absolute_max_number_of_steps (2 * Queue.length tasks) in
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
let t = Queue.pop tasks in
incr n_processed;
try t ()
with exn ->
(* TODO 6.0: change async_exception handler to accept a backtrace, pass it
here and at the other use site. *)
(* TODO 6.0: this and other try-with: respect exception-filter *)
!Lwt.async_exception_hook exn
done;
(* In the case where there are no promises ready for wakeup, the scheduler's
engine will pause until some IO completes. There might never be completed
IO, depending on the program structure and the state of the world. If this
happens and the queue is not empty, we add a [pause] so that the engine has
something to wakeup for so that the rest of the queue can be processed. *)
if not (Queue.is_empty tasks) && Lwt.paused_count () = 0 then ignore (Lwt.pause () : unit Lwt.t)

let setup_hooks =
let already_done = ref false in
fun () ->
if not !already_done then (
already_done := true;
(* TODO 6.0: assess whether we should have both hooks or just one (which
one). Tempted to say we should only have the enter hook. *)
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
)

(* part 2: effects, performing them *)

type _ Effect.t +=
| Await : 'a Lwt.t -> 'a Effect.t
| Yield : unit Effect.t

let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with
| Lwt.Return x -> x
| Lwt.Fail exn -> raise exn
| Lwt.Sleep -> Effect.perform (Await fut)

let yield () : unit = Effect.perform Yield

(* interlude: task-local storage helpers *)

module Storage = struct
[@@@alert "-trespassing"]
module Lwt_storage= Lwt.Private.Sequence_associated_storage
[@@@alert "+trespassing"]
type 'a key = 'a Lwt.key
let new_key = Lwt.new_key
let get = Lwt.get
let set k v = Lwt_storage.(current_storage := modify_storage k (Some v) !current_storage)
let remove k = Lwt_storage.(current_storage := modify_storage k None !current_storage)
let reset_to_empty () = Lwt_storage.(current_storage := empty_storage)
let save_current () = !Lwt_storage.current_storage
let restore_current saved = Lwt_storage.current_storage := saved
end

(* part 3: handling effects *)

let handler : _ Effect.Deep.effect_handler =
let effc : type b. b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option =
function
| Yield ->
Some (fun k ->
let storage = Storage.save_current () in
push_task (fun () ->
Storage.restore_current storage;
Effect.Deep.continue k ()))
| Await fut ->
Some
(fun k ->
let storage = Storage.save_current () in
Lwt.on_any fut
(fun res -> push_task (fun () ->
Storage.restore_current storage; Effect.Deep.continue k res))
(fun exn -> push_task (fun () ->
Storage.restore_current storage; Effect.Deep.discontinue k exn)))
| _ -> None
in
{ effc }

(* part 4: putting it all together: running tasks *)

let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit =
let run_f_and_set_res () =
Storage.reset_to_empty();
match f () with
| res -> Lwt.wakeup promise res
| exception exc -> Lwt.wakeup_exn promise exc
in
Effect.Deep.try_with run_f_and_set_res () handler

let spawn f : _ Lwt.t =
setup_hooks ();
let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt

(* part 4 (encore): running a task in the background *)

let run_inside_effect_handler_in_the_background_ f () : unit =
let run_f () : unit =
Storage.reset_to_empty();
try
f ()
with exn ->
!Lwt.async_exception_hook exn
in
Effect.Deep.try_with run_f () handler

let spawn_in_the_background f : unit =
setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ f)
92 changes: 92 additions & 0 deletions src/direct/lwt_direct.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
(** Direct style control flow for Lwt.

This module relies on OCaml 5's
{{:https://ocaml.org/manual/5.3/effects.html} effect handlers}.
Instead of chaining promises using {!Lwt.bind} and {!Lwt.map}
and other combinators, it becomes possible to start
lightweight "tasks" using [Lwt_direct.spawn (fun () -> ...)].
The body of such a task is written in direct-style code,
using OCaml's standard control flow structures such as loops,
higher-order functions, exception handlers, [match], etc.

Interactions with the rest of lwt can be done using [await],
for example:

{[
Lwt_direct.spawn (fun () ->
let continue = ref true in
while !continue do
match Lwt_io.read_line in_channel |> Lwt_direct.await with
| exception End_of_file -> continue := false
| line ->
let uppercase_line = String.uppercase_ascii line in
Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await
done)
]}

This code snippet contains a simple "task" that repeatedly reads
a line from a [Lwt_io] channel, uppercases it, and writes the
uppercase version to another channel.

This task is itself a [unit Lwt.t], which is resolved when the function
returns. It is possible to use
{!Lwt_direct.run_in_the_background} to ignore the result and
let the task run in the background instead.

*)

val spawn : (unit -> 'a) -> 'a Lwt.t
(** [spawn f] runs the function [f ()] in a task within
the [Lwt_unix] event loop. [f ()] can create [Lwt]
promises and use {!await} to wait for them. Like any promise
in Lwt, [f ()] can starve the event loop if it runs long computations
without yielding to the event loop.

When [f ()] terminates (successfully or not), the promise
[spawn f] is resolved with [f ()]'s result, or the exception
raised by [f ()]. *)

val spawn_in_the_background :
(unit -> unit) ->
unit
(** [spawn_in_the_background f] is similar to [ignore (spawn f)].
The computation [f()] runs in the background in the event loop
and returns no result.
If [f()] raises an exception, {!Lwt.async_exception_hook} is called. *)

val yield : unit -> unit
(** Yield to the event loop.

Calling [yield] outside of {!spawn} or {!run_in_the_background} will raise an exception,
crash your program, or otherwise cause errors. It is a programming error to do so. *)

val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the
exception with which [prom] failed if it failed.
If [prom] is not resolved yet, [await prom] will suspend the
current task and resume it when [prom] is resolved.

Calling [await] outside of {!spawn} or {!run_in_the_background} will raise an exception,
crash your program, or otherwise cause errors. It is a programming error to do so. *)

(** Local storage.

This storage is the same as the one described with {!Lwt.key},
except that it is usable from the inside of {!spawn} or
{!run_in_the_background}.

Each task has its own storage, independent from other tasks or promises. *)
module Storage : sig
type 'a key = 'a Lwt.key
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Do we want to export the type equality? It makes it possible to use Lwt functions to handle keys. Which at the very least is confusing but also would require to have tests in place.
  • Do we actually need to expose all of this considering it's all compatible with vanilla lwt storage? We could just say in the doc that all the Lwt.key-related functions are ok to use in run blocks.,
  • Should we actually advertise storage? it's deprecated so idk that we should encourage it much

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I think it's good to be able to use existing keys
  • it's deprecated but why? Local storage is important!! For logging and tracing at least. Eio has it too :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's deprecated because the way that the storage flows is non-obvious and not extensible.

Non-obvious:
Like even experienced Lwt users might have a hard time answering questions like:
In Lwt.on_success (fun v -> f v)) is f executed with the storage "of v" or the local ambient one?
What even is the storage "of v"? It's kinda ok to say if you have let v = <long series of bind>, but what if you have let v, r = Lwt.task ()? Does it get the storage from declaration site or does it gets it from resolution site?
What if you pass r around to several "threads" with different storages so that they race to resolve v?

It's the fundamental original issue of Lwt: there are no threads but the API sometimes say there is.

Non-extensible:
When you define something à la Lwt_list but for whatever data-structure you have in-house. Say Seqes or say lache. You'd want to be able to get storage and reset it so that the abstractions you build on top of your data-structure matches the behaviour of Lwt.
(E.g., in lache you would want to decide what happens if someone binds on a cached promise: do they get the storage from the "thread" which added the promise in the first place? do they get a clean-slate storage? you can't make that happen.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess actually you adding these storage function is because of the the lack of extensibility. So maybe the way out of it, the way to undeprecate is to

  • document clearly the built-in flow
  • provide the functions needed for extensibility
  • provide a few example and some documentation of how to properly do extensibility

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to write more about this, but in a way this storage makes more sense for Lwt_direct, where the scope should be clear (one call to run = one instance of storage). Even then there are questions about whether run inside run should inherit the storage or not? hmm

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I think the stoarge makes more sense for direct-style, because of the run wrapper gives it a more explicit scope. let's keep it for now and we'll see later

val new_key : unit -> 'a key
(** Alias to {!Lwt.new_key} *)

val get : 'a key -> 'a option
(** get the value associated with this key in local storage, or [None] *)

val set : 'a key -> 'a -> unit
(** [set k v] sets the key to the value for the rest of the task. *)

val remove : 'a key -> unit
(** Remove the value associated with this key, if any *)
end
Loading
Loading