Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .paket/paket.bootstrapper.exe
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/example04manyactors/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ let main argv =
let endTime = DateTime.Now
let duration = (endTime - startTime)
printfn "%A" duration
printfn "Messages per second %f" ((float numberOfMessages) / (float duration.Seconds))
printfn "Messages per second %f" ((float numberOfMessages) / (float duration.TotalSeconds))
Console.ReadLine() |> ignore
0 // return an integer exit code
49 changes: 49 additions & 0 deletions examples/example04manyactors/Program2.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
open System
open Proto.FSharp
open Proto

type Msg =
| Inc of int
| Get

[<EntryPoint>]
let main argv =
let mutable state = 0
let handler (ctx: IContext) msg =
match msg with
| Inc _ -> state <- state + 1
| Get -> ctx.Sender <! state

let counter = Actor.create2 handler |> Actor.spawnProps

let createAndTell x =
Actor.create (fun msg -> counter <! msg)
|> Actor.initProps
|> Actor.spawn

let numberOfMessages = 1000000
let startTime = DateTime.Now
async {
Seq.init numberOfMessages id
|> Seq.map createAndTell
|> Seq.iteri (fun i pid -> pid <! Inc i)
//|> Async.Parallel
//|> Async.RunSynchronously
//|> ignore
} |> Async.Start

async {
let mutable count = -1
while count < 1000000 do
do! Async.Sleep 100
let! (c: int) = counter <? Get
printfn "%d" c
count <- c
} |> Async.RunSynchronously

let endTime = DateTime.Now
let duration = (endTime - startTime)
printfn "%A" duration
printfn "Messages per second %f" ((float numberOfMessages) / (float duration.TotalSeconds))
Console.ReadLine() |> ignore
0 // return an integer exit code
47 changes: 28 additions & 19 deletions src/Proto.Actor.FSharp.Persistence/Proto.FSharp.Persistence.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ open Proto
open System.Threading.Tasks
open System

namespace Avalanchain.Core

open Proto.FSharp

[<RequireQualifiedAccess>]
module Persistence =
open Proto
open Proto.Persistence
open Proto.Persistence.SnapshotStrategies

type SenderInfo<'Event> = { // Mostly for streaming support
Address: string
Tell: 'Event -> unit
} with static member FromPID (pid: PID) = { Address = pid.Address; Tell = fun e -> pid <! e }

type CommandProcessor<'Command,'Event,'State,'CommandError> = SenderInfo<'Event> -> 'State -> int64 -> 'Command -> Async<Result<('Event option) * bool, 'CommandError>>

let private applyEvent recoverEvent replayEvent persistedEvent state (evt: Event) =
match evt with
| :? RecoverEvent as e ->
Expand Down Expand Up @@ -51,19 +55,24 @@ module Persistence =
| Started _ -> async { do! persistence.RecoverStateAsync() |> Async.AwaitTask }
| _ -> async {()}

let private handler (persistence: Persistence) processCommand state (ctx: IContext) (cmd: 'Command): Async<unit> =
let res = processCommand state persistence.Index cmd
let private handler (persistence: Persistence) (processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>) (state: 'State) (ctx: IContext) (cmd: 'Command): Async<unit> = async {
let! res = processCommand (SenderInfo<'Event>.FromPID ctx.Sender) state persistence.Index cmd
match res with
| Ok evt -> async { do! persistence.PersistEventAsync evt |> Async.AwaitTask
if isNull(ctx.Sender) |> not then ctx.Sender <! Ok() }
| Error e -> async { e >! ctx.Sender }
| Ok (evtOpt, save) -> match evtOpt with
| Some evt -> if save then do! persistence.PersistEventAsync evt |> Async.AwaitTask
if isNull(ctx.Sender) |> not then ctx.Sender <! Ok evt
| None -> ()
| Error e -> e >! ctx.Sender
}

let simpleProcessCommand = fun _ _ _ cmd -> async { return (Some cmd, true) |> Ok }

[<RequireQualifiedAccess>]
module CommandSourcingAndSnapshotting =
let persistDetailed
(eventStore: IEventStore)
(snapshotStore: ISnapshotStore)
(processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>)
(processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>)
(recoverEvent: 'State -> int64 -> 'Event -> 'State)
(replayEvent: 'State -> int64 -> 'Event -> 'State)
(persistedEvent: 'State -> int64 -> 'Event -> 'State)
Expand All @@ -89,7 +98,7 @@ module Persistence =

let persist
(provider: IProvider)
(processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>)
(processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>)
(onEvent: 'State -> int64 -> 'Event -> 'State)
(log: string -> unit)
(snapshotStrategy: ISnapshotStrategy)
Expand All @@ -111,7 +120,7 @@ module Persistence =

let persistLight
(provider: IProvider)
(processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>)
(processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>)
(onEvent: 'State -> int64 -> 'Event -> 'State)
(persistentID: string)
(initialState: 'State)
Expand All @@ -126,7 +135,7 @@ module Persistence =
initialState

[<RequireQualifiedAccess>]
module EventSourcingAndSnapshotting =
module EventSourcingAndSnapshotting =
let persistDetailed
(eventStore: IEventStore)
(snapshotStore: ISnapshotStore)
Expand All @@ -142,7 +151,7 @@ module Persistence =
CommandSourcingAndSnapshotting.persistDetailed
eventStore
snapshotStore
(fun _ _ cmd -> Ok cmd)
simpleProcessCommand
recoverEvent
replayEvent
persistedEvent
Expand All @@ -162,7 +171,7 @@ module Persistence =
=
CommandSourcingAndSnapshotting.persist
provider
(fun _ _ cmd -> Ok cmd)
simpleProcessCommand
onEvent
log
snapshotStrategy
Expand All @@ -177,7 +186,7 @@ module Persistence =
=
CommandSourcingAndSnapshotting.persistLight
provider
(fun _ _ cmd -> Ok cmd)
simpleProcessCommand
onEvent
persistentID
initialState
Expand All @@ -196,13 +205,13 @@ module Persistence =
persistentID,
System.Action<_>(applySnapshot recoverSnapshot persistedSnapshot ignore))

Actor.create3Async (systemHandler persistence) (handler persistence (fun _ _ e -> Ok e) None)
Actor.create3Async (systemHandler persistence) (handler persistence simpleProcessCommand None)

[<RequireQualifiedAccess>]
module CommandSourcing =
let persist
(eventStore: IEventStore)
(processCommand: 'State -> int64 -> 'Command -> Result<'Event, 'CommandError>)
(processCommand: CommandProcessor<'Command,'Event,'State,'CommandError>)
(persistedEvent: 'State -> int64 -> 'Event -> 'State)
(persistentID: string)
(initialState: 'State)
Expand All @@ -227,7 +236,7 @@ module Persistence =
=
CommandSourcing.persist
eventStore
(fun _ _ cmd -> Ok cmd)
simpleProcessCommand
persistedEvent
persistentID
initialState
Expand All @@ -242,7 +251,7 @@ module Persistence =
persistentID
None

let getEvents<'T> (handler: 'T -> unit) (persistentID: string) (indexStart: int64) (indexEnd: int64) (eventStore: IEventStore) =
let getEvents<'T> (eventStore: IEventStore) (persistentID: string) (indexStart: int64) (indexEnd: int64) (handler: 'T -> unit) =
eventStore.GetEventsAsync (persistentID, indexStart, indexEnd,
System.Action<_>(fun o -> if not (isNull o) then
match o with
Expand Down
Loading