Conversation
1ed400a to
ff55251
Compare
|
The force-push was made to make this PR independent of #585; they are now both based on |
|
Just a quick note: I wouldn't expect many changes would be needed to sync.ml, as it was designed to support this in the first place. I was expecting it would just need another version of the This means that when a producer gives us a value, we always resume and accept it. With multiple streams, we'll try to CAS the result and might fail. In that case, it needs to return |
|
What I implemented is, after all, just another version of I'm sure there is a more elegant or succinct way to implement it than the way I have found, though. After all, the simplest way is often the most difficult to find :-)
This is what happens in sync.ml:402, or do you mean something else? |
talex5
left a comment
There was a problem hiding this comment.
Sorry for the late review!
I think trying to modify the internals like this is going to be very hard to get right, and it won't work when we add non-sync streams anyway.
A slight change to take_suspend and take should give you what you need:
diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml
index ee78c62a..7f1d39fd 100644
--- a/lib_eio/sync.ml
+++ b/lib_eio/sync.ml
@@ -383,10 +383,14 @@ let rec consumer_resume_cell t ~success ~in_transition cell =
if Atomic.compare_and_set cell old Finished then success req
else consumer_resume_cell t ~success ~in_transition cell
-let take_suspend t loc =
+let take_suspend ~accept t loc =
Suspend.enter_unchecked @@ fun ctx enqueue ->
let Short cell | Long (_, cell) = loc in
- let kc v = enqueue (Ok v); true in
+ let kc v =
+ let result = accept v in
+ enqueue result;
+ Result.is_ok result
+ in
add_to_cell t.producers (Slot kc) cell;
match loc with
| Short _ -> ()
@@ -401,17 +405,38 @@ let take_suspend t loc =
(* else being resumed *)
)
-let take (t : _ t) =
+let take_full ~accept (t : _ t) =
let old = Atomic.fetch_and_add t.balance (-1) in
if old > 0 then (
let cell = Q.next_resume t.producers in
consumer_resume_cell t cell
- ~success:(fun item -> item.kp (Ok true); item.v)
- ~in_transition:(fun cell -> take_suspend t (Short cell))
+ ~success:(fun item ->
+ let result = accept item.v in
+ item.kp (Ok (Result.is_ok result));
+ match result with
+ | Ok x -> x
+ | Error ex -> raise ex
+ )
+ ~in_transition:(fun cell -> take_suspend ~accept t (Short cell))
) else (
- take_suspend t (Long (Q.next_suspend t.consumers))
+ take_suspend ~accept t (Long (Q.next_suspend t.consumers))
)Then selecting over multiple streams can be done using fibers, e.g.
let select_of_many ts =
let finished = Atomic.make false in
ts
|> List.map (fun (t, fn) () ->
let accept x =
if Atomic.compare_and_set finished false true then Ok x
else Error (Cancel.Cancelled Already_accepted)
in
fn (take_full ~accept t)
)
|> Fiber.any| (* restore old balance, because another stream was ready first. *) | ||
| ignore (Atomic.fetch_and_add t.balance (+1)) |
There was a problem hiding this comment.
This isn't safe. Some other domain may have seen the updated value and be relying on it.
| let v = consumer_resume_cell t cell | ||
| ~success:(fun it -> it.kp (Ok true); it.v) | ||
| ?in_transition:None in | ||
| enqueue (Ok (f v)) |
There was a problem hiding this comment.
We don't want to run the user function f here because we're in the sender's context, which could be a different domain, sys-thread, etc.
|
thank you for getting back to me! This PR was (for me) mostly an attempt at getting serious with multicore ocaml, but I didn't really expect it to be merged anyway :) Therefore, thank you for all the helpful feedback and also thank you for providing such a pleasant library to work with! |
This is a work in progress for an implementation of
select_of_manywith the same type as the one in #585 in theStream.Lockingmodule. At some point, both implementations would need to be unified so that a mix of locking and sync streams can be selected from.I added a simple benchmark script which exercises the logic from multiple domains (although just one fiber calling
selectat the moment). It helped find one bug already. The time-per-item is at about 4-5 µs on my machine; in comparison, the Stream benchmark for sending items on a single stream takes 4.5 µs for the same number of domains -- i.e., it takes about the same time.Overall I've had the feeling that some approaches I've used might be controversial (eg., a spinning wait on
In_transitioncells). Also, the code is not in the nicest shape yet, it probably can be decomposed into smaller functions. I'd mainly like to hear some feedback if this is a viable route to go, like in #585. In any case, I hope my dabbling around doesn't cause too much annoyance to you all :-)