Skip to content

Commit 34f6b13

Browse files
authored
feat: Allow Reactors to be able to be undone after successful completion. (#262)
1 parent 1080158 commit 34f6b13

File tree

5 files changed

+181
-15
lines changed

5 files changed

+181
-15
lines changed

lib/reactor.ex

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,24 @@ defmodule Reactor do
104104
"""
105105
@type concurrency_key_option :: {:concurrency_key, reference()}
106106

107-
@type options ::
107+
@typedoc """
108+
When this option is set the Reactor will return a copy of the completed Reactor
109+
struct for potential future undo.
110+
"""
111+
@type fully_reversible_option :: {:fully_reversible?, boolean}
112+
113+
@type run_options ::
114+
Enumerable.t(
115+
max_concurrency_option
116+
| timeout_option
117+
| max_iterations_option
118+
| halt_timeout_option
119+
| async_option
120+
| concurrency_key_option
121+
| fully_reversible_option
122+
)
123+
124+
@type undo_options ::
108125
Enumerable.t(
109126
max_concurrency_option
110127
| timeout_option
@@ -135,7 +152,7 @@ defmodule Reactor do
135152
@spec is_reactor(any) :: Macro.t()
136153
defguard is_reactor(reactor) when is_struct(reactor, __MODULE__)
137154

138-
@option_schema [
155+
@run_schema [
139156
max_concurrency: [
140157
type: :pos_integer,
141158
required: false,
@@ -168,6 +185,12 @@ defmodule Reactor do
168185
type: :any,
169186
required: false,
170187
doc: "A unique identifier for the Reactor run"
188+
],
189+
fully_reversible?: [
190+
type: :boolean,
191+
required: false,
192+
default: false,
193+
doc: "Return the completed reactor as well as the result for possible later reversal"
171194
]
172195
]
173196

@@ -185,10 +208,10 @@ defmodule Reactor do
185208
186209
## Options
187210
188-
#{Spark.Options.docs(@option_schema)}
211+
#{Spark.Options.docs(@run_schema)}
189212
"""
190-
@doc spark_opts: [{4, @option_schema}]
191-
@spec run(t | module, inputs, context_arg, options) :: {:ok, any} | {:error, any} | {:halted, t}
213+
@spec run(t | module, inputs, context_arg, run_options) ::
214+
{:ok, any} | {:error, any} | {:halted, t}
192215
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
193216

194217
def run(reactor, inputs, context, options) when is_atom(reactor) do
@@ -215,7 +238,7 @@ defmodule Reactor do
215238
end
216239

217240
@doc "Raising version of `run/4`."
218-
@spec run!(t | module, inputs, context_arg, options) :: any | no_return
241+
@spec run!(t | module, inputs, context_arg, run_options) :: any | no_return
219242
def run!(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
220243

221244
def run!(reactor, inputs, context, options) do
@@ -224,4 +247,63 @@ defmodule Reactor do
224247
{:error, reason} -> raise reason
225248
end
226249
end
250+
251+
@undo_options Keyword.drop(@run_schema, [:fully_reversible?])
252+
253+
@doc """
254+
Attempt to undo a previously successful Reactor.
255+
256+
## Arguments
257+
258+
* `reactor` - The previously successful Reactor struct.
259+
* `context` - An arbitrary map that will be merged into the Reactor context and passed into each undo.
260+
261+
## Options
262+
263+
#{Spark.Options.docs(@undo_options)}
264+
"""
265+
@spec undo(t, context_arg, undo_options) :: :ok | {:error, any}
266+
def undo(reactor, context, options \\ [])
267+
268+
def undo(reactor, _context, _options) when not is_struct(reactor, __MODULE__) do
269+
{:error,
270+
ArgumentError.exception(
271+
message: "`reactor` value `#{inspect(reactor)}` is not a Reactor struct"
272+
)}
273+
end
274+
275+
def undo(reactor, _context, _options) when reactor.state != :successful do
276+
{:error,
277+
StateError.exception(
278+
reactor: reactor,
279+
state: reactor.state,
280+
expected: ~w[successful]a
281+
)}
282+
end
283+
284+
def undo(_reactor, context, _options) when not is_map(context) do
285+
{:error,
286+
ArgumentError.exception(
287+
message: "`context` value `#{inspect(context)}` is not valid context - must be a map"
288+
)}
289+
end
290+
291+
def undo(_reactor, _context, options) when not is_list(options) do
292+
{:error,
293+
ArgumentError.exception(
294+
message: "`options` value `#{inspect(options)}` is not a keyword list"
295+
)}
296+
end
297+
298+
def undo(reactor, context, options) do
299+
Reactor.Executor.undo(reactor, context, options)
300+
end
301+
302+
@doc "A raising version of `undo/2`"
303+
@spec undo!(t, context_arg, undo_options) :: :ok | no_return
304+
def undo!(reactor, context, options) do
305+
with {:error, reason} <- undo(reactor, context, options) do
306+
raise reason
307+
end
308+
end
227309
end

lib/reactor/executor.ex

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ defmodule Reactor.Executor do
5353
5454
You probably shouldn't call this directly, but use `Reactor.run/4` instead.
5555
"""
56-
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
57-
{:ok, any} | {:halted, Reactor.t()} | {:error, any}
56+
@spec run(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.run_options()) ::
57+
{:ok, any} | {:ok, any, Reactor.t()} | {:halted, Reactor.t()} | {:error, any}
5858
def run(reactor, inputs \\ %{}, context \\ %{}, options \\ [])
5959

6060
def run(reactor, _inputs, _context, _options) when is_nil(reactor.return),
@@ -76,6 +76,21 @@ defmodule Reactor.Executor do
7676
expected: ~w[pending halted]a
7777
)}
7878

79+
@doc """
80+
Undo a previously successful Reactor.
81+
"""
82+
@spec undo(Reactor.t(), Reactor.context(), Reactor.undo_options()) :: :ok | {:error, any}
83+
def undo(reactor, context, options) do
84+
inputs =
85+
reactor.context
86+
|> Map.get(:private, %{})
87+
|> Map.get(:inputs, %{})
88+
89+
with {:ok, reactor, state} <- Executor.Init.init(reactor, inputs, context, options) do
90+
handle_undo(reactor, state)
91+
end
92+
end
93+
7994
defp execute(reactor, state) when state.max_iterations == 0 do
8095
{reactor, _status} = Executor.Async.collect_remaining_tasks_for_shutdown(reactor, state)
8196
maybe_release_pool(state)
@@ -107,10 +122,16 @@ defmodule Reactor.Executor do
107122
{:error, reason} -> {:error, reason}
108123
end
109124

110-
{:ok, result} ->
125+
{:ok, result, reactor} ->
111126
maybe_release_pool(state)
112127

113-
Executor.Hooks.complete(reactor, result, reactor.context)
128+
with {:ok, result} <- Executor.Hooks.complete(reactor, result, reactor.context) do
129+
if state.fully_reversible? do
130+
{:ok, result, reactor}
131+
else
132+
{:ok, result}
133+
end
134+
end
114135

115136
{:error, reason} ->
116137
maybe_release_pool(state)
@@ -231,6 +252,8 @@ defmodule Reactor.Executor do
231252
handle_undo(%{reactor | state: :failed, undo: []}, state, Enum.reverse(reactor.undo))
232253
end
233254

255+
defp handle_undo(_reactor, state, []) when state.errors == [], do: :ok
256+
234257
defp handle_undo(reactor, state, []) do
235258
error = Reactor.Error.to_class(state.errors)
236259
Executor.Hooks.error(reactor, error, reactor.context)
@@ -246,7 +269,7 @@ defmodule Reactor.Executor do
246269
defp all_done(reactor) do
247270
with 0 <- Graph.num_vertices(reactor.plan),
248271
{:ok, value} <- Map.fetch(reactor.intermediate_results, reactor.return) do
249-
{:ok, value}
272+
{:ok, value, %{reactor | state: :successful}}
250273
else
251274
:error ->
252275
{:error, MissingReturnResultError.exception(reactor: reactor)}

lib/reactor/executor/init.ex

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ defmodule Reactor.Executor.Init do
88
import Reactor.Utils
99

1010
@doc false
11-
@spec init(Reactor.t(), Reactor.inputs(), Reactor.context(), Reactor.options()) ::
11+
@spec init(
12+
Reactor.t(),
13+
Reactor.inputs(),
14+
Reactor.context(),
15+
Reactor.run_options() | Reactor.undo_options()
16+
) ::
1217
{:ok, Reactor.t(), state :: map} | {:error, any}
1318
def init(reactor, _inputs, _context, _options) when not is_reactor(reactor),
1419
do: {:error, ArgumentError.exception(message: "`reactor` is not a Reactor.")}
@@ -24,7 +29,7 @@ defmodule Reactor.Executor.Init do
2429
reactor.context
2530
|> deep_merge(context)
2631
|> deep_merge(%{private: %{inputs: inputs}})
27-
|> Map.put(:run_id, state.run_id)
32+
|> Map.put_new(:run_id, state.run_id)
2833

2934
{:ok, %{reactor | context: context}, state}
3035
end

lib/reactor/executor/state.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ defmodule Reactor.Executor.State do
99
async?: true,
1010
halt_timeout: 5000,
1111
max_iterations: :infinity,
12-
timeout: :infinity
12+
timeout: :infinity,
13+
fully_reversible?: false
1314
}
1415

1516
defstruct async?: @defaults.async?,
@@ -24,7 +25,8 @@ defmodule Reactor.Executor.State do
2425
run_id: nil,
2526
skipped: MapSet.new(),
2627
started_at: nil,
27-
timeout: @defaults.timeout
28+
timeout: @defaults.timeout,
29+
fully_reversible?: @defaults.fully_reversible?
2830

2931
alias Reactor.{Executor.ConcurrencyTracker, Step}
3032

test/reactor_test.exs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,59 @@ defmodule ReactorTest do
5252

5353
assert {:ok, "McFly Marty"} = Reactor.run(reactor, name: "Marty McFly")
5454
end
55+
56+
test "it can return successful reactors" do
57+
assert {:ok, "Marty McFly", reactor} =
58+
Reactor.run(BasicReactor, %{name: "McFly Marty"}, %{}, fully_reversible?: true)
59+
60+
assert reactor.state == :successful
61+
end
62+
end
63+
64+
describe "undo/2" do
65+
defmodule UndoableReactor do
66+
use Reactor
67+
68+
input :agent
69+
70+
step :push_a do
71+
argument :agent, input(:agent)
72+
run &push(&1, :a)
73+
undo &undo/2
74+
end
75+
76+
step :push_b do
77+
wait_for :push_a
78+
argument :agent, input(:agent)
79+
run &push(&1, :b)
80+
undo &undo/2
81+
end
82+
83+
return :push_b
84+
85+
def push(args, value) do
86+
Agent.update(args.agent, fn list -> [value | list] end)
87+
{:ok, value}
88+
end
89+
90+
def undo(value, args) do
91+
Agent.update(args.agent, fn list -> List.delete(list, value) end)
92+
93+
:ok
94+
end
95+
end
96+
97+
test "previously successful reactors can be undone" do
98+
{:ok, pid} = Agent.start_link(fn -> [:z] end)
99+
100+
assert {:ok, :b, reactor} =
101+
Reactor.run(UndoableReactor, %{agent: pid}, %{}, fully_reversible?: true)
102+
103+
assert [:b, :a, :z] = Agent.get(pid, &Function.identity/1)
104+
105+
assert :ok = Reactor.undo(reactor, %{})
106+
107+
assert [:z] = Agent.get(pid, &Function.identity/1)
108+
end
55109
end
56110
end

0 commit comments

Comments
 (0)