diff --git a/guides/advanced/.gitkeep b/guides/advanced/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/guides/operational/.gitkeep b/guides/operational/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/spawn_sdk/spawn_sdk/Examples.cheatmd b/spawn_sdk/spawn_sdk/Examples.cheatmd deleted file mode 100644 index 3df7213a..00000000 --- a/spawn_sdk/spawn_sdk/Examples.cheatmd +++ /dev/null @@ -1,333 +0,0 @@ -# Examples - -## Using Named Actors - -```elixir -# my_actor.ex -defmodule SpawnSdkExample.Actors.MyActor do - use SpawnSdk.Actor, - name: "jose", # Default is Full Qualified Module name a.k.a __MODULE__ - kind: :named, # Default is already :uΒ«named. Valid are :named | :unnamed | :pooled - stateful: true, # Default is already true - state_type: Io.Eigr.Spawn.Example.MyState, - deactivate_timeout: 30_000, - snapshot_timeout: 2_000 - - require Logger - - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} - - defact init(%Context{state: state} = ctx) do - Logger.info("[joe] Received InitRequest. Context: #{inspect(ctx)}") - - %Value{} - |> Value.state(state) - |> Value.reply!() - end - - defact sum( - %MyBusinessMessage{value: value} = data, - %Context{state: state} = ctx - ) do - Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") - - new_value = if is_nil(state), do: value, else: (state.value || 0) + value - - %Value{} - |> Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) - |> Value.reply!() - end -end - -# Invoking Named Actor - -iex> SpawnSdk.invoke("joe", system: "spawn-system", action: "sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}) - {:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 12} - -# Invoke Actors with a delay set in milliseconds -SpawnSdk.invoke("joe", system: "spawn-system", action: "ping", delay: 5_000) - {:ok, :async} - -# Invoke Actors scheduled to a specific DateTime -SpawnSdk.invoke("joe", system: "spawn-system", action: "ping", scheduled_to: ~U[2023-01-01 00:32:00.145Z]) - {:ok, :async} -``` - -## Using Unnamed Actors - -```elixir -# Unnamed.ex -defmodule SpawnSdkExample.Actors.UnnamedActor do - use SpawnSdk.Actor, - name: "unnamed_actor", - kind: :unnamed, - state_type: Io.Eigr.Spawn.Example.MyState - - require Logger - - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} - - defact sum( - %MyBusinessMessage{value: value} = data, - %Context{state: state} = ctx - ) do - Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") - - new_value = if is_nil(state), do: value, else: (state.value || 0) + value - - %Value{} - |> Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) - |> Value.reply!() - end -end - -# Spawning Unnamed actors -SpawnSdk.spawn_actor("robert", system: "spawn-system", actor: "unnamed_actor") - :ok - -# Invoke Spawned Actors -SpawnSdk.invoke("robert", system: "spawn-system", action: "sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}) - {:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 16}} - -# Invoke Actors in a lazy way without having to spawn them before -SpawnSdk.invoke("robert_lazy", ref: SpawnSdkExample.Actors.UnnamedActor, system: "spawn-system", action: "sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}) - {:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}} -``` - -## Using Pooled Actors - -```elixir -# pooled.ex -defmodule SpawnSdkExample.Actors.PooledActor do - use SpawnSdk.Actor, - name: "pooled_actor", - kind: :pooled, - stateful: false - - require Logger - - defact ping(_data, %Context{} = ctx) do - Logger.info("Received Request. Context: #{inspect(ctx)}") - - Value.of() - |> Value.void() - end -end - -# Invoke Pooled Actors -SpawnSdk.invoke("pooled_actor", system: "spawn-system", action: "ping", pooled: true) - {:ok, nil} -``` - -## Using Timer Actions - -```elixir -defmodule SpawnSdkExample.Actors.ClockActor do - use SpawnSdk.Actor, - name: "clock_actor", - state_type: Io.Eigr.Spawn.Example.MyState, - deactivate_timeout: 86_400_000 - - require Logger - - alias Io.Eigr.Spawn.Example.MyState - - @set_timer 15_000 - defact clock(%Context{state: state} = ctx) do - Logger.info("[clock] Clock Actor Received Request. Context: #{inspect(ctx)}") - - new_value = if is_nil(state), do: 0, else: state.value + 1 - new_state = MyState.new(value: new_value) - - Value.of() - |> Value.state(new_state) - |> Value.noreply!() - end -end -``` - -## Worflows - -```elixir -# side_effect.ex -defmodule SpawnSdkExample.Actors.UnnamedActor do - use SpawnSdk.Actor, - kind: :unnamed, - stateful: false, - state_type: Io.Eigr.Spawn.Example.MyState - - require Logger - - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} - - alias SpawnSdk.Flow.SideEffect - - defact sum(%MyBusinessMessage{value: value} = data, %Context{state: state} = ctx) do - Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") - - new_value = if is_nil(state), do: value, else: (state.value || 0) + value - - result = %MyBusinessMessage{value: new_value} - new_state = %MyState{value: new_value} - - Value.of() - |> Value.value(result) - |> Value.state(new_state) - |> Value.effects( - # This returns a list of side effects. In this case containing only one effect. However, multiple effects can be chained together, - # just by calling the effect function as shown here. - # The delay means that it will be fired asynchronously after 5000 milliseconds (5 seconds) - # If only one effect is desired, you can also choose to use the to/3 function together with Value.effect(). - # Example: Values.effect(SideEffect.to(name, func, payload)) - SideEffect.of() - |> SideEffect.effect("joe", :sum, result, delay: 5_000) - ) - |> Value.reply!() - end -end - -``` - -```elixir -# Pipes and Forwards - -# pipe_forward.ex -defmodule SpawnSdkExample.Actors.ForwardPipeActor do - use SpawnSdk.Actor, - name: "pipeforward", - kind: :named, - stateful: false - - require Logger - - alias Io.Eigr.Spawn.Example.MyBusinessMessage - - defact forward_example(%MyBusinessMessage{} = msg, _ctx) do - Logger.info("Received request with #{msg.value}") - - Value.of() - |> Value.value(MyBusinessMessage.new(value: 999)) - |> Value.forward( - Forward.to("second_actor", "sum_plus_one") - ) - |> Value.void() - end - - defact pipe_example(%MyBusinessMessage{} = msg, _ctx) do - Logger.info("Received request with #{msg.value}") - - Value.of() - |> Value.value(MyBusinessMessage.new(value: 999)) - |> Value.pipe( - Pipe.to("second_actor", "sum_plus_one") - ) - |> Value.void() - end -end - -# other.ex -defmodule SpawnSdkExample.Actors.SecondActorExample do - use SpawnSdk.Actor, - name: "second_actor", - stateful: false - - require Logger - - alias Io.Eigr.Spawn.Example.MyBusinessMessage - - defact sum_plus_one(%MyBusinessMessage{} = msg, _ctx) do - Logger.info("Received request with #{msg.value}") - - Value.of() - |> Value.value(MyBusinessMessage.new(value: msg.value + 1)) - |> Value.void() - end -end - -``` - -```elixir -# Broadcast -# driver.ex -defmodule Fleet.Actors.Driver do - use SpawnSdk.Actor, - kind: :unnamed, - state_type: Fleet.Domain.Driver - - alias Fleet.Domain.{ - Driver, - OfferRequest, - OfferResponse, - Point - } - - require Logger - - @brain_actor_channel "fleet.controllers.topic" - - defact update_position(%Point{} = position, %Context{state: %Driver{id: name} = driver} = ctx) do - Logger.info( - "Driver [#{name}] Received Update Position Event. Position: [{inspect(position)}]. Context: #{inspect(ctx)}" - ) - - driver_state = %Driver{driver | position: position} - - %Value{} - |> Value.of(driver_state, driver_state) - |> Value.broadcast( - Broadcast.to( - @brain_actor_channel, - driver_state - ) - ) - end -end - -defmodule Fleet.Actors.FleetControllersActor do - use SpawnSdk.Actor, - kind: :unnamed, - channels: [ - {"fleet.controllers.topic", "update_position_receive"} - ] # or just ["fleet.controllers.topic"] and it will forward to a action called receive - - alias Fleet.Domain.Point - - defact update_position_receive(%Point{} = position, _ctx) do - Logger.info( - "Received Update Position Event. Position: [#{inspect(position)}]" - ) - - Value.of() - end -end -``` - -## Declaring the supervision tree - -```elixir -# application.ex -defmodule SpawnSdkExample.Application do - @moduledoc false - use Application - - @impl true - def start(_type, _args) do - children = [ - { - SpawnSdk.System.Supervisor, - system: "spawn-system", - actors: [ - # your actors here - SpawnSdkExample.Actors.MyActor, - SpawnSdkExample.Actors.UnnamedActor, - SpawnSdkExample.Actors.ClockActor, - SpawnSdkExample.Actors.PooledActor - ] - } - ] - - opts = [strategy: :one_for_one, name: SpawnSdkExample.Supervisor] - Supervisor.start_link(children, opts) - end -end -``` \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/README.md b/spawn_sdk/spawn_sdk/README.md index 33c16368..d1901e16 100644 --- a/spawn_sdk/spawn_sdk/README.md +++ b/spawn_sdk/spawn_sdk/README.md @@ -7,134 +7,182 @@ Spawn is a Stateful Serverless Platform for providing the multi-language Actor M The advantage of the Elixir SDK over other SDKs is in Elixir's native ability to connect directly to an Erlang network. For this reason, the Elixir SDK allows any valid Elixir application to be part of a Spawn network without needing a sidecar attached. -## Installation +## Quick Start -[Available in Hex](https://hex.pm/packages/spawn_sdk), the package can be installed -by adding `spawn_sdk` and `spawn_statestores_*` to your list of dependencies in `mix.exs`: +Get up and running with Spawn actors in minutes: + +1. **Add to your dependencies:** ```elixir def deps do [ {:spawn_sdk, "~> 2.0.0-RC9"}, - - # You can uncomment one of those dependencies if you are going to use Persistent Actors - #{:spawn_statestores_mariadb, "~> 2.0.0-RC9"}, + # Optional: for persistent actors #{:spawn_statestores_postgres, "~> 2.0.0-RC9"}, ] end ``` -### Deploy +2. **Create your first actor:** -Following the steps below, you will have a valid Elixir application to use in a Spawn cluster. However, you will still need to generate a container image with your application to use it together with the [Spawn Operator for Kubernetes](https://github.com/eigr/spawn/tree/main/spawn_operator/spawn_operator). +```elixir +defmodule MyApp.Actors.Counter do + use SpawnSdk.Actor, + name: "counter", + stateful: true, + state_type: MyApp.Domain.CounterState + + action "increment", fn %Context{state: state}, %{value: value} -> + new_count = (state.count || 0) + value + new_state = %CounterState{count: new_count} + + Value.of(%{count: new_count}, new_state) + end +end +``` -## How to use +3. **Set up your supervision tree:** -After creating an Elixir application project, create the protobuf files for your business domain. -It is common practice to do this under the **priv/** folder. Let's demonstrate an example: +```elixir +defmodule MyApp.Application do + use Application -```protobuf -syntax = "proto3"; + def start(_type, _args) do + children = [ + {SpawnSdk.System.Supervisor, + system: "my-app", actors: [MyApp.Actors.Counter]} + ] -package io.eigr.spawn.example; + Supervisor.start_link(children, strategy: :one_for_one) + end +end +``` -message MyState { - int32 value = 1; -} +4. **Use your actor:** -message MyBusinessMessage { - int32 value = 1; -} +```elixir +SpawnSdk.invoke("counter", + system: "my-app", + action: "increment", + payload: %{value: 5} +) +#=> {:ok, %{count: 5}} ``` -It is important to try to separate the type of message that must be stored as the actors' state from the type of messages -that will be exchanged between their actors' operations calls. In other words, the Actor's internal state is also represented -as a protobuf type, and it is a good practice to separate these types of messages from the others in its business domain. +## Documentation -In the above case `MyState` is the type protobuf that represents the state of the Actor that we will create later -while `MyBusiness` is the type of message that we will send and receive from this Actor. +### πŸ“š Basic Concepts -Now that we have defined our input and output types as Protobuf types we will need to compile these files to generate their respective Elixir modules. An example of how to do this can be found [here](https://github.com/eigr/spawn/blob/main/spawn_sdk/spawn_sdk_example/compile-example-pb.sh) +- **[Quickstart Guide](guides/basic/quickstart.md)** - Get started with your first actor +- **[Actor Types](guides/basic/actor_types.md)** - Named, unnamed, pooled, and timer actors +- **[Actor Configuration](guides/basic/actor_configuration.md)** - Complete configuration reference +- **[Client API](guides/basic/client_api.md)** - Invoking actors and handling responses +- **[Supervision](guides/basic/supervision.md)** - Setting up your actor system -> **_NOTE:_** You need to have installed the elixir plugin for protoc. More information on how to obtain and install the necessary tools can be found here [here](https://github.com/elixir-protobuf/protobuf#usage) +### πŸš€ Advanced Features -Now that the protobuf types have been created we can proceed with the code. Example definition of an Actor. +- **[Side Effects](guides/advanced/side_effects.md)** - Asynchronous actor-to-actor communication +- **[Forwards and Pipes](guides/advanced/forwards_and_pipes.md)** - Request routing and processing pipelines +- **[Broadcast](guides/advanced/broadcast.md)** - Event-driven architectures and pub-sub patterns -## Named Actors +## Key Features -In this example we are creating an actor in a Named way, that is, it is a known actor at compile time. +- **🎭 Multiple Actor Types** - Named, unnamed, pooled, and timer actors +- **πŸ’Ύ State Management** - Persistent, transactional state with snapshots +- **πŸ”„ Side Effects** - Asynchronous, fire-and-forget operations +- **πŸ“‘ Broadcasting** - Pub-sub messaging between actors +- **πŸ”€ Flow Control** - Forward and pipe patterns for request routing +- **⚑ High Performance** - Native Elixir clustering and distribution +- **πŸ”§ Easy Integration** - Direct Phoenix and LiveView integration +## Examples + +### Named Actor (Always Available) ```elixir -defmodule SpawnSdkExample.Actors.MyActor do +defmodule MyApp.Actors.ConfigService do use SpawnSdk.Actor, - name: "jose", # Default is Full Qualified Module name a.k.a __MODULE__ - kind: :named, # Default is already :named. Valid are :named | :unnamed - stateful: true, # Default is already true - state_type: Io.Eigr.Spawn.Example.MyState, # or :json if you don't care about protobuf types - deactivate_timeout: 30_000, - snapshot_timeout: 2_000 - - require Logger - - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} - - # The callback could also be referenced to an existing function: - # action "SomeAction", &some_defp_handler/0 - # action "SomeAction", &SomeModule.handler/1 - # action "SomeAction", &SomeModule.handler/2 - - init fn %Context{state: state} = ctx -> - Logger.info("[joe] Received InitRequest. Context: #{inspect(ctx)}") + name: "config_service", + kind: :named, + stateful: true - Value.of() - |> Value.state(state) + action "get_config", fn %Context{state: state}, %{key: key} -> + value = Map.get(state.config, key) + Value.of(%{key: key, value: value}, state) end +end +``` - action "Sum", fn %Context{state: state} = ctx, %MyBusinessMessage{value: value} = data -> - Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") - - new_value = if is_nil(state), do: value, else: (state.value || 0) + value - - Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) +### Unnamed Actor (Dynamic Instances) +```elixir +defmodule MyApp.Actors.UserSession do + use SpawnSdk.Actor, + name: "user_session", + kind: :unnamed, + state_type: MyApp.Domain.SessionState + + action "login", fn %Context{}, %{user_id: user_id} -> + session_state = %SessionState{ + user_id: user_id, + logged_in_at: DateTime.utc_now() + } + + Value.of(%{success: true}, session_state) end end +# Usage +SpawnSdk.invoke("user_123", ref: "user_session", ...) ``` -We declare two actions that the Actor can do. An initialization action that will be called every time an Actor instance is created and an action that will be responsible for performing a simple sum. - -Note Keep in mind that any Action that has the names present in the list below will behave as an initialization Action and will be called when the Actor is started (if there is more than one Action with one of these names, only one will be called). - -Defaults inicialization Action names: "**init**", "**Init**", "**setup**", "**Setup**" +### Side Effects (Async Operations) +```elixir +action "process_order", fn ctx, order_data -> + Value.of(order_result, new_state) + |> Value.effects([ + SideEffect.of() + |> SideEffect.effect("email_service", :send_confirmation, order_data) + |> SideEffect.effect("inventory_service", :update_stock, order_data) + ]) +end +``` -## Unnamed Actor +### Broadcasting (Event-Driven) +```elixir +action "create_user", fn ctx, user_data -> + user_created_event = %UserCreated{...} + + Value.of(new_user, updated_state) + |> Value.broadcast(Broadcast.to("user.events", user_created_event)) +end +``` -We can also create Unnamed Dynamic/Lazy actors, that is, despite having its unnamed behavior defined at compile time, a Lazy actor will only have a concrete instance when it is associated with an identifier/name at runtime. Below follows the same previous actor being defined as Unnamed. +## Production Ready -```elixir -defmodule SpawnSdkExample.Actors.UnnamedActor do - use SpawnSdk.Actor, - name: "unnamed_actor", - kind: :unnamed, - state_type: Io.Eigr.Spawn.Example.MyState +Spawn is production-ready and battle-tested: - require Logger +- **Kubernetes Native** - Full Kubernetes operator support +- **Observability** - OpenTelemetry integration, metrics, and tracing +- **Persistence** - PostgreSQL and MySQL state stores +- **Scalability** - Horizontal scaling across multiple nodes +- **Security** - State encryption and secure networking - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} +## Getting Help - action "Sum", fn %Context{state: state} = ctx, %MyBusinessMessage{value: value} = data -> - Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") +- **[GitHub Issues](https://github.com/eigr/spawn/issues)** - Bug reports and feature requests +- **[Documentation](https://hexdocs.pm/spawn_sdk)** - Complete API documentation +- **[Examples](https://github.com/eigr/spawn/tree/main/examples)** - Sample applications +- **[Community](https://github.com/eigr/spawn/discussions)** - Questions and discussions - new_value = if is_nil(state), do: value, else: (state.value || 0) + value +## What's Next? - Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) - end -end -``` +1. Follow the **[Quickstart Guide](guides/basic/quickstart.md)** to build your first actor +2. Explore **[Actor Types](guides/basic/actor_types.md)** to understand different patterns +3. Learn about **[Advanced Features](guides/advanced/)** for complex scenarios +4. Check out **[Examples](https://github.com/eigr/spawn/tree/main/examples)** for real-world patterns -Notice that the only thing that has changed is the the kind of actor, in this case the kind is set to :unnamed. +--- -> **_NOTE:_** Can Elixir programmers think in terms of Named vs Unnamed actors as more or less known at startup vs dynamically supervised/registered? That is, defining your actors directly in the supervision tree or using a Dynamic Supervisor for that. +**Ready to build distributed, stateful applications with ease? Start with the [Quickstart Guide](guides/basic/quickstart.md)!** ## Side Effects diff --git a/spawn_sdk/spawn_sdk/guides/advanced/broadcast.md b/spawn_sdk/spawn_sdk/guides/advanced/broadcast.md new file mode 100644 index 00000000..4c978bdd --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/advanced/broadcast.md @@ -0,0 +1,667 @@ +# Broadcast + +Broadcast allows actors to send messages to multiple subscribers simultaneously, enabling event-driven architectures and pub-sub patterns. + +## Understanding Broadcast + +Broadcast in Spawn enables: +- **One-to-many communication** - Send messages to multiple actors at once +- **Event-driven architectures** - Publish domain events that multiple services can react to +- **Decoupled systems** - Publishers don't need to know about subscribers +- **External integration** - Bridge with Phoenix PubSub and other external systems + +## Basic Broadcast + +### Publishing Events + +```elixir +defmodule MyApp.Actors.OrderProcessor do + use SpawnSdk.Actor, + name: "order_processor", + state_type: MyApp.Domain.OrderState + + alias SpawnSdk.Flow.Broadcast + alias MyApp.Events.{OrderCreated, OrderUpdated} + + action "CreateOrder", fn %Context{state: state} = ctx, order_data -> + # Process the order + new_order = create_order(order_data) + updated_state = add_order_to_state(state, new_order) + + # Create event + order_event = %OrderCreated{ + order_id: new_order.id, + user_id: new_order.user_id, + amount: new_order.amount, + created_at: DateTime.utc_now() + } + + Value.of() + |> Value.response(new_order) + |> Value.state(updated_state) + |> Value.broadcast( + Broadcast.to("order.events", order_event) + ) + end + + action "UpdateOrderStatus", fn %Context{state: state} = ctx, update_data -> + # Update order status + updated_order = update_order_status(state, update_data) + new_state = update_order_in_state(state, updated_order) + + # Broadcast status change + status_event = %OrderUpdated{ + order_id: updated_order.id, + old_status: update_data.old_status, + new_status: updated_order.status, + updated_at: DateTime.utc_now() + } + + Value.of() + |> Value.response(updated_order) + |> Value.state(new_state) + |> Value.broadcast( + Broadcast.to("order.status", status_event) + ) + end +end +``` + +### Subscribing to Events + +```elixir +defmodule MyApp.Actors.EmailService do + use SpawnSdk.Actor, + name: "email_service", + channels: [ + {"order.events", "handle_order_event"}, + {"user.events", "handle_user_event"} + ], + stateful: false + + require Logger + + action "HandleOrderEvent", fn _ctx, %OrderCreated{} = event -> + Logger.info("Sending order confirmation email for order #{event.order_id}") + + send_order_confirmation_email(event.user_id, event.order_id) + + Value.of(%{email_sent: true, order_id: event.order_id}) + end + + action "HandleOrderEvent", fn _ctx, %OrderUpdated{} = event -> + Logger.info("Sending order status update email for order #{event.order_id}") + + send_status_update_email(event.order_id, event.new_status) + + Value.of(%{status_email_sent: true, order_id: event.order_id}) + end + + defp send_order_confirmation_email(user_id, order_id) do + # Email sending logic + end + + defp send_status_update_email(order_id, status) do + # Status email logic + end +end + +defmodule MyApp.Actors.InventoryService do + use SpawnSdk.Actor, + name: "inventory_service", + channels: [ + {"order.events", "handle_order_event"} + ], + state_type: MyApp.Domain.InventoryState + + action "HandleOrderEvent", fn %Context{state: state} = ctx, %OrderCreated{} = event -> + Logger.info("Updating inventory for order #{event.order_id}") + + # Reduce inventory + updated_state = reduce_inventory(state, event.order_id) + + Value.of() + |> Value.state(updated_state) + end +end +``` + +## Multiple Channel Broadcasting + +Send events to multiple channels simultaneously: + +```elixir +defmodule MyApp.Actors.UserManager do + use SpawnSdk.Actor, + name: "user_manager", + state_type: MyApp.Domain.UserManagerState + + action "CreateUser", fn %Context{state: state} = ctx, user_data -> + # Create user + new_user = create_user(user_data) + updated_state = add_user_to_state(state, new_user) + + # Create different events for different audiences + user_event = %UserCreated{ + user_id: new_user.id, + email: new_user.email, + created_at: DateTime.utc_now() + } + + analytics_event = %UserSignup{ + user_id: new_user.id, + signup_source: user_data.source, + timestamp: DateTime.utc_now(), + metadata: user_data.metadata + } + + Value.of() + |> Value.response(new_user) + |> Value.state(updated_state) + |> Value.broadcast( + # Send to user events channel + Broadcast.to("user.events", user_event) + # Send to analytics channel + |> Broadcast.to("analytics.events", analytics_event) + # Send to audit channel + |> Broadcast.to("audit.events", %AuditLog{ + action: "user_created", + user_id: new_user.id, + timestamp: DateTime.utc_now() + }) + ) + end +end +``` + +## Channel Patterns + +### Domain-Based Channels + +```elixir +# User domain events +"user.events" # All user-related events +"user.created" # Specific user creation events +"user.updated" # User update events +"user.deleted" # User deletion events + +# Order domain events +"order.events" # All order events +"order.created" # New orders +"order.status" # Status changes +"order.payment" # Payment events + +# System events +"system.alerts" # System alerts +"system.metrics" # Performance metrics +"system.audit" # Audit trail +``` + +### Hierarchical Channel Subscriptions + +```elixir +defmodule MyApp.Actors.AuditLogger do + use SpawnSdk.Actor, + name: "audit_logger", + channels: [ + # Listen to all user events + {"user.events", "log_user_event"}, + # Listen to all order events + {"order.events", "log_order_event"}, + # Listen to specific security events + {"security.breach", "log_security_event"} + ], + stateful: false + + action "LogUserEvent", fn _ctx, event -> + log_audit_event("USER", event) + Value.of() + end + + action "LogOrderEvent", fn _ctx, event -> + log_audit_event("ORDER", event) + Value.of() + end + + action "LogSecurityEvent", fn _ctx, event -> + # High priority logging + log_security_breach(event) + send_security_alert(event) + Value.of() + end +end +``` + +## External Broadcast Integration + +### Phoenix PubSub Integration + +```elixir +defmodule MyApp.ExternalEventSubscriber do + @moduledoc """ + Subscribes to actor broadcasts and forwards them to Phoenix PubSub + for LiveView integration. + """ + use GenServer + require Logger + + alias SpawnSdk.Channel.Subscriber + + @impl true + def init(state) do + # Subscribe to actor channels + Subscriber.subscribe("user.events") + Subscriber.subscribe("order.events") + Subscriber.subscribe("notification.events") + + {:ok, state} + end + + @impl true + def handle_info({:receive, payload}, state) do + Logger.debug("Forwarding actor event to Phoenix PubSub: #{inspect(payload)}") + + # Forward to Phoenix PubSub based on event type + case payload do + %UserCreated{} = event -> + Phoenix.PubSub.broadcast(MyApp.PubSub, "user_updates", {:user_created, event}) + + %OrderCreated{} = event -> + Phoenix.PubSub.broadcast(MyApp.PubSub, "order_updates", {:order_created, event}) + Phoenix.PubSub.broadcast(MyApp.PubSub, "user:#{event.user_id}", {:new_order, event}) + + %NotificationEvent{} = event -> + Phoenix.PubSub.broadcast(MyApp.PubSub, "notifications", {:notification, event}) + + _ -> + Logger.warn("Unknown event type: #{inspect(payload)}") + end + + {:noreply, state} + end + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end +end +``` + +### Registering External Subscribers + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + # Phoenix PubSub + {Phoenix.PubSub, name: MyApp.PubSub}, + + # Spawn system with external subscribers + { + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: [ + MyApp.Actors.UserManager, + MyApp.Actors.OrderProcessor, + MyApp.Actors.EmailService + ], + external_subscribers: [ + {MyApp.ExternalEventSubscriber, []}, + {MyApp.MetricsCollector, []}, + {MyApp.WebhookNotifier, []} + ] + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +### LiveView Integration + +```elixir +defmodule MyAppWeb.DashboardLive do + use MyAppWeb, :live_view + + @impl true + def mount(_params, _session, socket) do + # Subscribe to Phoenix PubSub topics that receive actor events + Phoenix.PubSub.subscribe(MyApp.PubSub, "user_updates") + Phoenix.PubSub.subscribe(MyApp.PubSub, "order_updates") + + {:ok, assign(socket, users: [], orders: [], notifications: [])} + end + + @impl true + def handle_info({:user_created, %UserCreated{} = event}, socket) do + # Handle user created event from actors + updated_users = [event | socket.assigns.users] + {:noreply, assign(socket, users: updated_users)} + end + + @impl true + def handle_info({:order_created, %OrderCreated{} = event}, socket) do + # Handle order created event from actors + updated_orders = [event | socket.assigns.orders] + {:noreply, assign(socket, orders: updated_orders)} + end +end +``` + +## Advanced Broadcast Patterns + +### Event Sourcing with Broadcasts + +```elixir +defmodule MyApp.Actors.EventSourcingAggregate do + use SpawnSdk.Actor, + name: "user_aggregate", + kind: :unnamed, + state_type: MyApp.Domain.UserAggregate + + action "UpdateProfile", fn %Context{state: state} = ctx, update_data -> + # Apply business rules and generate events + {updated_state, domain_events} = apply_profile_update(state, update_data) + + # Build broadcast for all generated events + broadcast = domain_events + |> Enum.reduce(Broadcast.new(), fn event, acc -> + channel = channel_for_event(event) + Broadcast.to(acc, channel, event) + end) + + Value.of() + |> Value.response(%{events_generated: length(domain_events)}) + |> Value.state(updated_state) + |> Value.broadcast(broadcast) + end + + defp channel_for_event(%ProfileUpdated{}), do: "user.profile" + defp channel_for_event(%EmailChanged{}), do: "user.email" + defp channel_for_event(%AddressChanged{}), do: "user.address" + defp channel_for_event(_), do: "user.events" +end +``` + +### Saga Coordination via Broadcasts + +```elixir +defmodule MyApp.Actors.OrderSaga do + use SpawnSdk.Actor, + name: "order_saga", + kind: :unnamed, + channels: [ + {"payment.events", "handle_payment_event"}, + {"inventory.events", "handle_inventory_event"}, + {"shipping.events", "handle_shipping_event"} + ], + state_type: MyApp.Domain.SagaState + + action "StartOrderSaga", fn ctx, %OrderCreated{} = event -> + saga_state = %SagaState{ + order_id: event.order_id, + steps: [:payment, :inventory, :shipping], + completed_steps: [], + status: :in_progress + } + + # Start by triggering payment processing + payment_command = %ProcessPayment{ + order_id: event.order_id, + amount: event.amount, + saga_id: "saga_#{event.order_id}" + } + + Value.of() + |> Value.state(saga_state) + |> Value.broadcast( + Broadcast.to("payment.commands", payment_command) + ) + end + + action "HandlePaymentEvent", fn ctx, %PaymentProcessed{} = event -> + updated_state = mark_step_completed(ctx.state, :payment) + + case next_step(updated_state) do + :inventory -> + # Trigger inventory reservation + inventory_command = %ReserveInventory{ + order_id: event.order_id, + items: event.order_items, + saga_id: event.saga_id + } + + Value.of() + |> Value.state(updated_state) + |> Value.broadcast( + Broadcast.to("inventory.commands", inventory_command) + ) + + :completed -> + # Saga completed successfully + completion_event = %OrderSagaCompleted{ + order_id: event.order_id, + completed_at: DateTime.utc_now() + } + + Value.of() + |> Value.state(%{updated_state | status: :completed}) + |> Value.broadcast( + Broadcast.to("saga.events", completion_event) + ) + end + end +end +``` + +### Circuit Breaker with Health Broadcasting + +```elixir +defmodule MyApp.Actors.HealthMonitor do + use SpawnSdk.Actor, + name: "health_monitor", + state_type: MyApp.Domain.HealthState + + action "CheckHealth", [timer: 30_000], fn %Context{state: state} = ctx -> + # Check various system components + health_checks = perform_health_checks() + + # Update health state + new_state = update_health_state(state, health_checks) + + # Broadcast health status + health_event = %HealthStatusUpdate{ + timestamp: DateTime.utc_now(), + overall_status: new_state.overall_status, + component_statuses: health_checks, + previous_status: state.overall_status + } + + broadcast = case {state.overall_status, new_state.overall_status} do + {:healthy, :unhealthy} -> + # System became unhealthy - alert immediately + Broadcast.to("system.alerts", %SystemAlert{ + level: :critical, + message: "System health degraded", + timestamp: DateTime.utc_now() + }) + |> Broadcast.to("health.events", health_event) + + {:unhealthy, :healthy} -> + # System recovered - celebrate + Broadcast.to("system.alerts", %SystemAlert{ + level: :info, + message: "System health recovered", + timestamp: DateTime.utc_now() + }) + |> Broadcast.to("health.events", health_event) + + _ -> + # Regular health update + Broadcast.to("health.events", health_event) + end + + Value.of() + |> Value.state(new_state) + |> Value.broadcast(broadcast) + end +end +``` + +## Performance Considerations + +### Efficient Broadcasting + +```elixir +# Good: Batch related events +action "ProcessBulkOrders", fn ctx, orders -> + processed_orders = Enum.map(orders, &process_order/1) + + # Single broadcast with all events + events = Enum.map(processed_orders, fn order -> + %OrderCreated{order_id: order.id, user_id: order.user_id} + end) + + batch_event = %BulkOrdersProcessed{ + orders: events, + processed_at: DateTime.utc_now() + } + + Value.of() + |> Value.response(processed_orders) + |> Value.broadcast(Broadcast.to("order.events", batch_event)) +end + +# Avoid: Too many individual broadcasts +action "ProcessOrdersInefficiently", fn ctx, orders -> + Enum.each(orders, fn order -> + # This creates too many individual broadcasts + Value.of() + |> Value.broadcast(Broadcast.to("order.events", %OrderCreated{...})) + end) +end +``` + +### Channel Organization + +```elixir +# Good: Organized channel hierarchy +"domain.aggregate.event" # user.profile.updated +"domain.action" # payment.processed +"system.component.metric" # database.performance.slow_query + +# Avoid: Flat channel names +"user_updated" +"payment_done" +"slow_db" +``` + +## Testing Broadcasts + +```elixir +defmodule MyApp.BroadcastTest do + use ExUnit.Case + + setup do + # Setup test subscribers to capture broadcast events + {:ok, subscriber_pid} = start_test_subscriber() + + {:ok, _} = SpawnSdk.System.Supervisor.start_link( + system: "test-system", + actors: [MyApp.Actors.OrderProcessor, MyApp.Actors.EmailService], + external_subscribers: [{TestEventSubscriber, [test_pid: subscriber_pid]}] + ) + + %{subscriber: subscriber_pid} + end + + test "order creation broadcasts event", %{subscriber: subscriber} do + # Create order + {:ok, order} = SpawnSdk.invoke("order_processor", + system: "test-system", + action: "create_order", + payload: %OrderData{user_id: "user_123", amount: 100} + ) + + # Wait for broadcast to be received + assert_receive {:broadcast_received, "order.events", %OrderCreated{} = event}, 1000 + + assert event.order_id == order.id + assert event.user_id == "user_123" + assert event.amount == 100 + end +end + +defmodule TestEventSubscriber do + use GenServer + alias SpawnSdk.Channel.Subscriber + + def init([test_pid: test_pid]) do + Subscriber.subscribe("order.events") + Subscriber.subscribe("user.events") + {:ok, %{test_pid: test_pid}} + end + + def handle_info({:receive, payload}, %{test_pid: test_pid} = state) do + send(test_pid, {:broadcast_received, "order.events", payload}) + {:noreply, state} + end +end +``` + +## Best Practices + +### Event Design + +```elixir +# Good: Rich, descriptive events +%OrderCreated{ + order_id: "ord_123", + user_id: "user_456", + amount: 100.00, + currency: "USD", + items: [%{sku: "item_1", quantity: 2}], + created_at: ~U[2023-12-01 10:00:00Z], + version: 1 +} + +# Avoid: Sparse events +%OrderEvent{type: "created", id: "ord_123"} +``` + +### Channel Naming + +```elixir +# Use consistent patterns +"domain.entity.action" # user.profile.updated +"system.component" # database.health +"integration.external" # webhook.shopify +``` + +### Error Handling + +```elixir +# Handle broadcast failures gracefully +action "SafeBroadcast", fn ctx, data -> + try do + Value.of() + |> Value.response(data) + |> Value.broadcast(Broadcast.to("events", data)) + rescue + error -> + Logger.error("Broadcast failed: #{inspect(error)}") + # Continue without broadcast + Value.of(data) + end +end +``` + +## Next Steps + +- Learn about [timers and scheduling](timers.md) +- Explore [event-driven patterns](event_driven_patterns.md) +- See [monitoring broadcasts](../observability/monitoring.md) for production insights \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/advanced/forwards_and_pipes.md b/spawn_sdk/spawn_sdk/guides/advanced/forwards_and_pipes.md new file mode 100644 index 00000000..b3fdeae4 --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/advanced/forwards_and_pipes.md @@ -0,0 +1,614 @@ +# Forwards and Pipes + +Forwards and pipes allow actors to route requests to other actors as part of their response flow, enabling powerful composition and delegation patterns. + +## Understanding Forwards vs Pipes + +### Forward +- Passes the **original request payload** to the target actor +- The original actor's response is **replaced** by the target actor's response +- The target actor receives exactly what the original actor received + +### Pipe +- Passes the **current response** to the target actor +- The original response is transformed by the target actor +- Creates a processing pipeline + +## Basic Forward Example + +```elixir +defmodule MyApp.Actors.Router do + use SpawnSdk.Actor, + name: "request_router", + stateful: false + + alias SpawnSdk.Flow.Forward + + action "RoutePayment", fn _ctx, %PaymentRequest{} = payment -> + Logger.info("Routing payment request: #{payment.amount}") + + # Determine which payment processor to use + processor = case payment.payment_method do + :credit_card -> "credit_card_processor" + :paypal -> "paypal_processor" + :bank_transfer -> "bank_processor" + end + + Value.of() + |> Value.forward(Forward.to(processor, "process_payment")) + |> Value.void() + end +end + +defmodule MyApp.Actors.CreditCardProcessor do + use SpawnSdk.Actor, + name: "credit_card_processor", + stateful: false + + action "ProcessPayment", fn _ctx, %PaymentRequest{} = payment -> + # The payment request is exactly what was sent to the router + Logger.info("Processing credit card payment: #{payment.amount}") + + result = process_credit_card(payment) + + Value.of(%PaymentResponse{ + status: :success, + transaction_id: result.transaction_id, + amount: payment.amount + }) + end +end +``` + +**Usage:** +```elixir +# Client sends to router, gets response from credit card processor +iex> SpawnSdk.invoke("request_router", + system: "spawn-system", + action: "route_payment", + payload: %PaymentRequest{amount: 100, payment_method: :credit_card} +) +{:ok, %PaymentResponse{status: :success, transaction_id: "txn_123", amount: 100}} +``` + +## Basic Pipe Example + +```elixir +defmodule MyApp.Actors.DataProcessor do + use SpawnSdk.Actor, + name: "data_processor", + stateful: false + + alias SpawnSdk.Flow.Pipe + + action "ProcessData", fn _ctx, %DataRequest{} = data -> + Logger.info("Initial data processing") + + # Transform the data + processed_data = %ProcessedData{ + original_data: data.raw_data, + timestamp: DateTime.utc_now(), + processed_by: "data_processor" + } + + Value.of() + |> Value.response(processed_data) + |> Value.pipe(Pipe.to("data_enricher", "enrich_data")) + |> Value.void() + end +end + +defmodule MyApp.Actors.DataEnricher do + use SpawnSdk.Actor, + name: "data_enricher", + stateful: false + + action "EnrichData", fn _ctx, %ProcessedData{} = data -> + # Receives the ProcessedData from data_processor + Logger.info("Enriching processed data") + + enriched_data = %EnrichedData{ + original_data: data.original_data, + processed_timestamp: data.timestamp, + processed_by: data.processed_by, + enriched_at: DateTime.utc_now(), + enriched_by: "data_enricher", + metadata: %{source: "external_api", confidence: 0.95} + } + + Value.of(enriched_data) + end +end +``` + +**Usage:** +```elixir +# Client sends DataRequest, gets back EnrichedData +iex> SpawnSdk.invoke("data_processor", + system: "spawn-system", + action: "process_data", + payload: %DataRequest{raw_data: "user input"} +) +{:ok, %EnrichedData{ + original_data: "user input", + processed_timestamp: ~U[2023-12-01 10:00:00Z], + enriched_at: ~U[2023-12-01 10:00:01Z], + metadata: %{source: "external_api", confidence: 0.95} +}} +``` + +## Chain Processing with Multiple Pipes + +```elixir +defmodule MyApp.Actors.DocumentProcessor do + use SpawnSdk.Actor, + name: "document_processor", + stateful: false + + action "ProcessDocument", fn _ctx, %DocumentRequest{} = doc -> + # Stage 1: Parse document + parsed_doc = %ParsedDocument{ + content: parse_content(doc.raw_content), + format: doc.format, + size: byte_size(doc.raw_content) + } + + Value.of() + |> Value.response(parsed_doc) + |> Value.pipe(Pipe.to("content_analyzer", "analyze_content")) + |> Value.void() + end +end + +defmodule MyApp.Actors.ContentAnalyzer do + use SpawnSdk.Actor, + name: "content_analyzer", + stateful: false + + action "AnalyzeContent", fn _ctx, %ParsedDocument{} = doc -> + # Stage 2: Analyze content + analyzed_doc = %AnalyzedDocument{ + content: doc.content, + format: doc.format, + size: doc.size, + sentiment: analyze_sentiment(doc.content), + keywords: extract_keywords(doc.content), + language: detect_language(doc.content) + } + + Value.of() + |> Value.response(analyzed_doc) + |> Value.pipe(Pipe.to("content_summarizer", "summarize_content")) + |> Value.void() + end +end + +defmodule MyApp.Actors.ContentSummarizer do + use SpawnSdk.Actor, + name: "content_summarizer", + stateful: false + + action "SummarizeContent", fn _ctx, %AnalyzedDocument{} = doc -> + # Stage 3: Generate summary + summary = %DocumentSummary{ + original_size: doc.size, + format: doc.format, + sentiment: doc.sentiment, + keywords: doc.keywords, + language: doc.language, + summary: generate_summary(doc.content), + processing_chain: ["document_processor", "content_analyzer", "content_summarizer"] + } + + Value.of(summary) + end +end +``` + +## Conditional Forwarding + +```elixir +defmodule MyApp.Actors.SmartRouter do + use SpawnSdk.Actor, + name: "smart_router", + state_type: MyApp.Domain.RouterState + + action "RouteRequest", fn %Context{state: state} = ctx, request -> + # Update routing metrics + new_state = update_routing_stats(state, request) + + # Determine routing based on load, request type, etc. + target = determine_target(request, state) + + case target do + {:forward, actor, action} -> + Value.of() + |> Value.state(new_state) + |> Value.forward(Forward.to(actor, action)) + |> Value.void() + + {:pipe, actor, action} -> + # Transform request before piping + transformed_request = transform_request(request) + + Value.of() + |> Value.response(transformed_request) + |> Value.state(new_state) + |> Value.pipe(Pipe.to(actor, action)) + |> Value.void() + + {:local, response} -> + # Handle locally + Value.of(response, new_state) + end + end + + defp determine_target(%PriorityRequest{}, %{high_priority_actor: actor}) do + {:forward, actor, "handle_priority"} + end + + defp determine_target(%BatchRequest{}, _state) do + {:pipe, "batch_processor", "process_batch"} + end + + defp determine_target(request, %{load: load}) when load > 0.8 do + {:forward, "overflow_handler", "handle_overflow"} + end + + defp determine_target(request, _state) do + {:local, %StandardResponse{handled_by: "smart_router"}} + end +end +``` + +## Error Handling in Forwards and Pipes + +```elixir +defmodule MyApp.Actors.ResilientProcessor do + use SpawnSdk.Actor, + name: "resilient_processor", + state_type: MyApp.Domain.ProcessorState + + action "ProcessSafely", fn %Context{state: state} = ctx, request -> + case validate_request(request) do + :ok -> + # Normal processing path + process_and_forward(ctx, request) + + {:error, :invalid_format} -> + # Forward to format fixer + Value.of() + |> Value.forward(Forward.to("format_fixer", "fix_format")) + |> Value.void() + + {:error, :rate_limited} -> + # Delay and forward to rate limited queue + delayed_request = %{request | retry_count: (request.retry_count || 0) + 1} + + Value.of() + |> Value.response(delayed_request) + |> Value.pipe(Pipe.to("rate_limited_queue", "queue_request")) + |> Value.void() + + {:error, reason} -> + # Handle locally with error response + error_response = %ErrorResponse{ + error: reason, + handled_by: "resilient_processor" + } + Value.of(error_response, state) + end + end + + defp process_and_forward(ctx, request) do + case can_process_locally?(ctx.state) do + true -> + # Process locally + result = process_request(request) + Value.of(result, ctx.state) + + false -> + # Forward to specialized processor + processor = select_processor(request) + + Value.of() + |> Value.forward(Forward.to(processor, "process")) + |> Value.void() + end + end +end +``` + +## Dynamic Routing Patterns + +### Load Balancer Actor + +```elixir +defmodule MyApp.Actors.LoadBalancer do + use SpawnSdk.Actor, + name: "load_balancer", + state_type: MyApp.Domain.LoadBalancerState + + action "DistributeLoad", fn %Context{state: state} = ctx, request -> + # Select least loaded worker + {selected_worker, updated_state} = select_worker(state) + + Logger.info("Routing to #{selected_worker}") + + Value.of() + |> Value.state(updated_state) + |> Value.forward(Forward.to(selected_worker, "process_work")) + |> Value.void() + end + + defp select_worker(state) do + worker = state.workers + |> Enum.min_by(& &1.current_load) + + updated_workers = Enum.map(state.workers, fn w -> + if w.name == worker.name do + %{w | current_load: w.current_load + 1} + else + w + end + end) + + {worker.name, %{state | workers: updated_workers}} + end +end +``` + +### Circuit Breaker with Fallback + +```elixir +defmodule MyApp.Actors.CircuitBreakerRouter do + use SpawnSdk.Actor, + name: "circuit_breaker_router", + state_type: MyApp.Domain.CircuitBreakerState + + action "RouteWithCircuitBreaker", fn %Context{state: state} = ctx, request -> + case state.circuit_state do + :closed -> + # Circuit closed - try primary service + attempt_primary_routing(ctx, request) + + :half_open -> + # Circuit half-open - try primary with monitoring + attempt_primary_with_monitoring(ctx, request) + + :open -> + # Circuit open - use fallback + Logger.warn("Circuit breaker open, using fallback") + + Value.of() + |> Value.forward(Forward.to("fallback_service", "handle_fallback")) + |> Value.void() + end + end + + defp attempt_primary_routing(ctx, request) do + primary_service = select_primary_service(request) + + Value.of() + |> Value.state(ctx.state) + |> Value.forward(Forward.to(primary_service, "process")) + |> Value.void() + end +end +``` + +## Advanced Pipe Patterns + +### Data Transformation Pipeline + +```elixir +defmodule MyApp.Actors.ETLPipeline do + use SpawnSdk.Actor, + name: "etl_pipeline", + stateful: false + + action "StartETL", fn _ctx, %ETLRequest{} = request -> + # Extract phase + extracted_data = %ExtractedData{ + source: request.source, + raw_data: extract_data(request.source_config), + extracted_at: DateTime.utc_now() + } + + Value.of() + |> Value.response(extracted_data) + |> Value.pipe(Pipe.to("data_transformer", "transform")) + |> Value.void() + end +end + +defmodule MyApp.Actors.DataTransformer do + use SpawnSdk.Actor, + name: "data_transformer", + stateful: false + + action "Transform", fn _ctx, %ExtractedData{} = data -> + # Transform phase + transformed_data = %TransformedData{ + source: data.source, + raw_data: data.raw_data, + extracted_at: data.extracted_at, + transformed_data: transform_data(data.raw_data), + transformed_at: DateTime.utc_now() + } + + Value.of() + |> Value.response(transformed_data) + |> Value.pipe(Pipe.to("data_loader", "load")) + |> Value.void() + end +end + +defmodule MyApp.Actors.DataLoader do + use SpawnSdk.Actor, + name: "data_loader", + stateful: false + + action "Load", fn _ctx, %TransformedData{} = data -> + # Load phase + load_result = load_data(data.transformed_data) + + final_result = %ETLResult{ + source: data.source, + extracted_at: data.extracted_at, + transformed_at: data.transformed_at, + loaded_at: DateTime.utc_now(), + records_processed: length(data.transformed_data), + load_result: load_result + } + + Value.of(final_result) + end +end +``` + +### Conditional Pipeline Branching + +```elixir +defmodule MyApp.Actors.SmartPipeline do + use SpawnSdk.Actor, + name: "smart_pipeline", + stateful: false + + action "ProcessSmartly", fn _ctx, request -> + # Analyze request to determine processing path + analysis = analyze_request(request) + + processed_request = %ProcessedRequest{ + original: request, + analysis: analysis, + pipeline_step: 1 + } + + # Choose pipeline path based on analysis + next_actor = case analysis.complexity do + :simple -> "simple_processor" + :complex -> "complex_processor" + :ai_required -> "ai_processor" + end + + Value.of() + |> Value.response(processed_request) + |> Value.pipe(Pipe.to(next_actor, "process_request")) + |> Value.void() + end +end +``` + +## Testing Forwards and Pipes + +```elixir +defmodule MyApp.ForwardPipeTest do + use ExUnit.Case + + setup do + # Setup test actor system with all actors in the chain + {:ok, _} = SpawnSdk.System.Supervisor.start_link( + system: "test-system", + actors: [ + MyApp.Actors.DataProcessor, + MyApp.Actors.DataEnricher, + MyApp.Actors.MockTargetActor + ] + ) + + :ok + end + + test "forward passes original payload" do + # Test forward behavior + {:ok, response} = SpawnSdk.invoke("request_router", + system: "test-system", + action: "route_payment", + payload: %PaymentRequest{amount: 50, payment_method: :credit_card} + ) + + # Response should come from the forwarded-to actor + assert %PaymentResponse{amount: 50} = response + assert response.processed_by == "credit_card_processor" + end + + test "pipe transforms response through chain" do + # Test pipe behavior + {:ok, response} = SpawnSdk.invoke("data_processor", + system: "test-system", + action: "process_data", + payload: %DataRequest{raw_data: "test data"} + ) + + # Response should be final transformed result + assert %EnrichedData{} = response + assert response.original_data == "test data" + assert response.enriched_by == "data_enricher" + end +end +``` + +## Best Practices + +### When to Use Forward vs Pipe + +**Use Forward when:** +- Routing requests to appropriate handlers +- Load balancing across similar services +- Delegating to specialized processors +- The target needs the original request context + +**Use Pipe when:** +- Building processing pipelines +- Transforming data through multiple stages +- Each stage adds value to the previous result +- You want to compose operations + +### Performance Considerations + +```elixir +# Good: Minimize pipe chain depth +action "ProcessEfficiently", fn _ctx, data -> + # Do as much processing as possible in one step + result = comprehensive_processing(data) + + Value.of() + |> Value.response(result) + |> Value.pipe(Pipe.to("finalizer", "finalize")) + |> Value.void() +end + +# Avoid: Excessive pipe chaining +action "ProcessInefficiently", fn _ctx, data -> + Value.of() + |> Value.response(step1(data)) + |> Value.pipe(Pipe.to("step2_actor", "process")) # Too many steps + |> Value.void() +end +``` + +### Error Handling + +```elixir +# Always handle the case where forwarding/piping might fail +action "SafeForward", fn ctx, request -> + case validate_target_availability("target_actor") do + :ok -> + Value.of() + |> Value.forward(Forward.to("target_actor", "process")) + |> Value.void() + + :unavailable -> + # Fallback to local processing + local_result = process_locally(request) + Value.of(local_result, ctx.state) + end +end +``` + +## Next Steps + +- Learn about [broadcast patterns](broadcast.md) +- Explore [event-driven architectures](event_driven_patterns.md) +- See [performance optimization](../observability/performance.md) for pipeline tuning \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/advanced/side_effects.md b/spawn_sdk/spawn_sdk/guides/advanced/side_effects.md new file mode 100644 index 00000000..9ff83d36 --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/advanced/side_effects.md @@ -0,0 +1,506 @@ +# Side Effects + +Side effects allow actors to trigger actions on other actors asynchronously as part of their response flow. This is a powerful pattern for event-driven architectures and decoupled systems. + +## Understanding Side Effects + +Side effects are **fire-and-forget** operations that: +- Execute asynchronously after the main action completes +- Don't affect the request-response flow +- Don't return responses to the triggering actor +- Can be delayed or scheduled for specific times + +## Basic Side Effect + +```elixir +defmodule MyApp.Actors.OrderProcessor do + use SpawnSdk.Actor, + name: "order_processor", + state_type: MyApp.Domain.OrderState + + alias SpawnSdk.Flow.SideEffect + alias MyApp.Messages.{OrderCreated, NotificationRequest} + + action "ProcessOrder", fn %Context{} = ctx, %OrderCreated{} = order -> + # Main business logic + processed_order = process_order_logic(order) + new_state = %OrderState{orders: [processed_order | ctx.state.orders]} + + # Return response with side effect + Value.of() + |> Value.response(processed_order) + |> Value.state(new_state) + |> Value.effects([ + # Send notification asynchronously + SideEffect.of() + |> SideEffect.effect( + "notification_service", + :send_notification, + %NotificationRequest{ + user_id: order.user_id, + message: "Order #{order.id} has been processed" + } + ) + ]) + end + + defp process_order_logic(order) do + # Your business logic here + %{order | status: :processed, processed_at: DateTime.utc_now()} + end +end +``` + +## Multiple Side Effects + +You can chain multiple side effects together: + +```elixir +action "CompletePayment", fn ctx, payment_data -> + # Process payment + payment_result = process_payment(payment_data) + + Value.of() + |> Value.response(payment_result) + |> Value.state(update_payment_state(ctx.state, payment_result)) + |> Value.effects([ + # Update inventory + SideEffect.of() + |> SideEffect.effect("inventory_service", :reduce_stock, %{ + product_id: payment_data.product_id, + quantity: payment_data.quantity + }) + # Send confirmation email + |> SideEffect.effect("email_service", :send_confirmation, %{ + user_id: payment_data.user_id, + payment_id: payment_result.id + }) + # Update analytics + |> SideEffect.effect("analytics_service", :record_purchase, %{ + user_id: payment_data.user_id, + amount: payment_data.amount, + timestamp: DateTime.utc_now() + }) + ]) +end +``` + +## Delayed Side Effects + +Execute side effects after a specified delay: + +```elixir +action "CreateUserAccount", fn ctx, user_data -> + # Create account immediately + new_user = create_user_account(user_data) + + Value.of() + |> Value.response(new_user) + |> Value.state(add_user_to_state(ctx.state, new_user)) + |> Value.effects([ + # Send welcome email after 5 minutes + SideEffect.of() + |> SideEffect.effect( + "email_service", + :send_welcome_email, + %{user_id: new_user.id}, + delay: 300_000 # 5 minutes + ) + # Schedule follow-up email after 24 hours + |> SideEffect.effect( + "email_service", + :send_followup_email, + %{user_id: new_user.id}, + delay: 86_400_000 # 24 hours + ) + ]) +end +``` + +## Scheduled Side Effects + +Schedule side effects for specific dates/times: + +```elixir +action "ScheduleReminder", fn ctx, reminder_data -> + # Store reminder immediately + new_state = add_reminder(ctx.state, reminder_data) + + Value.of() + |> Value.response(%{reminder_id: reminder_data.id, scheduled: true}) + |> Value.state(new_state) + |> Value.effects([ + # Execute reminder at specific time + SideEffect.of() + |> SideEffect.effect( + "notification_service", + :send_reminder, + %{ + user_id: reminder_data.user_id, + message: reminder_data.message + }, + scheduled_to: reminder_data.scheduled_time + ) + ]) +end +``` + +## Side Effect Patterns + +### Event Sourcing Pattern + +```elixir +defmodule MyApp.Actors.AggregateRoot do + use SpawnSdk.Actor, + name: "user_aggregate", + kind: :unnamed, + state_type: MyApp.Domain.UserAggregate + + action "UpdateProfile", fn ctx, update_data -> + # Apply business logic + {updated_state, events} = apply_profile_update(ctx.state, update_data) + + # Build side effects from domain events + effects = events + |> Enum.reduce(SideEffect.of(), fn event, acc -> + case event do + %ProfileUpdated{} = event -> + acc |> SideEffect.effect("event_store", :store_event, event) + + %EmailChanged{} = event -> + acc + |> SideEffect.effect("event_store", :store_event, event) + |> SideEffect.effect("email_service", :send_verification, %{ + user_id: event.user_id, + email: event.new_email + }) + end + end) + + Value.of() + |> Value.response(%{success: true, events: length(events)}) + |> Value.state(updated_state) + |> Value.effects(effects) + end +end +``` + +### Saga Pattern + +```elixir +defmodule MyApp.Actors.OrderSaga do + use SpawnSdk.Actor, + name: "order_saga", + kind: :unnamed, + state_type: MyApp.Domain.SagaState + + action "StartOrderProcess", fn ctx, order_data -> + # Initialize saga state + saga_state = %SagaState{ + order_id: order_data.id, + step: :payment_processing, + steps_completed: [], + compensations: [] + } + + Value.of() + |> Value.response(%{saga_started: true, saga_id: order_data.id}) + |> Value.state(saga_state) + |> Value.effects([ + # Start first step + SideEffect.of() + |> SideEffect.effect("payment_service", :process_payment, order_data) + ]) + end + + action "PaymentCompleted", fn ctx, payment_result -> + next_state = %{ctx.state | + step: :inventory_reservation, + steps_completed: [:payment_processing | ctx.state.steps_completed] + } + + Value.of() + |> Value.state(next_state) + |> Value.effects([ + # Continue to next step + SideEffect.of() + |> SideEffect.effect("inventory_service", :reserve_items, %{ + order_id: ctx.state.order_id, + items: payment_result.items + }) + ]) + end + + action "PaymentFailed", fn ctx, failure_reason -> + # Saga failed - no compensation needed for first step + failed_state = %{ctx.state | step: :failed, failure_reason: failure_reason} + + Value.of() + |> Value.state(failed_state) + |> Value.effects([ + SideEffect.of() + |> SideEffect.effect("notification_service", :send_failure_notification, %{ + order_id: ctx.state.order_id, + reason: failure_reason + }) + ]) + end +end +``` + +### Circuit Breaker Pattern + +```elixir +defmodule MyApp.Actors.ResilientProcessor do + use SpawnSdk.Actor, + name: "resilient_processor", + state_type: MyApp.Domain.ProcessorState + + action "ProcessWithFallback", fn ctx, data -> + case ctx.state.circuit_state do + :closed -> + # Normal processing with fallback side effect + Value.of() + |> Value.response(process_normally(data)) + |> Value.state(ctx.state) + |> Value.effects([ + # If main processing fails, this fallback will be triggered + SideEffect.of() + |> SideEffect.effect( + "fallback_processor", + :process_fallback, + data, + delay: 1000 # Small delay for fallback + ) + ]) + + :open -> + # Circuit is open, use fallback immediately + Value.of() + |> Value.response(%{status: :fallback_used}) + |> Value.state(ctx.state) + |> Value.effects([ + SideEffect.of() + |> SideEffect.effect("fallback_processor", :process_fallback, data) + ]) + end + end +end +``` + +## Error Handling in Side Effects + +Side effects are fire-and-forget, but you can implement error handling patterns: + +### Dead Letter Queue Pattern + +```elixir +defmodule MyApp.Actors.ReliableProcessor do + use SpawnSdk.Actor, + name: "reliable_processor", + state_type: MyApp.Domain.ProcessorState + + action "ProcessReliably", fn ctx, data -> + Value.of() + |> Value.response(process_data(data)) + |> Value.state(ctx.state) + |> Value.effects([ + # Primary side effect + SideEffect.of() + |> SideEffect.effect("primary_service", :process, data) + # Delayed verification and retry + |> SideEffect.effect( + "verification_service", + :verify_processing, + %{data_id: data.id, expected_completion: DateTime.add(DateTime.utc_now(), 30)}, + delay: 35_000 # Check after 35 seconds + ) + ]) + end +end + +defmodule MyApp.Actors.VerificationService do + use SpawnSdk.Actor, + name: "verification_service", + stateful: false + + action "VerifyProcessing", fn _ctx, verification_data -> + case check_processing_completion(verification_data.data_id) do + :completed -> + Value.of(%{status: :verified}) + + :failed -> + # Send to dead letter queue or retry + Value.of() + |> Value.effects([ + SideEffect.of() + |> SideEffect.effect("dead_letter_queue", :store_failed, verification_data) + ]) + end + end +end +``` + +## Advanced Side Effect Patterns + +### Event Bus Pattern + +```elixir +defmodule MyApp.Actors.EventPublisher do + use SpawnSdk.Actor, + name: "event_publisher", + stateful: false + + action "PublishEvent", fn _ctx, event_data -> + # Publish to multiple subscribers via side effects + subscribers = get_subscribers_for_event(event_data.event_type) + + effects = subscribers + |> Enum.reduce(SideEffect.of(), fn subscriber, acc -> + acc |> SideEffect.effect(subscriber.actor, subscriber.action, event_data) + end) + + Value.of() + |> Value.response(%{published: true, subscriber_count: length(subscribers)}) + |> Value.effects(effects) + end + + defp get_subscribers_for_event("user.created") do + [ + %{actor: "email_service", action: "send_welcome"}, + %{actor: "analytics_service", action: "track_signup"}, + %{actor: "recommendation_service", action: "initialize_profile"} + ] + end +end +``` + +### Distributed State Synchronization + +```elixir +defmodule MyApp.Actors.DistributedCache do + use SpawnSdk.Actor, + name: "distributed_cache", + kind: :unnamed, + state_type: MyApp.Domain.CacheState + + action "UpdateCache", fn ctx, update_data -> + # Update local state + new_state = apply_cache_update(ctx.state, update_data) + + # Synchronize with other cache instances + other_instances = get_other_cache_instances() + + sync_effects = other_instances + |> Enum.reduce(SideEffect.of(), fn instance, acc -> + acc |> SideEffect.effect(instance, :sync_update, update_data) + end) + + Value.of() + |> Value.response(%{updated: true}) + |> Value.state(new_state) + |> Value.effects(sync_effects) + end +end +``` + +## Best Practices + +### Side Effect Design Principles + +1. **Keep side effects idempotent** - They may be retried +2. **Make them atomic** - Each side effect should be a single, complete operation +3. **Use appropriate delays** - Don't overwhelm target actors +4. **Handle failures gracefully** - Consider what happens if side effects fail + +### Performance Considerations + +```elixir +# Good: Batch related side effects +SideEffect.of() +|> SideEffect.effect("batch_processor", :process_batch, %{ + items: [item1, item2, item3] +}) + +# Avoid: Too many individual side effects +SideEffect.of() +|> SideEffect.effect("processor", :process_item, item1) +|> SideEffect.effect("processor", :process_item, item2) +|> SideEffect.effect("processor", :process_item, item3) +``` + +### Monitoring Side Effects + +```elixir +action "ProcessWithMonitoring", fn ctx, data -> + correlation_id = UUID.uuid4() + + Value.of() + |> Value.response(%{correlation_id: correlation_id}) + |> Value.state(ctx.state) + |> Value.effects([ + # Main side effect with correlation ID + SideEffect.of() + |> SideEffect.effect("target_service", :process, %{ + data: data, + correlation_id: correlation_id + }) + # Monitoring side effect + |> SideEffect.effect("monitoring_service", :track_side_effect, %{ + correlation_id: correlation_id, + source_actor: ctx.actor_name, + target_actor: "target_service", + timestamp: DateTime.utc_now() + }) + ]) +end +``` + +## Testing Side Effects + +```elixir +defmodule MyApp.ActorTest do + use ExUnit.Case + + test "side effects are triggered correctly" do + # Setup test actors + {:ok, _} = SpawnSdk.spawn_actor("test_processor", + system: "test-system", + actor: "order_processor" + ) + + # Mock the side effect target + {:ok, _} = SpawnSdk.spawn_actor("mock_notification_service", + system: "test-system", + actor: "mock_notification_service" + ) + + # Invoke main action + {:ok, response} = SpawnSdk.invoke("test_processor", + system: "test-system", + action: "process_order", + payload: %OrderCreated{id: "123", user_id: "user1"} + ) + + # Verify main response + assert response.status == :processed + + # Wait for side effect to complete (in real tests, use proper synchronization) + :timer.sleep(100) + + # Verify side effect was executed + {:ok, notification_state} = SpawnSdk.invoke("mock_notification_service", + system: "test-system", + action: "get" + ) + + assert length(notification_state.sent_notifications) == 1 + end +end +``` + +## Next Steps + +- Learn about [forward and pipe patterns](forwards_and_pipes.md) +- Explore [broadcast messaging](broadcast.md) +- See [monitoring and debugging](../observability/debugging.md) for side effect troubleshooting \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/basic/actor_configuration.md b/spawn_sdk/spawn_sdk/guides/basic/actor_configuration.md new file mode 100644 index 00000000..e2691f22 --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/basic/actor_configuration.md @@ -0,0 +1,325 @@ +# Actor Configuration + +This guide covers all the configuration options available when defining Spawn actors. + +## Basic Configuration Options + +### Actor Definition + +```elixir +defmodule MyApp.Actors.ConfiguredActor do + use SpawnSdk.Actor, + name: "configured_actor", # Actor name in the system + kind: :named, # :named | :unnamed + stateful: true, # true | false + state_type: MyApp.Domain.MyState, # Protobuf type or :json + deactivate_timeout: 30_000, # Milliseconds before deactivation + snapshot_timeout: 2_000, # Milliseconds between snapshots + channels: [ # Channels for broadcast messages + {"my.channel", "handle_broadcast"} + ] + + # Actor implementation... +end +``` + +## Configuration Parameters + +### name +- **Type:** String +- **Default:** Module name (`__MODULE__`) +- **Description:** The unique identifier for this actor in the system + +```elixir +# Using custom name +name: "my_custom_actor" + +# Using module name (default) +# name defaults to "MyApp.Actors.ConfiguredActor" +``` + +### kind +- **Type:** `:named | :unnamed` +- **Default:** `:named` +- **Description:** Determines how the actor is instantiated + +```elixir +# Named actor - created at system startup +kind: :named + +# Unnamed actor - created dynamically +kind: :unnamed +``` + +### stateful +- **Type:** `true | false` +- **Default:** `true` +- **Description:** Whether the actor maintains state between invocations + +```elixir +# Stateful actor - maintains state +stateful: true + +# Stateless actor - no state persistence +stateful: false +``` + +### state_type +- **Type:** Module (Protobuf) | `:json` +- **Default:** None (required for stateful actors) +- **Description:** The type used for actor state serialization + +```elixir +# Using Protobuf type +state_type: MyApp.Domain.UserState + +# Using JSON (less efficient) +state_type: :json +``` + +### deactivate_timeout +- **Type:** Integer (milliseconds) +- **Default:** 30,000 (30 seconds) +- **Description:** Time of inactivity before actor is deactivated + +```elixir +# 5 minutes +deactivate_timeout: 300_000 + +# Never deactivate (24 hours) +deactivate_timeout: 86_400_000 + +# Quick deactivation for testing +deactivate_timeout: 5_000 +``` + +### snapshot_timeout +- **Type:** Integer (milliseconds) +- **Default:** 2,000 (2 seconds) +- **Description:** Interval for automatic state snapshots + +```elixir +# Snapshot every 10 seconds +snapshot_timeout: 10_000 + +# Frequent snapshots for critical data +snapshot_timeout: 1_000 + +# Less frequent snapshots for performance +snapshot_timeout: 30_000 +``` + +### channels +- **Type:** List of tuples `{channel_name, action_name}` +- **Default:** `[]` +- **Description:** Channels this actor subscribes to for broadcast messages + +```elixir +# Subscribe to multiple channels +channels: [ + {"user.events", "handle_user_event"}, + {"system.notifications", "handle_notification"} +] + +# Simple channel subscription (uses "receive" action) +channels: ["my.channel"] +``` + +## Action Configuration + +### Basic Actions + +```elixir +# Simple action +action "MyAction", fn ctx, payload -> + # Action implementation +end + +# Action with multiple clauses +action "ProcessMessage" do + # Pattern matching on different message types + fn ctx, %MyApp.Events.UserCreated{} = event -> + # Handle user created + end + + fn ctx, %MyApp.Events.UserDeleted{} = event -> + # Handle user deleted + end +end +``` + +### Timer Actions + +```elixir +# Timer action - executes every 30 seconds +action "Heartbeat", [timer: 30_000], fn %Context{} = ctx -> + Logger.info("Heartbeat from #{ctx.caller}") + Value.of() +end + +# Timer with immediate execution +action "Initialize", [timer: 60_000, immediate: true], fn ctx -> + # Runs immediately when actor starts, then every minute +end +``` + +### Initialization Actions + +Special action names that trigger during actor initialization: + +```elixir +# Standard initialization actions +init fn ctx -> + # Called when actor starts +end + +# Alternative names (any of these work) +action "init", fn ctx -> ... end +action "Init", fn ctx -> ... end +action "setup", fn ctx -> ... end +action "Setup", fn ctx -> ... end +``` + +## Environment Configuration + +Spawn can be configured using environment variables for deployment compatibility: + +### Database Configuration + +```bash +# Database type +export PROXY_DATABASE_TYPE=postgres # postgres | mysql + +# State store encryption +export SPAWN_STATESTORE_KEY=your-encryption-key-here + +# Function port +export USER_FUNCTION_PORT=8092 +``` + +### Elixir Configuration + +Alternatively, use traditional Elixir configuration: + +```elixir +# config/config.exs +config :spawn, + pubsub_group: :my_actor_channel # Default: :actor_channel +``` + +## Performance Considerations + +### For High-Traffic Actors + +```elixir +use SpawnSdk.Actor, + name: "high_traffic_actor", + stateful: true, + deactivate_timeout: 300_000, # Longer timeout for active actors + snapshot_timeout: 1_000 # More frequent snapshots +``` + +### For Memory-Sensitive Actors + +```elixir +use SpawnSdk.Actor, + name: "memory_sensitive_actor", + stateful: true, + deactivate_timeout: 10_000, # Quick deactivation + snapshot_timeout: 30_000 # Less frequent snapshots +``` + +### For Stateless Workers + +```elixir +use SpawnSdk.Actor, + name: "worker_actor", + kind: :unnamed, + stateful: false # No state = better performance +``` + +## Best Practices + +### Naming Conventions + +```elixir +# Use descriptive names +name: "user_session_manager" +name: "order_processor" +name: "payment_validator" + +# For unnamed actors, use type names +name: "user_session" # Instances: user_session_123, user_session_456 +name: "shopping_cart" # Instances: shopping_cart_abc, shopping_cart_xyz +``` + +### State Types + +```elixir +# Prefer Protobuf for performance +state_type: MyApp.Domain.UserState + +# Use JSON only for rapid prototyping +state_type: :json +``` + +### Timeout Configuration + +```elixir +# For frequently accessed actors +deactivate_timeout: 600_000 # 10 minutes + +# For occasionally accessed actors +deactivate_timeout: 60_000 # 1 minute + +# For cached data actors +deactivate_timeout: 1_800_000 # 30 minutes +``` + +## Common Patterns + +### User Session Actor + +```elixir +defmodule MyApp.Actors.UserSession do + use SpawnSdk.Actor, + name: "user_session", + kind: :unnamed, + state_type: MyApp.Domain.SessionState, + deactivate_timeout: 1_800_000, # 30 min session timeout + snapshot_timeout: 10_000 # Save every 10 seconds +end +``` + +### System Service Actor + +```elixir +defmodule MyApp.Actors.ConfigService do + use SpawnSdk.Actor, + name: "config_service", + kind: :named, + state_type: MyApp.Domain.ConfigState, + deactivate_timeout: 86_400_000, # Never deactivate + snapshot_timeout: 60_000 # Save every minute +end +``` + +### Event Processor + +```elixir +defmodule MyApp.Actors.EventProcessor do + use SpawnSdk.Actor, + name: "event_processor", + kind: :unnamed, + stateful: false, # Stateless for scalability + channels: [ + {"events", "process_event"} + ] +end +``` + +## Next Steps + +- Learn about the [client API](client_api.md) +- Explore [advanced features](../advanced/) for complex interactions +- See [deployment guide](deployment.md) for production setup \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/basic/actor_types.md b/spawn_sdk/spawn_sdk/guides/basic/actor_types.md new file mode 100644 index 00000000..4904442b --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/basic/actor_types.md @@ -0,0 +1,220 @@ +# Actor Types + +Spawn supports different types of actors to fit various use cases. This guide covers all available actor types and when to use each. + +## Named Actors + +Named actors are known at compile time and are registered with a specific name in the system. + +### Basic Named Actor + +```elixir +defmodule SpawnSdkExample.Actors.NamedActor do + use SpawnSdk.Actor, + name: "jose", # Default is Full Qualified Module name + kind: :named, # Default is already :named + stateful: true, # Default is already true + state_type: Io.Eigr.Spawn.Example.MyState, + deactivate_timeout: 30_000, + snapshot_timeout: 2_000 + + require Logger + + alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} + + init fn %Context{state: state} = ctx -> + Logger.info("[jose] Received InitRequest. Context: #{inspect(ctx)}") + + Value.of() + |> Value.state(state) + end + + action "Sum", fn %Context{state: state} = ctx, %MyBusinessMessage{value: value} = data -> + Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") + + new_value = if is_nil(state), do: value, else: (state.value || 0) + value + + Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) + end +end +``` + +**When to use Named Actors:** +- When you have a fixed number of actors that are known at design time +- Singleton actors that represent unique entities in your domain +- Actors that should be immediately available when the system starts + +## Unnamed Actors + +Unnamed actors are defined at compile time but only instantiated dynamically at runtime when associated with an identifier. + +### Basic Unnamed Actor + +```elixir +defmodule SpawnSdkExample.Actors.UnnamedActor do + use SpawnSdk.Actor, + name: "unnamed_actor", + kind: :unnamed, + state_type: Io.Eigr.Spawn.Example.MyState + + require Logger + + alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} + + action "Sum", fn %Context{state: state} = ctx, %MyBusinessMessage{value: value} = data -> + Logger.info("Received Request: #{inspect(data)}. Context: #{inspect(ctx)}") + + new_value = if is_nil(state), do: value, else: (state.value || 0) + value + + Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) + end +end +``` + +### Using Unnamed Actors + +```elixir +# Spawn an instance with a specific name +iex> SpawnSdk.spawn_actor("user_123", system: "spawn-system", actor: "unnamed_actor") +:ok + +# Now you can invoke it +iex> SpawnSdk.invoke("user_123", system: "spawn-system", action: "sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}) +{:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}} + +# Or invoke without spawning (lazy spawning) +iex> SpawnSdk.invoke("user_456", ref: "unnamed_actor", system: "spawn-system", action: "sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}) +{:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 1}} +``` + +**When to use Unnamed Actors:** +- When you need to create actors dynamically based on user data +- For modeling entities that have many instances (users, sessions, devices, etc.) +- When you don't know how many actors you'll need at compile time + +## Pooled Actors + +Pooled actors are unnamed actors that can be invoked without specifying a particular instance. The system automatically load-balances requests across multiple instances. + +### Basic Pooled Actor + +```elixir +defmodule SpawnSdkExample.Actors.PooledActor do + use SpawnSdk.Actor, + name: "pooled_actor", + kind: :unnamed, + stateful: false # Usually stateless for load balancing + + require Logger + + alias Io.Eigr.Spawn.Example.MyBusinessMessage + + action "Process", fn _ctx, %MyBusinessMessage{value: value} = data -> + Logger.info("Processing request: #{inspect(data)} in #{inspect(self())}") + + # Simulate some processing + :timer.sleep(100) + + Value.of(%MyBusinessMessage{value: value * 2}) + end +end +``` + +### Using Pooled Actors + +```elixir +# Invoke with pooled: true - system handles load balancing +iex> SpawnSdk.invoke("pooled_actor", system: "spawn-system", action: "process", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 5}, pooled: true) +{:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 10}} +``` + +**When to use Pooled Actors:** +- For stateless operations that can be distributed across multiple instances +- When you need horizontal scaling for CPU-intensive tasks +- For processing queues or handling high-throughput operations + +## Timer Actors + +Actors can declare actions that execute periodically as timers. + +### Timer Actor Example + +```elixir +defmodule SpawnSdkExample.Actors.ClockActor do + use SpawnSdk.Actor, + name: "clock_actor", + state_type: Io.Eigr.Spawn.Example.MyState, + deactivate_timeout: 86_400_000 + + require Logger + + alias Io.Eigr.Spawn.Example.MyState + + action "Clock", [timer: 15_000], fn %Context{state: state} = ctx -> + Logger.info("[clock] Clock Actor Received Request. Context: #{inspect(ctx)}") + + new_value = if is_nil(state), do: 0, else: state.value + 1 + new_state = MyState.new(value: new_value) + + Value.of() + |> Value.state(new_state) + end +end +``` + +**When to use Timer Actors:** +- For periodic cleanup tasks +- Heartbeat or health check operations +- Scheduled data processing +- Periodic state synchronization + +> **Note:** Timer actions are ephemeral and only exist while there is at least one active VM in the cluster. + +## Task Actors (Stateless) + +Task actors are designed for stateless operations and don't maintain state between invocations. + +### Task Actor Example + +```elixir +defmodule SpawnSdkExample.Actors.TaskActor do + use SpawnSdk.Actor, + name: "task_actor", + stateful: false # No state management + + require Logger + + alias Io.Eigr.Spawn.Example.MyBusinessMessage + + action "Calculate", fn _ctx, %MyBusinessMessage{value: value} = data -> + Logger.info("Calculating: #{inspect(data)}") + + # Perform stateless calculation + result = value * value + 10 + + Value.of(%MyBusinessMessage{value: result}) + end +end +``` + +**When to use Task Actors:** +- For pure functions or stateless operations +- Mathematical calculations +- Data transformations +- API integrations that don't require state + +## Choosing the Right Actor Type + +| Use Case | Actor Type | Stateful | Example | +|----------|------------|----------|---------| +| User sessions | Unnamed | Yes | Individual user data | +| System configuration | Named | Yes | Global settings | +| Message processing | Pooled | No | Queue processing | +| Scheduled jobs | Timer | Yes/No | Cleanup tasks | +| Calculations | Task | No | Math operations | + +## Next Steps + +- Learn about [actor configuration](actor_configuration.md) +- Explore the [client API](client_api.md) +- Check out [advanced features](../advanced/) for more complex patterns \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/basic/client_api.md b/spawn_sdk/spawn_sdk/guides/basic/client_api.md new file mode 100644 index 00000000..124a30b0 --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/basic/client_api.md @@ -0,0 +1,417 @@ +# Client API + +This guide covers the complete client API for interacting with Spawn actors from your Elixir application. + +## Basic Invocation + +### SpawnSdk.invoke/2 + +The primary function for calling actor actions. + +```elixir +# Basic invocation +SpawnSdk.invoke(actor_name, options) +``` + +#### Parameters + +- `actor_name` - String name of the actor to invoke +- `options` - Keyword list of invocation options + +#### Basic Example + +```elixir +iex> SpawnSdk.invoke("counter", + system: "spawn-system", + action: "increment", + payload: %MyApp.Messages.IncrementMessage{value: 5} +) +{:ok, %MyApp.Messages.CounterResponse{value: 15}} +``` + +## Invocation Options + +### Required Options + +```elixir +# system - The actor system name +system: "spawn-system" + +# action - The action to call on the actor +action: "my_action" +``` + +### Optional Options + +```elixir +# payload - Data to send to the actor +payload: %MyMessage{data: "hello"} + +# ref - For unnamed actors, specifies the actor type +ref: "user_session" + +# pooled - Use pooled invocation for load balancing +pooled: true + +# delay - Delay execution by milliseconds +delay: 5_000 + +# scheduled_to - Schedule for specific DateTime +scheduled_to: ~U[2023-12-25 10:00:00Z] + +# revision - Restore actor from specific revision +revision: 42 +``` + +## Default Actions + +Actors have built-in actions that don't require custom implementation: + +### Get State + +```elixir +# Get current actor state +iex> SpawnSdk.invoke("user_123", system: "spawn-system", action: "get") +{:ok, %MyApp.Domain.UserState{name: "John", balance: 100}} + +# Alternative action names that work the same way: +action: "Get" +action: "get_state" +action: "getState" +action: "GetState" +``` + +## Working with Named Actors + +Named actors are pre-registered and always available: + +```elixir +# Invoke a named actor directly +iex> SpawnSdk.invoke("config_manager", + system: "spawn-system", + action: "get_config", + payload: %ConfigRequest{key: "database_url"} +) +{:ok, %ConfigResponse{value: "postgres://..."}} + +# Get named actor state +iex> SpawnSdk.invoke("config_manager", system: "spawn-system", action: "get") +{:ok, %ConfigState{settings: %{...}}} +``` + +## Working with Unnamed Actors + +Unnamed actors must be spawned before use, or you can use lazy spawning: + +### Explicit Spawning + +```elixir +# First, spawn the actor +iex> SpawnSdk.spawn_actor("user_123", + system: "spawn-system", + actor: "user_session" +) +:ok + +# Then invoke it +iex> SpawnSdk.invoke("user_123", + system: "spawn-system", + action: "login", + payload: %LoginRequest{username: "john"} +) +{:ok, %LoginResponse{success: true}} +``` + +### Lazy Spawning + +```elixir +# Spawn and invoke in one call using `ref` +iex> SpawnSdk.invoke("user_456", + ref: "user_session", + system: "spawn-system", + action: "login", + payload: %LoginRequest{username: "jane"} +) +{:ok, %LoginResponse{success: true}} +``` + +### Spawning with Revision + +```elixir +# Restore actor from specific point in time +iex> SpawnSdk.spawn_actor("user_789", + system: "spawn-system", + actor: "user_session", + revision: 15 +) +:ok +``` + +## Pooled Invocations + +For stateless, load-balanced operations: + +```elixir +# System automatically selects an available instance +iex> SpawnSdk.invoke("image_processor", + system: "spawn-system", + action: "resize_image", + payload: %ImageRequest{url: "...", width: 800}, + pooled: true +) +{:ok, %ImageResponse{processed_url: "..."}} +``` + +## Scheduled and Delayed Invocations + +### Delayed Execution + +```elixir +# Execute after 5 seconds +iex> SpawnSdk.invoke("notification_service", + system: "spawn-system", + action: "send_reminder", + payload: %ReminderRequest{user_id: 123, message: "Meeting in 5 minutes"}, + delay: 5_000 +) +{:ok, :async} +``` + +### Scheduled Execution + +```elixir +# Execute at specific time +iex> SpawnSdk.invoke("report_generator", + system: "spawn-system", + action: "generate_daily_report", + payload: %ReportRequest{date: "2023-12-25"}, + scheduled_to: ~U[2023-12-25 23:59:00Z] +) +{:ok, :async} +``` + +## Error Handling + +### Common Error Patterns + +```elixir +case SpawnSdk.invoke("my_actor", system: "spawn-system", action: "my_action") do + {:ok, response} -> + # Success - handle response + IO.inspect(response) + + {:error, :not_found} -> + # Actor not found + Logger.error("Actor not found") + + {:error, :timeout} -> + # Request timed out + Logger.error("Request timeout") + + {:error, reason} -> + # Other errors + Logger.error("Invocation failed: #{inspect(reason)}") +end +``` + +### Async Responses + +Delayed and scheduled invocations return `:async`: + +```elixir +case SpawnSdk.invoke("actor", options ++ [delay: 1000]) do + {:ok, :async} -> + Logger.info("Request scheduled successfully") + + {:error, reason} -> + Logger.error("Failed to schedule request: #{inspect(reason)}") +end +``` + +## Advanced Patterns + +### Conditional Invocation + +```elixir +defmodule MyApp.ActorClient do + def call_user_actor(user_id, action, payload) do + # Check if user exists first + case SpawnSdk.invoke("user_#{user_id}", system: "spawn-system", action: "get") do + {:ok, _user_state} -> + # User exists, proceed with action + SpawnSdk.invoke("user_#{user_id}", + system: "spawn-system", + action: action, + payload: payload + ) + + {:error, :not_found} -> + # Lazy spawn and invoke + SpawnSdk.invoke("user_#{user_id}", + ref: "user_session", + system: "spawn-system", + action: action, + payload: payload + ) + end + end +end +``` + +### Bulk Operations + +```elixir +defmodule MyApp.BulkOperations do + def broadcast_to_users(user_ids, message) do + tasks = Enum.map(user_ids, fn user_id -> + Task.async(fn -> + SpawnSdk.invoke("user_#{user_id}", + ref: "user_session", + system: "spawn-system", + action: "receive_message", + payload: message + ) + end) + end) + + # Wait for all to complete + results = Task.await_many(tasks, 5_000) + + # Process results + {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1)) + + %{ + success_count: length(successes), + failure_count: length(failures), + failures: failures + } + end +end +``` + +### Circuit Breaker Pattern + +```elixir +defmodule MyApp.SafeActorClient do + @max_retries 3 + @retry_delay 1_000 + + def safe_invoke(actor, opts, retries \\ 0) do + case SpawnSdk.invoke(actor, opts) do + {:ok, response} -> + {:ok, response} + + {:error, :timeout} when retries < @max_retries -> + :timer.sleep(@retry_delay) + safe_invoke(actor, opts, retries + 1) + + {:error, reason} -> + Logger.error("Actor invocation failed after #{retries} retries: #{inspect(reason)}") + {:error, reason} + end + end +end +``` + +## Performance Tips + +### Batch Similar Requests + +```elixir +# Instead of multiple calls +Enum.each(user_ids, fn id -> + SpawnSdk.invoke("user_#{id}", system: "spawn-system", action: "update") +end) + +# Use async tasks for parallel execution +user_ids +|> Enum.map(&Task.async(fn -> + SpawnSdk.invoke("user_#{&1}", system: "spawn-system", action: "update") + end)) +|> Task.await_many() +``` + +### Use Pooled Actors for Stateless Operations + +```elixir +# Instead of creating many unnamed actors +SpawnSdk.invoke("processor_#{unique_id}", ...) + +# Use pooled actors for better resource utilization +SpawnSdk.invoke("processor", pooled: true, ...) +``` + +### Minimize Payload Size + +```elixir +# Instead of sending large objects +payload = %HugeDataStructure{...} + +# Send references and let actors fetch data +payload = %DataReference{id: data_id} +``` + +## Testing with Actors + +### Test Helper + +```elixir +defmodule MyApp.TestHelpers do + def setup_test_actor(name, actor_type) do + SpawnSdk.spawn_actor(name, + system: "test-system", + actor: actor_type + ) + end + + def cleanup_test_actor(name) do + # Actors auto-deactivate, but you can force cleanup if needed + SpawnSdk.invoke(name, + system: "test-system", + action: "deactivate" + ) + end +end +``` + +### Integration Test Example + +```elixir +defmodule MyApp.ActorIntegrationTest do + use ExUnit.Case + import MyApp.TestHelpers + + test "user session workflow" do + user_id = "test_user_#{System.unique_integer()}" + + # Setup + setup_test_actor(user_id, "user_session") + + # Test login + {:ok, response} = SpawnSdk.invoke(user_id, + system: "test-system", + action: "login", + payload: %LoginRequest{username: "test"} + ) + + assert response.success == true + + # Test state persistence + {:ok, state} = SpawnSdk.invoke(user_id, + system: "test-system", + action: "get" + ) + + assert state.username == "test" + + # Cleanup + cleanup_test_actor(user_id) + end +end +``` + +## Next Steps + +- Learn about [supervision tree setup](supervision.md) +- Explore [advanced patterns](../advanced/) like side effects and forwards +- Check out [deployment strategies](deployment.md) \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/basic/quickstart.md b/spawn_sdk/spawn_sdk/guides/basic/quickstart.md new file mode 100644 index 00000000..d96bb328 --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/basic/quickstart.md @@ -0,0 +1,129 @@ +# Quickstart Guide + +This guide will help you get started with Spawn Elixir SDK quickly. + +## Installation + +Add `spawn_sdk` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:spawn_sdk, "~> 2.0.0-RC9"}, + + # Optional: if you are going to use Persistent Actors + #{:spawn_statestores_mariadb, "~> 2.0.0-RC9"}, + #{:spawn_statestores_postgres, "~> 2.0.0-RC9"}, + ] +end +``` + +## Quick Example + +Let's create a simple counter actor to demonstrate the basics: + +### 1. Define your Protobuf messages + +Create a file `priv/example.proto`: + +```protobuf +syntax = "proto3"; + +package io.eigr.spawn.example; + +message MyState { + int32 value = 1; +} + +message MyBusinessMessage { + int32 value = 1; +} +``` + +### 2. Compile Protobuf files + +You need to compile these protobuf files to generate Elixir modules. + +> **Note:** You need the elixir protobuf plugin installed. See [protobuf documentation](https://github.com/elixir-protobuf/protobuf#usage) for installation instructions. + +### 3. Create your first Actor + +```elixir +defmodule MyApp.Actors.Counter do + use SpawnSdk.Actor, + name: "counter", + stateful: true, + state_type: Io.Eigr.Spawn.Example.MyState + + require Logger + + alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} + + # Initialization action - called when actor starts + init fn %Context{state: state} = ctx -> + Logger.info("Counter actor initialized. Context: #{inspect(ctx)}") + + Value.of() + |> Value.state(state || %MyState{value: 0}) + end + + # Sum action - adds a value to the current state + action "Sum", fn %Context{state: state} = ctx, %MyBusinessMessage{value: value} = data -> + Logger.info("Received Sum request: #{inspect(data)}") + + current_value = if is_nil(state), do: 0, else: state.value + new_value = current_value + value + + Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) + end +end +``` + +### 4. Set up your supervision tree + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + { + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: [ + MyApp.Actors.Counter + ] + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +### 5. Use your Actor + +```elixir +# Add 5 to the counter +iex> SpawnSdk.invoke("counter", system: "spawn-system", action: "Sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 5}) +{:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 5}} + +# Add 3 more +iex> SpawnSdk.invoke("counter", system: "spawn-system", action: "Sum", payload: %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 3}) +{:ok, %Io.Eigr.Spawn.Example.MyBusinessMessage{value: 8}} + +# Get current state +iex> SpawnSdk.invoke("counter", system: "spawn-system", action: "get") +{:ok, %Io.Eigr.Spawn.Example.MyState{value: 8}} +``` + +That's it! You now have a working Spawn actor that maintains state across invocations. + +## Next Steps + +- Learn about [different actor types](actor_types.md) +- Understand [actor configuration](actor_configuration.md) +- Explore [client API](client_api.md) +- Check out [advanced features](../advanced/) for side effects, forwards, and pipes \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/guides/basic/supervision.md b/spawn_sdk/spawn_sdk/guides/basic/supervision.md new file mode 100644 index 00000000..25bcbccb --- /dev/null +++ b/spawn_sdk/spawn_sdk/guides/basic/supervision.md @@ -0,0 +1,516 @@ +# Supervision Tree Setup + +This guide covers how to properly set up and configure your Spawn actor supervision tree in your Elixir application. + +## Basic Setup + +### Application Module + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + { + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: [ + MyApp.Actors.UserSession, + MyApp.Actors.OrderProcessor, + MyApp.Actors.ConfigManager + ] + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +## Configuration Options + +### System Configuration + +```elixir +{ + SpawnSdk.System.Supervisor, + system: "my-actor-system", # Required: system name + actors: [...], # Required: list of actor modules + external_subscribers: [...], # Optional: external event subscribers + startup_timeout: 30_000, # Optional: startup timeout in ms + shutdown_timeout: 5_000 # Optional: shutdown timeout in ms +} +``` + +### system +- **Type:** String +- **Required:** Yes +- **Description:** Unique name for your actor system + +```elixir +# Development system +system: "my-app-dev" + +# Production system +system: "my-app-prod" + +# Multi-tenant systems +system: "tenant-#{tenant_id}" +``` + +### actors +- **Type:** List of modules +- **Required:** Yes +- **Description:** Actor modules to register in the system + +```elixir +actors: [ + # Named actors + MyApp.Actors.ConfigService, + MyApp.Actors.StatsCollector, + + # Unnamed actors (for dynamic spawning) + MyApp.Actors.UserSession, + MyApp.Actors.ShoppingCart, + MyApp.Actors.WorkerTask +] +``` + +## Multiple Actor Systems + +You can run multiple actor systems in the same application: + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + # Primary system for business logic + { + SpawnSdk.System.Supervisor, + system: "business-system", + actors: [ + MyApp.Actors.UserService, + MyApp.Actors.OrderService + ] + }, + + # Secondary system for background tasks + { + SpawnSdk.System.Supervisor, + system: "background-system", + actors: [ + MyApp.Actors.EmailProcessor, + MyApp.Actors.ReportGenerator + ] + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +## External Subscribers + +For integrating with Phoenix PubSub or other external systems: + +### Basic External Subscriber + +```elixir +defmodule MyApp.ExternalSubscriber do + use GenServer + require Logger + + alias SpawnSdk.Channel.Subscriber + + @impl true + def init(state) do + # Subscribe to actor broadcast events + Subscriber.subscribe("user.events") + {:ok, state} + end + + @impl true + def handle_info({:receive, payload}, state) do + Logger.info("Received external event: #{inspect(payload)}") + + # Forward to Phoenix PubSub, LiveView, etc. + Phoenix.PubSub.broadcast(MyApp.PubSub, "user_updates", {:user_event, payload}) + + {:noreply, state} + end + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end +end +``` + +### Registering External Subscribers + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + # Your regular supervision tree + MyApp.PubSub, + + # Spawn system with external subscribers + { + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: [MyApp.Actors.UserActor], + external_subscribers: [ + {MyApp.ExternalSubscriber, []}, + {MyApp.MetricsCollector, [interval: 5000]}, + {MyApp.EventLogger, [log_level: :info]} + ] + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +## Configuration Management + +### Environment-Based Configuration + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + # Configure based on environment + system_name = Application.get_env(:my_app, :actor_system, "my-app-#{Mix.env()}") + + actors = case Mix.env() do + :test -> + [MyApp.Actors.TestUserActor] + :prod -> + [ + MyApp.Actors.UserActor, + MyApp.Actors.OrderActor, + MyApp.Actors.PaymentActor + ] + _ -> + [ + MyApp.Actors.UserActor, + MyApp.Actors.OrderActor + ] + end + + children = [ + { + SpawnSdk.System.Supervisor, + system: system_name, + actors: actors + } + ] + + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end +end +``` + +### Runtime Configuration + +```elixir +# config/runtime.exs +import Config + +config :my_app, + actor_system: System.get_env("ACTOR_SYSTEM_NAME", "my-app-prod"), + actor_timeout: String.to_integer(System.get_env("ACTOR_TIMEOUT", "30000")) +``` + +## Health Monitoring + +### Custom Health Check Actor + +```elixir +defmodule MyApp.Actors.HealthCheck do + use SpawnSdk.Actor, + name: "health_check", + stateful: false + + require Logger + + action "Ping", fn _ctx, _payload -> + Logger.debug("Health check ping received") + + Value.of(%{ + status: "healthy", + timestamp: DateTime.utc_now(), + system: "spawn-system" + }) + end + + # Timer-based health monitoring + action "Monitor", [timer: 30_000], fn _ctx -> + # Check system health periodically + memory_usage = :erlang.memory() + process_count = :erlang.system_info(:process_count) + + Logger.info("System health - Processes: #{process_count}, Memory: #{memory_usage[:total]}") + + Value.of() + end +end +``` + +### Integration with Application Health + +```elixir +defmodule MyApp.HealthController do + use MyAppWeb, :controller + + def health(conn, _params) do + case SpawnSdk.invoke("health_check", + system: "spawn-system", + action: "ping" + ) do + {:ok, status} -> + json(conn, status) + + {:error, _reason} -> + conn + |> put_status(503) + |> json(%{status: "unhealthy", error: "actor_system_down"}) + end + end +end +``` + +## Development vs Production Setup + +### Development Setup + +```elixir +# config/dev.exs +config :my_app, + actor_system: "my-app-dev", + actors: [ + MyApp.Actors.UserSession, + MyApp.Actors.TestDataGenerator # Development-only actor + ], + external_subscribers: [ + {MyApp.DevSubscriber, []} # Development event logging + ] +``` + +### Production Setup + +```elixir +# config/prod.exs +config :my_app, + actor_system: "my-app-prod", + actors: [ + MyApp.Actors.UserSession, + MyApp.Actors.OrderProcessor, + MyApp.Actors.PaymentValidator, + MyApp.Actors.MetricsCollector + ], + external_subscribers: [ + {MyApp.MetricsSubscriber, []}, + {MyApp.AlertSubscriber, []} + ] +``` + +## Error Handling and Recovery + +### Supervision Strategy + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + children = [ + # Critical services first + MyApp.Database, + + # Then actor system + { + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: actors_for_env() + }, + + # Less critical services last + MyApp.MetricsServer + ] + + # Use :one_for_one to isolate failures + opts = [strategy: :one_for_one, name: MyApp.Supervisor] + Supervisor.start_link(children, opts) + end + + defp actors_for_env do + base_actors = [ + MyApp.Actors.UserSession, + MyApp.Actors.OrderProcessor + ] + + case Mix.env() do + :prod -> base_actors ++ [MyApp.Actors.MonitoringActor] + _ -> base_actors + end + end +end +``` + +### Graceful Shutdown + +```elixir +defmodule MyApp.Application do + use Application + + @impl true + def start(_type, _args) do + # ... setup children ... + + # Handle graceful shutdown + :ok = :gen_event.swap_handler( + :erl_signal_server, + {:erl_signal_handler, []}, + {MyApp.SignalHandler, []} + ) + + Supervisor.start_link(children, opts) + end +end + +defmodule MyApp.SignalHandler do + @behaviour :gen_event + + def init(args), do: {:ok, args} + + def handle_event(:sigterm, state) do + # Gracefully shutdown actors + # Spawn system handles this automatically + {:ok, state} + end + + def handle_event(_signal, state), do: {:ok, state} + def handle_call(_request, state), do: {:ok, :ok, state} + def handle_info(_info, state), do: {:ok, state} +end +``` + +## Testing Setup + +### Test Configuration + +```elixir +# test/test_helper.exs +ExUnit.start() + +# Setup test actor system +{:ok, _pid} = SpawnSdk.System.Supervisor.start_link( + system: "test-system", + actors: [ + MyApp.Actors.TestUser, + MyApp.Actors.TestOrder + ] +) +``` + +### Test Utilities + +```elixir +defmodule MyApp.TestHelpers do + def setup_test_system do + SpawnSdk.System.Supervisor.start_link( + system: "test-#{System.unique_integer()}", + actors: [MyApp.Actors.TestActor] + ) + end + + def with_test_actor(actor_name, actor_type, test_fun) do + system = "test-#{System.unique_integer()}" + + {:ok, _} = SpawnSdk.System.Supervisor.start_link( + system: system, + actors: [actor_type] + ) + + SpawnSdk.spawn_actor(actor_name, system: system, actor: actor_type) + + try do + test_fun.(system) + after + # Cleanup happens automatically when supervisor terminates + :ok + end + end +end +``` + +## Best Practices + +### Naming Conventions + +```elixir +# Use descriptive system names +system: "ecommerce-system" # Better than "system1" +system: "analytics-pipeline" # Clear purpose +system: "user-management" # Domain-specific + +# Environment-specific names +system: "my-app-#{Mix.env()}" +system: "tenant-#{tenant_id}-system" +``` + +### Actor Organization + +```elixir +# Group related actors +actors: [ + # User domain + MyApp.Actors.UserSession, + MyApp.Actors.UserProfile, + + # Order domain + MyApp.Actors.ShoppingCart, + MyApp.Actors.OrderProcessor, + + # System actors + MyApp.Actors.HealthMonitor, + MyApp.Actors.ConfigManager +] +``` + +### Resource Management + +```elixir +# Configure appropriate timeouts +{ + SpawnSdk.System.Supervisor, + system: "spawn-system", + actors: actors, + startup_timeout: 60_000, # Allow time for actor initialization + shutdown_timeout: 10_000 # Graceful shutdown time +} +``` + +## Next Steps + +- Learn about [deployment strategies](deployment.md) +- Explore [advanced patterns](../advanced/) for complex actor interactions +- See [monitoring and observability](monitoring.md) for production insights \ No newline at end of file diff --git a/spawn_sdk/spawn_sdk/lib/actor.ex b/spawn_sdk/spawn_sdk/lib/actor.ex index 8dc720da..b08104d0 100644 --- a/spawn_sdk/spawn_sdk/lib/actor.ex +++ b/spawn_sdk/spawn_sdk/lib/actor.ex @@ -1,29 +1,142 @@ defmodule SpawnSdk.Actor do @moduledoc """ - Documentation for `Actor`. + Defines the behavior and macros for creating Spawn actors. + + `SpawnSdk.Actor` is a behaviour module that provides the foundation for building + stateful, distributed actors in the Spawn ecosystem. Actors are autonomous entities + that encapsulate state and behavior, communicating through message passing. + + ## Features + + - **Stateful computation** - Actors maintain persistent state across invocations + - **Message-based communication** - Interact through well-defined actions + - **Fault tolerance** - Built-in supervision and error recovery + - **Horizontal scaling** - Distribute actors across multiple nodes + - **Event sourcing** - Optional state snapshots and event replay + - **Side effects** - Asynchronous communication with other actors + - **Broadcasting** - Publish events to multiple subscribers + + ## Actor Types + + - **Named actors** - Singleton actors with fixed names, started at system boot + - **Unnamed actors** - Dynamic actors spawned on-demand with unique identifiers + - **Pooled actors** - Load-balanced actors for stateless operations + - **Timer actors** - Actors with periodic execution capabilities + + ## Configuration Options + + - `name` - Actor identifier within the system (required) + - `kind` - `:named` (default) or `:unnamed` + - `stateful` - `true` (default) or `false` for stateless actors + - `state_type` - Protobuf message type for state serialization + - `deactivate_timeout` - Idle time before actor deactivation (ms) + - `snapshot_timeout` - Interval for automatic state snapshots (ms) + - `channels` - List of broadcast channels to subscribe to + + ## Basic Usage + + defmodule MyApp.Actors.Counter do + use SpawnSdk.Actor, + name: "counter", + stateful: true, + state_type: MyApp.Domain.CounterState + + # Initialization action (optional) + init fn %Context{} = ctx -> + initial_state = %CounterState{count: 0} + Value.of() |> Value.state(initial_state) + end - Actor look like this: + # Define actor actions + action "increment", fn %Context{state: state}, %{value: value} -> + new_count = (state.count || 0) + value + new_state = %{state | count: new_count} + + Value.of(%{count: new_count}, new_state) + end - defmodule MyActor do - use SpawnSdk.Actor, - name: "joe", - persistent: false, - state_type: Io.Eigr.Spawn.Example.MyState, - deactivate_timeout: 5_000, - snapshot_timeout: 2_000 + action "get_count", fn %Context{state: state}, _payload -> + Value.of(%{count: state.count || 0}, state) + end + end + + ## Advanced Features - require Logger - alias Io.Eigr.Spawn.Example.{MyState, MyBusinessMessage} + ### Side Effects - defact sum(%MyBusinessMessage{value: value} = data}, %Context{state: state} = ctx) do - Logger.info("Received Request...") + action "process_order", fn ctx, order_data -> + # Process order logic... + + Value.of(order_result, new_state) + |> Value.effects([ + SideEffect.of() + |> SideEffect.effect("email_service", :send_confirmation, order_data) + |> SideEffect.effect("inventory_service", :update_stock, order_data) + ]) + end - new_value = (state.value || 0) + value + ### Broadcasting - %Value{} - |> Value.of(%MyBusinessMessage{value: new_value}, %MyState{value: new_value}) - |> Value.reply!() + action "user_registered", fn ctx, user_data -> + event = %UserRegistered{user_id: user_data.id, timestamp: DateTime.utc_now()} + + Value.of(user_data, ctx.state) + |> Value.broadcast(Broadcast.to("user.events", event)) end + + ### Timer Actions + + # Execute every 30 seconds + action "heartbeat", [timer: 30_000], fn %Context{} = ctx -> + Logger.info("Actor heartbeat") + Value.of() |> Value.state(ctx.state) + end + + ## Actor Lifecycle + + 1. **Initialization** - Actor starts and runs `init` action if defined + 2. **Active** - Actor processes incoming messages and maintains state + 3. **Idle** - Actor becomes inactive after `deactivate_timeout` + 4. **Reactivation** - Actor restores state when receiving new messages + 5. **Termination** - Actor shuts down gracefully during system shutdown + + ## State Management + + Actors can maintain state using Protobuf messages for efficient serialization + and cross-language compatibility: + + # Define state schema + message CounterState { + int32 count = 1; + google.protobuf.Timestamp last_updated = 2; + } + + # Use in actor + use SpawnSdk.Actor, + state_type: MyApp.Proto.CounterState + + ## Error Handling + + Actors provide built-in error recovery through supervision. Failed actors + are automatically restarted while preserving their last known state. + + ## Performance Considerations + + - Use `stateful: false` for computational actors without state + - Configure appropriate `deactivate_timeout` based on usage patterns + - Leverage pooled actors for CPU-intensive, stateless operations + - Use side effects for non-blocking inter-actor communication + + ## Documentation + + For comprehensive guides and examples: + + - [Actor Types Guide](guides/basic/actor_types.md) + - [Actor Configuration](guides/basic/actor_configuration.md) + - [Side Effects](guides/advanced/side_effects.md) + - [Broadcasting](guides/advanced/broadcast.md) + - [Forwards and Pipes](guides/advanced/forwards_and_pipes.md) + """ alias SpawnSdk.{ diff --git a/spawn_sdk/spawn_sdk/lib/flow.ex b/spawn_sdk/spawn_sdk/lib/flow.ex index 5af49114..cf3ad608 100644 --- a/spawn_sdk/spawn_sdk/lib/flow.ex +++ b/spawn_sdk/spawn_sdk/lib/flow.ex @@ -1,4 +1,6 @@ defmodule SpawnSdk.Flow do + @moduledoc false + defmodule Broadcast do @moduledoc """ Actors can also send messages to a group of actors at once as an action callback. This we call Broadcast. diff --git a/spawn_sdk/spawn_sdk/lib/spawn_sdk.ex b/spawn_sdk/spawn_sdk/lib/spawn_sdk.ex index cc22c277..d6eb03e3 100644 --- a/spawn_sdk/spawn_sdk/lib/spawn_sdk.ex +++ b/spawn_sdk/spawn_sdk/lib/spawn_sdk.ex @@ -1,10 +1,9 @@ defmodule SpawnSdk do - @moduledoc "README.md" - |> File.read!() - |> String.split("") - |> Enum.fetch!(1) + @moduledoc """ + """ defmodule ActorRef do + @moduledoc false @enforce_keys [:name] defstruct system: nil, name: nil, opts: [] @@ -16,6 +15,7 @@ defmodule SpawnSdk do end defmodule ActorChannel do + @moduledoc false @enforce_keys [:channel] defstruct channel: nil, opts: [] @@ -26,6 +26,7 @@ defmodule SpawnSdk do end defmodule ActorGroupRef do + @moduledoc false @enforce_keys [:actors] defstruct actors: nil, opts: [] diff --git a/spawn_sdk/spawn_sdk/mix.exs b/spawn_sdk/spawn_sdk/mix.exs index 7bbf9683..ce133418 100644 --- a/spawn_sdk/spawn_sdk/mix.exs +++ b/spawn_sdk/spawn_sdk/mix.exs @@ -10,6 +10,7 @@ defmodule SpawnSdk.MixProject do app: @app, version: @version, description: "Spawn Elixir SDK is the support library for the Spawn Actors System", + name: "Spawn Elixir SDK", source_url: @source_url, homepage_url: "https://eigr.io/", build_path: "../../_build", @@ -44,17 +45,62 @@ defmodule SpawnSdk.MixProject do defp docs do [ - main: "readme", + main: "SpawnSdk.Actor", source_url: @source_url, source_ref: "v#{@version}", formatter_opts: [gfm: true], extras: [ - "README.md" + "guides/basic/quickstart.md", + "guides/basic/actor_types.md", + "guides/basic/actor_configuration.md", + "guides/basic/client_api.md", + "guides/basic/supervision.md", + "guides/advanced/side_effects.md", + "guides/advanced/forwards_and_pipes.md", + "guides/advanced/broadcast.md" + ], + groups_for_extras: [ + "Getting Started": [ + "guides/basic/quickstart.md" + ], + "Basic Concepts": [ + "guides/basic/actor_types.md", + "guides/basic/actor_configuration.md", + "guides/basic/client_api.md", + "guides/basic/supervision.md" + ], + "Advanced Features": [ + "guides/advanced/side_effects.md", + "guides/advanced/forwards_and_pipes.md", + "guides/advanced/broadcast.md" + ] + ], + groups_for_modules: [ + "Actors": [ + SpawnSdk.Actor, + SpawnSdk.System.Supervisor, + SpawnSdk.Context, + SpawnSdk.Value + ], + Workflows: [ + SpawnSdk.Flow.Broadcast, + SpawnSdk.Flow.Forward, + SpawnSdk.Flow.Pipe, + SpawnSdk.Flow.SideEffect + ], + Deprecated: [ + SpawnSdk + ], + Miscellaneous: [ + SpawnSdk.Defact, + SpawnSdk.System, + SpawnSdk.System.SpawnSystem, + SpawnSdk.Channel.Subscriber + ] ] ] end - # Run "mix help deps" to learn about dependencies. defp deps do [ {:spawn, path: "../.."},