diff --git a/.paket/paket.bootstrapper.exe b/.paket/paket.bootstrapper.exe deleted file mode 100644 index 167dfd6..0000000 Binary files a/.paket/paket.bootstrapper.exe and /dev/null differ diff --git a/examples/example04manyactors/Program.fs b/examples/example04manyactors/Program.fs index c91260f..143e6a5 100644 --- a/examples/example04manyactors/Program.fs +++ b/examples/example04manyactors/Program.fs @@ -22,6 +22,6 @@ let main argv = let endTime = DateTime.Now let duration = (endTime - startTime) printfn "%A" duration - printfn "Messages per second %f" ((float numberOfMessages) / (float duration.Seconds)) + printfn "Messages per second %f" ((float numberOfMessages) / (float duration.TotalSeconds)) Console.ReadLine() |> ignore 0 // return an integer exit code diff --git a/examples/example04manyactors/Program2.fs b/examples/example04manyactors/Program2.fs new file mode 100644 index 0000000..32a551d --- /dev/null +++ b/examples/example04manyactors/Program2.fs @@ -0,0 +1,49 @@ +open System +open Proto.FSharp +open Proto + +type Msg = + | Inc of int + | Get + +[] +let main argv = + let mutable state = 0 + let handler (ctx: IContext) msg = + match msg with + | Inc _ -> state <- state + 1 + | Get -> ctx.Sender Actor.spawnProps + + let createAndTell x = + Actor.create (fun msg -> counter Actor.initProps + |> Actor.spawn + + let numberOfMessages = 1000000 + let startTime = DateTime.Now + async { + Seq.init numberOfMessages id + |> Seq.map createAndTell + |> Seq.iteri (fun i pid -> pid Async.Parallel + //|> Async.RunSynchronously + //|> ignore + } |> Async.Start + + async { + let mutable count = -1 + while count < 1000000 do + do! Async.Sleep 100 + let! (c: int) = counter Async.RunSynchronously + + let endTime = DateTime.Now + let duration = (endTime - startTime) + printfn "%A" duration + printfn "Messages per second %f" ((float numberOfMessages) / (float duration.TotalSeconds)) + Console.ReadLine() |> ignore + 0 // return an integer exit code diff --git a/src/Proto.Actor.FSharp.Persistence/Proto.FSharp.Persistence.fs b/src/Proto.Actor.FSharp.Persistence/Proto.FSharp.Persistence.fs index 3ea5f0b..6f3b9fb 100644 --- a/src/Proto.Actor.FSharp.Persistence/Proto.FSharp.Persistence.fs +++ b/src/Proto.Actor.FSharp.Persistence/Proto.FSharp.Persistence.fs @@ -4,16 +4,20 @@ open Proto open System.Threading.Tasks open System -namespace Avalanchain.Core - open Proto.FSharp -[] module Persistence = open Proto open Proto.Persistence open Proto.Persistence.SnapshotStrategies + type SenderInfo<'Event> = { // Mostly for streaming support + Address: string + Tell: 'Event -> unit + } with static member FromPID (pid: PID) = { Address = pid.Address; Tell = fun e -> pid = SenderInfo<'Event> -> 'State -> int64 -> 'Command -> Async> + let private applyEvent recoverEvent replayEvent persistedEvent state (evt: Event) = match evt with | :? RecoverEvent as e -> @@ -51,19 +55,24 @@ module Persistence = | Started _ -> async { do! persistence.RecoverStateAsync() |> Async.AwaitTask } | _ -> async {()} - let private handler (persistence: Persistence) processCommand state (ctx: IContext) (cmd: 'Command): Async = - let res = processCommand state persistence.Index cmd + let private handler (persistence: Persistence) (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (state: 'State) (ctx: IContext) (cmd: 'Command): Async = async { + let! res = processCommand (SenderInfo<'Event>.FromPID ctx.Sender) state persistence.Index cmd match res with - | Ok evt -> async { do! persistence.PersistEventAsync evt |> Async.AwaitTask - if isNull(ctx.Sender) |> not then ctx.Sender async { e >! ctx.Sender } + | Ok (evtOpt, save) -> match evtOpt with + | Some evt -> if save then do! persistence.PersistEventAsync evt |> Async.AwaitTask + if isNull(ctx.Sender) |> not then ctx.Sender () + | Error e -> e >! ctx.Sender + } + + let simpleProcessCommand = fun _ _ _ cmd -> async { return (Some cmd, true) |> Ok } [] module CommandSourcingAndSnapshotting = let persistDetailed (eventStore: IEventStore) (snapshotStore: ISnapshotStore) - (processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>) + (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (recoverEvent: 'State -> int64 -> 'Event -> 'State) (replayEvent: 'State -> int64 -> 'Event -> 'State) (persistedEvent: 'State -> int64 -> 'Event -> 'State) @@ -89,7 +98,7 @@ module Persistence = let persist (provider: IProvider) - (processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>) + (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (onEvent: 'State -> int64 -> 'Event -> 'State) (log: string -> unit) (snapshotStrategy: ISnapshotStrategy) @@ -111,7 +120,7 @@ module Persistence = let persistLight (provider: IProvider) - (processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>) + (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (onEvent: 'State -> int64 -> 'Event -> 'State) (persistentID: string) (initialState: 'State) @@ -126,7 +135,7 @@ module Persistence = initialState [] - module EventSourcingAndSnapshotting = + module EventSourcingAndSnapshotting = let persistDetailed (eventStore: IEventStore) (snapshotStore: ISnapshotStore) @@ -142,7 +151,7 @@ module Persistence = CommandSourcingAndSnapshotting.persistDetailed eventStore snapshotStore - (fun _ _ cmd -> Ok cmd) + simpleProcessCommand recoverEvent replayEvent persistedEvent @@ -162,7 +171,7 @@ module Persistence = = CommandSourcingAndSnapshotting.persist provider - (fun _ _ cmd -> Ok cmd) + simpleProcessCommand onEvent log snapshotStrategy @@ -177,7 +186,7 @@ module Persistence = = CommandSourcingAndSnapshotting.persistLight provider - (fun _ _ cmd -> Ok cmd) + simpleProcessCommand onEvent persistentID initialState @@ -196,13 +205,13 @@ module Persistence = persistentID, System.Action<_>(applySnapshot recoverSnapshot persistedSnapshot ignore)) - Actor.create3Async (systemHandler persistence) (handler persistence (fun _ _ e -> Ok e) None) + Actor.create3Async (systemHandler persistence) (handler persistence simpleProcessCommand None) [] module CommandSourcing = let persist (eventStore: IEventStore) - (processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>) + (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (persistedEvent: 'State -> int64 -> 'Event -> 'State) (persistentID: string) (initialState: 'State) @@ -227,7 +236,7 @@ module Persistence = = CommandSourcing.persist eventStore - (fun _ _ cmd -> Ok cmd) + simpleProcessCommand persistedEvent persistentID initialState @@ -242,7 +251,7 @@ module Persistence = persistentID None - let getEvents<'T> (handler: 'T -> unit) (persistentID: string) (indexStart: int64) (indexEnd: int64) (eventStore: IEventStore) = + let getEvents<'T> (eventStore: IEventStore) (persistentID: string) (indexStart: int64) (indexEnd: int64) (handler: 'T -> unit) = eventStore.GetEventsAsync (persistentID, indexStart, indexEnd, System.Action<_>(fun o -> if not (isNull o) then match o with diff --git a/src/Proto.Actor.FSharp/Proto.FSharp.fs b/src/Proto.Actor.FSharp/Proto.FSharp.fs index 565c0a6..df893bb 100644 --- a/src/Proto.Actor.FSharp/Proto.FSharp.fs +++ b/src/Proto.Actor.FSharp/Proto.FSharp.fs @@ -5,11 +5,11 @@ open System.Threading.Tasks open System module Async = - let inline startAsPlainTask (work : Async) = Task.Factory.StartNew(fun () -> work |> Async.RunSynchronously) + let inline startAsPlainTask (work : Async) = Async.StartAsTask work :> Task module System = - let toFunc<'a> f = Func<'a>(f) - let toFunc2<'a, 'b> f = Func<'a, 'b>(f) + let inline toFunc<'a> f = Func<'a>(f) + let inline toFunc2<'a, 'b> f = Func<'a, 'b>(f) [] module Core = @@ -26,7 +26,7 @@ module Core = | ReceiveTimeout of ReceiveTimeout | Continuation of Continuation - let (|IsSystemMessage|_|) (msg:obj) = + let inline (|IsSystemMessage|_|) (msg: obj) = match msg with | :? AutoReceiveMessage as m -> Some(AutoReceiveMessage m) | :? Terminated as m -> Some(Terminated m) @@ -49,106 +49,146 @@ module Core = | OneForOneStrategy of decider:Decider * maxNrOfRetries:int * withinTimeSpan:TimeSpan option | ExponentialBackoffStrategy of backoffWindow:TimeSpan * initialBackoff:TimeSpan - type FSharpActor<'Message, 'State>(systemMessageHandler: IContext -> SystemMessage -> 'State -> Async<'State>, handler: IContext -> 'Message -> 'State -> Async<'State>, initialState: 'State) = + type Effect<'State> = obj -> (IContext -> 'State -> 'State) option + type AsyncEffect<'State> = obj -> (IContext -> 'State -> Async<'State>) option + + let inline (<|>) (e1: Effect<'State>) (e2: Effect<'State>) = + fun o -> e1 o |> Option.orElse (e2 o) + + let inline (<||>) (e1: AsyncEffect<'State>) (e2: AsyncEffect<'State>) = + fun o -> e1 o |> Option.orElse (e2 o) + + let inline (<&>) (e1: Effect<'State>) (e2: Effect<'State>) = + fun (o: obj) -> + match (e1 o), (e2 o) with + | Some f1, Some f2 -> Some <| fun ctx state -> state |> f1 ctx |> f2 ctx + | Some f1, None -> Some f1 + | None, Some f2 -> Some f2 + | None, None -> None + + let inline (<&&>) (e1: AsyncEffect<'State>) (e2: AsyncEffect<'State>) = + fun (o: obj) -> + match (e1 o), (e2 o) with + | Some f1, Some f2 -> Some <| fun ctx state -> async { let! state' = state |> f1 ctx + return! state' |> f2 ctx } + | Some f1, None -> Some f1 + | None, Some f2 -> Some f2 + | None, None -> None + + let inline typedEffect (handler: IContext -> 'Message -> 'State -> 'T) = + fun (o: obj) -> match o with + | :? 'Message as msg -> Some <| fun ctx state -> handler ctx msg state + | _ -> None + + let inline systemEffect (handler: IContext -> SystemMessage -> 'State -> 'T) = + fun (o: obj) -> match o with + | IsSystemMessage msg -> Some <| fun ctx state -> handler ctx msg state + | _ -> None + + type FSharpAsyncActor<'State>(handler: AsyncEffect<'State>, initialState: 'State) = let mutable state = initialState interface IActor with member this.ReceiveAsync(context: IContext) = async { - match context.Message with - | IsSystemMessage msg -> - try - let! state' = systemMessageHandler context msg state - state <- state' - with - | x -> - printfn "Failed to execute actor: %A" x - raise x - | :? 'Message as msg -> - try - let! state' = handler context msg state - state <- state' - with - | x -> - printfn "Failed to execute actor: %A" x - raise x - | _ -> () + try + let applied = handler context.Message + match applied with + | Some f -> let! state' = f context state + state <- state' + | None -> () + with + | x -> + printfn "Failed to execute actor: %A" x + raise x } |> Async.startAsPlainTask + type FSharpActor<'State>(handler: Effect<'State>, initialState: 'State) = + let mutable state = initialState + interface IActor with + member this.ReceiveAsync(context: IContext) = + async { + try + let applied = handler context.Message + match applied with + | Some f -> let state' = f context state + state <- state' + | None -> () + with + | x -> + printfn "Failed to execute actor: %A" x + raise x + } |> Async.startAsPlainTask [] module Actor = - let spawn (props: Props) = Actor.Spawn(props) + let inline spawn (props: Props) = Actor.Spawn(props) - let spawnPrefix prefix (props: Props) = Actor.SpawnPrefix(props, prefix) + let inline spawnPrefix prefix (props: Props) = Actor.SpawnPrefix(props, prefix) - let spawnNamed name (props: Props) = Actor.SpawnNamed(props, name) + let inline spawnNamed name (props: Props) = Actor.SpawnNamed(props, name) - let initProps (producer: unit -> IActor) = - let producerFunc = System.Func<_>(producer) - Actor.FromProducer(producerFunc) + let inline initProps (producer: unit -> IActor) = Actor.FromProducer(System.Func<_>(producer)) - let spawnProps = initProps >> spawn + let inline spawnProps p = p |> initProps |> spawn - let spawnPropsPrefix prefix = initProps >> spawnPrefix prefix + let inline spawnPropsPrefix prefix = initProps >> spawnPrefix prefix - let spawnPropsNamed name = initProps >> spawnNamed name + let inline spawnPropsNamed name = initProps >> spawnNamed name - let withState3Async (systemMessageHandler: IContext -> SystemMessage -> 'State -> Async<'State>) (handler: IContext -> 'Message -> 'State -> Async<'State>) (initialState: 'State) = - fun () -> new FSharpActor<'Message, 'State>(systemMessageHandler, handler, initialState) :> IActor + let inline withState3Async (systemMessageHandler: IContext -> SystemMessage -> 'State -> Async<'State>) (handler: IContext -> 'Message -> 'State -> Async<'State>) (initialState: 'State) = + fun () -> new FSharpAsyncActor<'State>(systemEffect(systemMessageHandler) <&&> typedEffect(handler), initialState) :> IActor + + let inline withState2Async (handler: IContext -> 'Message -> 'State -> Async<'State>) (initialState: 'State) = + fun () -> new FSharpAsyncActor<'State>(typedEffect handler, initialState) :> IActor - let withState2Async (handler: IContext -> 'Message -> 'State -> Async<'State>) (initialState: 'State) = - withState3Async (fun _ _ s -> async { return s }) handler initialState - - let withStateAsync (handler: 'Message -> 'State -> Async<'State>) (initialState: 'State) = + let inline withStateAsync (handler: 'Message -> 'State -> Async<'State>) (initialState: 'State) = withState2Async (fun _ m s -> handler m s) initialState - let create3Async (systemMessageHandler: IContext -> SystemMessage -> Async) (handler: IContext -> 'Message -> Async) = + let inline create3Async (systemMessageHandler: IContext -> SystemMessage -> Async) (handler: IContext -> 'Message -> Async) = withState3Async (fun context message _ -> systemMessageHandler context message) (fun context message _ -> handler context message) () - let create2Async (handler: IContext -> 'Message -> Async) = + let inline create2Async (handler: IContext -> 'Message -> Async) = withState2Async (fun context message _ -> handler context message) () - let createAsync (handler: 'Message -> Async) = + let inline createAsync (handler: 'Message -> Async) = withState2Async (fun _ m _ -> handler m) () - let withState2 (handler: IContext -> 'Message -> 'State -> 'State) (initialState: 'State) = - async { - return withState2Async (fun ctx msg state -> async { return handler ctx msg state }) initialState - } |> Async.RunSynchronously + let inline withState3 (systemMessageHandler: IContext -> SystemMessage -> 'State -> 'State) (handler: IContext -> 'Message -> 'State -> 'State) (initialState: 'State) = + fun () -> new FSharpActor<'State>(systemEffect(systemMessageHandler) <&> typedEffect(handler), initialState) :> IActor + + let inline withState2 (handler: IContext -> 'Message -> 'State -> 'State) (initialState: 'State) = + fun () -> new FSharpActor<'State>(typedEffect handler, initialState) :> IActor - let withState (handler: 'Message -> 'State -> 'State) (initialState: 'State) = - async { - return withStateAsync (fun msg state -> async { return handler msg state }) initialState - } |> Async.RunSynchronously + let inline withState (handler: 'Message -> 'State -> 'State) (initialState: 'State) = + withState2 (fun _ m s -> handler m s) initialState - let create (handler: 'Message -> unit) = - async { - return createAsync (fun msg -> async { return handler msg }) - } |> Async.RunSynchronously + let inline create3 (systemMessageHandler: IContext -> SystemMessage -> unit) (handler: IContext -> 'Message -> unit) = + withState3 (fun context message _ -> systemMessageHandler context message) (fun context message _ -> handler context message) () - let create2 (handler: IContext -> 'Message -> unit) = - async { - return create2Async (fun ctx msg -> async { return handler ctx msg }) - } |> Async.RunSynchronously + let inline create2 (handler: IContext -> 'Message -> unit) = + withState2 (fun context message _ -> handler context message) () + + let inline create (handler: 'Message -> unit) = + withState2 (fun _ m _ -> handler m) () [] module Props = open System - let newProps() = Props() + let inline newProps() = Props() - let withProducer producer (props: Props) = + let inline withProducer producer (props: Props) = props.WithProducer(producer) - let withDispatcher dispatcher (props: Props) = + let inline withDispatcher dispatcher (props: Props) = props.WithDispatcher(dispatcher) - let withMailbox mailbox (props: Props) = + let inline withMailbox mailbox (props: Props) = props.WithMailbox(mailbox) - let withChildSupervisorStrategy supervisorStrategy (props: Props) = + let inline withChildSupervisorStrategy supervisorStrategy (props: Props) = let strategy = match supervisorStrategy with | DefaultStrategy -> Supervision.DefaultStrategy @@ -168,36 +208,37 @@ module Props = Proto.ExponentialBackoffStrategy(backoffWindow, initialBackoff) :> ISupervisorStrategy props.WithChildSupervisorStrategy(strategy) - let withReceiveMiddleware (middleware: Receive -> Receive) (props: Props) = + let inline withReceiveMiddleware (middleware: Receive -> Receive) (props: Props) = props.WithReceiveMiddleware([|toFunc2(middleware)|]) - let withReceiveMiddlewares (middlewares: (Receive -> Receive) list) (props: Props) = + let inline withReceiveMiddlewares (middlewares: (Receive -> Receive) list) (props: Props) = middlewares |> List.map toFunc2 |> Array.ofList |> (fun arr -> props.WithReceiveMiddleware(arr)) - let withSenderMiddleware (middleware: Sender -> Sender) (props: Props) = + let inline withSenderMiddleware (middleware: Sender -> Sender) (props: Props) = props.WithSenderMiddleware([|toFunc2(middleware)|]) - let withSenderMiddlewares (middlewares: (Sender -> Sender) list) (props: Props) = + let inline withSenderMiddlewares (middlewares: (Sender -> Sender) list) (props: Props) = middlewares |> List.map toFunc2 |> Array.ofList |> (fun arr -> props.WithSenderMiddleware(arr)) - let withSpawner spawner (props: Props) = + let inline withSpawner spawner (props: Props) = props.WithSpawner(spawner) [] module Pid = - let tell (pid: PID) msg = + let inline tell (pid: PID) msg = pid.Tell(msg) - let ask (pid: PID) msg = + let inline ask (pid: PID) msg = pid.RequestAsync(msg) |> Async.AwaitTask - let (!) msg (pid: PID) = tell pid msg - let (?) msg (pid: PID) = ask pid msg + let inline (!) msg (pid: PID) = tell pid msg + let inline (?) msg (pid: PID) = ask pid msg +