diff --git a/orchestrator/elixir/lib/symphony_elixir/config/schema.ex b/orchestrator/elixir/lib/symphony_elixir/config/schema.ex index 34414c5..9f9a10b 100644 --- a/orchestrator/elixir/lib/symphony_elixir/config/schema.ex +++ b/orchestrator/elixir/lib/symphony_elixir/config/schema.ex @@ -140,6 +140,8 @@ defmodule SymphonyElixir.Config.Schema do field(:max_turns, :integer, default: 20) field(:max_retry_backoff_ms, :integer, default: 300_000) field(:max_retries, :integer, default: 10) + field(:min_retry_interval_ms, :integer, default: 60_000) + field(:short_run_threshold_ms, :integer, default: 60_000) field(:max_concurrent_agents_by_state, :map, default: %{}) end @@ -148,13 +150,23 @@ defmodule SymphonyElixir.Config.Schema do schema |> cast( attrs, - [:max_concurrent_agents, :max_turns, :max_retry_backoff_ms, :max_retries, :max_concurrent_agents_by_state], + [ + :max_concurrent_agents, + :max_turns, + :max_retry_backoff_ms, + :max_retries, + :min_retry_interval_ms, + :short_run_threshold_ms, + :max_concurrent_agents_by_state + ], empty_values: [] ) |> validate_number(:max_concurrent_agents, greater_than: 0) |> validate_number(:max_turns, greater_than: 0) |> validate_number(:max_retry_backoff_ms, greater_than: 0) |> validate_number(:max_retries, greater_than: 0) + |> validate_number(:min_retry_interval_ms, greater_than_or_equal_to: 0) + |> validate_number(:short_run_threshold_ms, greater_than_or_equal_to: 0) |> update_change(:max_concurrent_agents_by_state, &Schema.normalize_state_limits/1) |> Schema.validate_state_limits(:max_concurrent_agents_by_state) end diff --git a/orchestrator/elixir/lib/symphony_elixir/orchestrator.ex b/orchestrator/elixir/lib/symphony_elixir/orchestrator.ex index 15bc7aa..41c9eb6 100644 --- a/orchestrator/elixir/lib/symphony_elixir/orchestrator.ex +++ b/orchestrator/elixir/lib/symphony_elixir/orchestrator.ex @@ -217,6 +217,7 @@ defmodule SymphonyElixir.Orchestrator do running_entry -> {updated_running_entry, token_delta} = Metrics.integrate_worker_update(running_entry, update) + updated_running_entry = apply_entry_rate_limits(updated_running_entry, update) state = state @@ -267,18 +268,39 @@ defmodule SymphonyElixir.Orchestrator do end defp handle_agent_exit_reason(state, issue_id, running_entry, session_id, :normal) do - Logger.info("Agent task completed for issue_id=#{issue_id} session_id=#{session_id}; evaluating continuation policy") + if Retry.short_lived_exit?(running_entry) do + runtime_ms = Retry.running_entry_runtime_ms(running_entry) - updated_state = - state - |> complete_issue(issue_id) - |> reconcile_completed_issue_lifecycle(issue_id, running_entry) - |> maybe_schedule_continuation_retry(issue_id, running_entry) + Logger.warning( + "Short-lived normal exit for issue_id=#{issue_id} session_id=#{session_id}" <> + " runtime_ms=#{runtime_ms}; treating as transient failure with backoff" + ) - if retry_scheduled?(updated_state, issue_id) do - {updated_state, "waiting", "continuation_retry_scheduled"} + updated_state = + state + |> complete_issue(issue_id) + |> reconcile_completed_issue_lifecycle(issue_id, running_entry) + |> maybe_schedule_short_lived_retry(issue_id, running_entry, runtime_ms) + + if retry_scheduled?(updated_state, issue_id) do + {updated_state, "waiting", "short_lived_exit_retry_scheduled"} + else + {updated_state, "success", "short_lived_exit_no_retry"} + end else - {updated_state, "success", "agent_task_completed"} + Logger.info("Agent task completed for issue_id=#{issue_id} session_id=#{session_id}; evaluating continuation policy") + + updated_state = + state + |> complete_issue(issue_id) + |> reconcile_completed_issue_lifecycle(issue_id, running_entry) + |> maybe_schedule_continuation_retry(issue_id, running_entry) + + if retry_scheduled?(updated_state, issue_id) do + {updated_state, "waiting", "continuation_retry_scheduled"} + else + {updated_state, "success", "agent_task_completed"} + end end end @@ -1187,6 +1209,45 @@ defmodule SymphonyElixir.Orchestrator do defp maybe_schedule_continuation_retry(%State{} = state, _issue_id, _running_entry), do: state + defp maybe_schedule_short_lived_retry(%State{} = state, issue_id, running_entry, runtime_ms) + when is_binary(issue_id) and is_map(running_entry) do + next_attempt = Retry.next_retry_attempt_from_running(running_entry) + retry_after_ms = compute_retry_after_ms(running_entry, state) + + metadata = + Map.merge( + %{ + identifier: running_entry.identifier, + delay_type: :short_lived_exit, + worker_host: Map.get(running_entry, :worker_host), + workspace_path: Map.get(running_entry, :workspace_path), + runtime_ms: runtime_ms, + retry_after_ms: retry_after_ms + }, + Retry.retry_runtime_metadata(running_entry) + ) + + case refresh_issue_for_continuation(issue_id) do + {:ok, %Issue{} = refreshed_issue} -> + if Dispatch.retry_candidate_issue?(refreshed_issue, Dispatch.terminal_state_set(), state.tracked) do + Retry.schedule_issue_retry(state, issue_id, next_attempt, metadata) + else + release_issue_claim(state, issue_id) + end + + {:ok, :missing} -> + release_issue_claim(state, issue_id) + + {:error, reason} -> + Logger.debug("Failed to refresh issue for short-lived retry issue_id=#{issue_id}: #{inspect(reason)}; scheduling retry with backoff") + + Retry.schedule_issue_retry(state, issue_id, next_attempt, metadata) + end + end + + defp maybe_schedule_short_lived_retry(%State{} = state, _issue_id, _running_entry, _runtime_ms), + do: state + defp handle_retry_issue(%State{} = state, issue_id, attempt, metadata) do case Tracker.fetch_candidate_issues() do {:ok, issues} -> @@ -1658,8 +1719,6 @@ defmodule SymphonyElixir.Orchestrator do ) end - defp emit_symphony_run_analytics(_running_entry, _status, _note, _metrics), do: :ok - defp retry_scheduled?(%State{} = state, issue_id) when is_binary(issue_id) do Map.has_key?(state.retry_attempts, issue_id) end @@ -1760,6 +1819,20 @@ defmodule SymphonyElixir.Orchestrator do defp apply_worker_token_delta(state, _token_delta), do: state + defp compute_retry_after_ms(running_entry, %State{} = state) + when is_map(running_entry) do + entry_retry_after = Retry.extract_retry_after_ms(Map.get(running_entry, :last_rate_limits)) + state_retry_after = Retry.extract_retry_after_ms(state.worker_rate_limits) + max(entry_retry_after, state_retry_after) + end + + defp apply_entry_rate_limits(running_entry, update) when is_map(running_entry) and is_map(update) do + case Metrics.extract_rate_limits(update) do + %{} = rate_limits -> Map.put(running_entry, :last_rate_limits, rate_limits) + _ -> running_entry + end + end + defp apply_worker_rate_limits(%State{} = state, update) when is_map(update) do case Metrics.extract_rate_limits(update) do %{} = rate_limits -> diff --git a/orchestrator/elixir/lib/symphony_elixir/orchestrator/retry.ex b/orchestrator/elixir/lib/symphony_elixir/orchestrator/retry.ex index fc16c87..ca341bd 100644 --- a/orchestrator/elixir/lib/symphony_elixir/orchestrator/retry.ex +++ b/orchestrator/elixir/lib/symphony_elixir/orchestrator/retry.ex @@ -179,12 +179,45 @@ defmodule SymphonyElixir.Orchestrator.Retry do @spec retry_delay(pos_integer(), map()) :: pos_integer() @doc """ Calculate retry delay based on attempt number and metadata. + + Delay types: + - `:continuation` — worker ran long enough and exited normally; short delay + - `:short_lived_exit` — worker exited normally but ran below the short-run + threshold (e.g. hit a rate limit on the first API call); uses failure backoff + with `min_retry_interval_ms` floor + - anything else — failure backoff with `min_retry_interval_ms` floor """ def retry_delay(attempt, metadata) when is_integer(attempt) and attempt > 0 and is_map(metadata) do - if metadata[:delay_type] == :continuation and attempt == 1 do - @continuation_retry_delay_ms + base_delay = + if metadata[:delay_type] == :continuation and attempt == 1 do + @continuation_retry_delay_ms + else + failure_retry_delay(attempt) + end + + enforce_minimum_interval(base_delay, metadata) + end + + @spec enforce_minimum_interval(pos_integer(), map()) :: pos_integer() + @doc """ + Apply the configured minimum retry interval as a floor. + + Genuine continuation retries (long-running workers that completed some work) + are exempt from the floor — they should resume quickly. Short-lived exits + and failure retries always respect the floor. + + When rate limit information is present in `metadata[:retry_after_ms]`, it is + used as an additional floor alongside `min_retry_interval_ms`. The final + delay is the maximum of the base delay, the config floor, and the + rate-limit-derived floor. + """ + def enforce_minimum_interval(delay_ms, metadata) when is_integer(delay_ms) and is_map(metadata) do + if metadata[:delay_type] == :continuation do + delay_ms else - failure_retry_delay(attempt) + config_floor = Config.settings!().agent.min_retry_interval_ms + rate_limit_floor = Map.get(metadata, :retry_after_ms, 0) + Enum.max([delay_ms, config_floor, rate_limit_floor]) end end @@ -197,6 +230,54 @@ defmodule SymphonyElixir.Orchestrator.Retry do min(@failure_retry_base_ms * (1 <<< max_delay_power), Config.settings!().agent.max_retry_backoff_ms) end + @spec short_lived_exit?(map()) :: boolean() + @doc """ + Determine whether a running entry represents a short-lived run. + + A run is short-lived when it completed in less time than the configured + `short_run_threshold_ms`. Short-lived normal exits are treated as transient + failures (e.g. rate limits) rather than genuine work completions. + """ + def short_lived_exit?(running_entry) when is_map(running_entry) do + threshold_ms = Config.settings!().agent.short_run_threshold_ms + + case running_entry_runtime_ms(running_entry) do + ms when is_integer(ms) and ms < threshold_ms -> true + _ -> false + end + end + + @spec running_entry_runtime_ms(map()) :: non_neg_integer() | nil + @doc """ + Compute the runtime in milliseconds for a running entry. + """ + def running_entry_runtime_ms(%{started_at: %DateTime{} = started_at}) do + max(0, DateTime.diff(DateTime.utc_now(), started_at, :millisecond)) + end + + def running_entry_runtime_ms(_running_entry), do: nil + + @spec extract_retry_after_ms(map() | nil) :: non_neg_integer() + @doc """ + Extract a retry-after floor (in milliseconds) from rate limit data. + + Inspects the `primary` and `secondary` buckets for `reset_in_seconds` + values. When a bucket's `remaining` count is zero (rate limited), its + `reset_in_seconds` is converted to milliseconds. The maximum across + all exhausted buckets is returned. + + Returns `0` when no actionable rate limit data is present. + """ + def extract_retry_after_ms(rate_limits) when is_map(rate_limits) do + Enum.max([ + bucket_retry_after_ms(rate_limits, ["primary", :primary]), + bucket_retry_after_ms(rate_limits, ["secondary", :secondary]), + 0 + ]) + end + + def extract_retry_after_ms(_rate_limits), do: 0 + @spec normalize_retry_attempt(integer() | any()) :: non_neg_integer() @doc """ Normalize retry attempt to valid range. @@ -229,6 +310,34 @@ defmodule SymphonyElixir.Orchestrator.Retry do } end + # Private helpers for rate-limit-aware backoff + + defp bucket_retry_after_ms(rate_limits, keys) when is_map(rate_limits) and is_list(keys) do + bucket = Enum.find_value(keys, fn key -> Map.get(rate_limits, key) end) + bucket_reset_ms(bucket) + end + + defp bucket_reset_ms(bucket) when is_map(bucket) do + remaining = bucket_remaining(bucket) + reset_seconds = bucket_reset_seconds(bucket) + + if is_number(remaining) and remaining <= 0 and is_number(reset_seconds) and reset_seconds > 0 do + round(reset_seconds * 1_000) + else + 0 + end + end + + defp bucket_reset_ms(_bucket), do: 0 + + defp bucket_remaining(bucket) do + Map.get(bucket, "remaining") || Map.get(bucket, :remaining) + end + + defp bucket_reset_seconds(bucket) do + Map.get(bucket, "reset_in_seconds") || Map.get(bucket, :reset_in_seconds) + end + # Private helper functions for picking retry values defp pick_retry_identifier(issue_id, previous_retry, metadata) do metadata[:identifier] || Map.get(previous_retry, :identifier) || issue_id diff --git a/orchestrator/elixir/test/support/test_support.exs b/orchestrator/elixir/test/support/test_support.exs index 1e0d92c..410fc6a 100644 --- a/orchestrator/elixir/test/support/test_support.exs +++ b/orchestrator/elixir/test/support/test_support.exs @@ -127,6 +127,8 @@ defmodule SymphonyElixir.TestSupport do max_turns: 20, max_retry_backoff_ms: 300_000, max_retries: 10, + min_retry_interval_ms: 60_000, + short_run_threshold_ms: 60_000, max_concurrent_agents_by_state: %{}, codex_command: "codex app-server", codex_approval_policy: %{reject: %{sandbox_approval: true, rules: true, mcp_elicitations: true}}, @@ -215,6 +217,8 @@ defmodule SymphonyElixir.TestSupport do max_turns = Keyword.get(config, :max_turns) max_retry_backoff_ms = Keyword.get(config, :max_retry_backoff_ms) max_retries = Keyword.get(config, :max_retries) + min_retry_interval_ms = Keyword.get(config, :min_retry_interval_ms) + short_run_threshold_ms = Keyword.get(config, :short_run_threshold_ms) max_concurrent_agents_by_state = Keyword.get(config, :max_concurrent_agents_by_state) codex_command = Keyword.get(config, :codex_command) codex_approval_policy = Keyword.get(config, :codex_approval_policy) @@ -305,6 +309,8 @@ defmodule SymphonyElixir.TestSupport do " max_turns: #{yaml_value(max_turns)}", " max_retry_backoff_ms: #{yaml_value(max_retry_backoff_ms)}", " max_retries: #{yaml_value(max_retries)}", + " min_retry_interval_ms: #{yaml_value(min_retry_interval_ms)}", + " short_run_threshold_ms: #{yaml_value(short_run_threshold_ms)}", " max_concurrent_agents_by_state: #{yaml_value(max_concurrent_agents_by_state)}", "codex:", " command: #{yaml_value(codex_command)}", diff --git a/orchestrator/elixir/test/symphony_elixir/core_test.exs b/orchestrator/elixir/test/symphony_elixir/core_test.exs index e8ce955..e46cc48 100644 --- a/orchestrator/elixir/test/symphony_elixir/core_test.exs +++ b/orchestrator/elixir/test/symphony_elixir/core_test.exs @@ -666,12 +666,16 @@ defmodule SymphonyElixir.CoreTest do initial_state = :sys.get_state(pid) + # started_at must be far enough in the past to exceed the short_run_threshold_ms + # (default 60s), otherwise the exit is classified as short-lived and gets backoff. + long_ago = DateTime.add(DateTime.utc_now(), -120, :second) + running_entry = %{ pid: self(), ref: ref, identifier: "MT-558", issue: %Issue{id: issue_id, identifier: "MT-558", state: "In Progress"}, - started_at: DateTime.utc_now() + started_at: long_ago } :sys.replace_state(pid, fn _ -> @@ -732,12 +736,14 @@ defmodule SymphonyElixir.CoreTest do initial_state = :sys.get_state(pid) + long_ago = DateTime.add(DateTime.utc_now(), -120, :second) + running_entry = %{ pid: self(), ref: ref, identifier: "MT-558P", issue: %Issue{id: issue_id, identifier: "MT-558P", state: "In Review"}, - started_at: DateTime.utc_now() + started_at: long_ago } :sys.replace_state(pid, fn _ -> @@ -782,12 +788,14 @@ defmodule SymphonyElixir.CoreTest do initial_state = :sys.get_state(pid) + long_ago = DateTime.add(DateTime.utc_now(), -120, :second) + running_entry = %{ pid: self(), ref: ref, identifier: "MT-558M", issue: %Issue{id: issue_id, identifier: "MT-558M", state: "In Review"}, - started_at: DateTime.utc_now() + started_at: long_ago } :sys.replace_state(pid, fn _ -> @@ -808,6 +816,9 @@ defmodule SymphonyElixir.CoreTest do end test "abnormal worker exit increments retry attempt progressively" do + # Disable min_retry_interval so the raw exponential backoff is testable. + write_workflow_file!(Workflow.workflow_file_path(), min_retry_interval_ms: 0) + issue_id = "issue-crash" ref = make_ref() orchestrator_name = Module.concat(__MODULE__, :CrashRetryOrchestrator) @@ -848,6 +859,9 @@ defmodule SymphonyElixir.CoreTest do end test "first abnormal worker exit waits before retrying" do + # Disable min_retry_interval so the raw exponential backoff is testable. + write_workflow_file!(Workflow.workflow_file_path(), min_retry_interval_ms: 0) + issue_id = "issue-crash-initial" ref = make_ref() orchestrator_name = Module.concat(__MODULE__, :InitialCrashRetryOrchestrator) @@ -886,6 +900,360 @@ defmodule SymphonyElixir.CoreTest do assert_due_in_range(due_at_ms, 9_000, 10_500) end + test "short-lived normal exit uses failure backoff instead of continuation retry" do + write_workflow_file!(Workflow.workflow_file_path(), + short_run_threshold_ms: 60_000, + min_retry_interval_ms: 60_000 + ) + + issue_id = "issue-short-lived" + ref = make_ref() + orchestrator_name = Module.concat(__MODULE__, :ShortLivedExitOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + # Worker started 30s ago — well below the 60s threshold. + recently = DateTime.add(DateTime.utc_now(), -30, :second) + + running_entry = %{ + pid: self(), + ref: ref, + identifier: "MT-SL1", + issue: %Issue{id: issue_id, identifier: "MT-SL1", state: "In Progress"}, + started_at: recently + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + |> Map.put(:claimed, MapSet.new([issue_id])) + |> Map.put(:retry_attempts, %{}) + end) + + send(pid, {:DOWN, ref, :process, self(), :normal}) + Process.sleep(50) + state = :sys.get_state(pid) + + refute Map.has_key?(state.running, issue_id) + assert MapSet.member?(state.completed, issue_id) + + # Should schedule a retry with failure backoff, NOT a 1s continuation retry. + # With min_retry_interval_ms=60_000, the delay must be at least 60s. + assert %{attempt: attempt, due_at_ms: due_at_ms} = state.retry_attempts[issue_id] + assert attempt >= 1 + assert_due_in_range(due_at_ms, 59_500, 61_000) + end + + test "short-lived normal exit increments attempt on subsequent short-lived exits" do + write_workflow_file!(Workflow.workflow_file_path(), + short_run_threshold_ms: 60_000, + min_retry_interval_ms: 0 + ) + + issue_id = "issue-short-spin" + ref = make_ref() + orchestrator_name = Module.concat(__MODULE__, :ShortLivedSpinOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + # Simulate a retry_attempt=2 worker that ran for only 5 seconds. + recently = DateTime.add(DateTime.utc_now(), -5, :second) + + running_entry = %{ + pid: self(), + ref: ref, + identifier: "MT-SPIN", + retry_attempt: 2, + issue: %Issue{id: issue_id, identifier: "MT-SPIN", state: "In Progress"}, + started_at: recently + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + |> Map.put(:claimed, MapSet.new([issue_id])) + |> Map.put(:retry_attempts, %{}) + end) + + send(pid, {:DOWN, ref, :process, self(), :normal}) + Process.sleep(50) + state = :sys.get_state(pid) + + # Attempt should increment from 2 to 3, and delay should use exponential backoff. + # attempt=3 → failure_retry_delay(3) = 10_000 * 2^2 = 40_000ms + assert %{attempt: 3, due_at_ms: due_at_ms} = state.retry_attempts[issue_id] + assert_due_in_range(due_at_ms, 38_500, 41_000) + end + + test "min_retry_interval_ms enforces a floor on failure retry delays" do + write_workflow_file!(Workflow.workflow_file_path(), min_retry_interval_ms: 120_000) + + issue_id = "issue-min-interval" + ref = make_ref() + orchestrator_name = Module.concat(__MODULE__, :MinIntervalOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + running_entry = %{ + pid: self(), + ref: ref, + identifier: "MT-FLOOR", + issue: %Issue{id: issue_id, identifier: "MT-FLOOR", state: "In Progress"}, + started_at: DateTime.utc_now() + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + |> Map.put(:claimed, MapSet.new([issue_id])) + |> Map.put(:retry_attempts, %{}) + end) + + # Abnormal exit — base failure delay for attempt 1 would be 10s, + # but min_retry_interval_ms=120s raises the floor. + send(pid, {:DOWN, ref, :process, self(), :boom}) + Process.sleep(50) + state = :sys.get_state(pid) + + assert %{attempt: 1, due_at_ms: due_at_ms} = state.retry_attempts[issue_id] + assert_due_in_range(due_at_ms, 119_500, 121_000) + end + + test "short-lived exit with rate limit info uses retry_after_ms as backoff floor" do + write_workflow_file!(Workflow.workflow_file_path(), + short_run_threshold_ms: 60_000, + min_retry_interval_ms: 60_000 + ) + + issue_id = "issue-rate-limited" + ref = make_ref() + orchestrator_name = Module.concat(__MODULE__, :RateLimitBackoffOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + # Worker started 10s ago — well below the 60s threshold. + recently = DateTime.add(DateTime.utc_now(), -10, :second) + + # Simulate rate limit info captured during the worker's lifetime. + # primary bucket is exhausted with a 120s reset. + rate_limits = %{ + "limit_id" => "anthropic", + "primary" => %{"remaining" => 0, "limit" => 100, "reset_in_seconds" => 120}, + "secondary" => nil + } + + running_entry = %{ + pid: self(), + ref: ref, + identifier: "MT-RL1", + issue: %Issue{id: issue_id, identifier: "MT-RL1", state: "In Progress"}, + started_at: recently, + last_rate_limits: rate_limits + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + |> Map.put(:claimed, MapSet.new([issue_id])) + |> Map.put(:retry_attempts, %{}) + end) + + send(pid, {:DOWN, ref, :process, self(), :normal}) + Process.sleep(50) + state = :sys.get_state(pid) + + refute Map.has_key?(state.running, issue_id) + assert MapSet.member?(state.completed, issue_id) + + # The retry_after_ms from the rate limit (120s) exceeds min_retry_interval_ms (60s), + # so the delay should be at least 120s. + assert %{attempt: attempt, due_at_ms: due_at_ms} = state.retry_attempts[issue_id] + assert attempt >= 1 + assert_due_in_range(due_at_ms, 119_000, 121_000) + end + + test "short-lived exit uses state-level rate limits as fallback for retry_after_ms" do + write_workflow_file!(Workflow.workflow_file_path(), + short_run_threshold_ms: 60_000, + min_retry_interval_ms: 60_000 + ) + + issue_id = "issue-state-rate-limit" + ref = make_ref() + orchestrator_name = Module.concat(__MODULE__, :StateRateLimitOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + recently = DateTime.add(DateTime.utc_now(), -10, :second) + + # No per-entry rate limits, but the state has global rate limit info + # from a previous worker's events. + state_rate_limits = %{ + "limit_id" => "anthropic", + "primary" => %{"remaining" => 0, "limit" => 100, "reset_in_seconds" => 90} + } + + running_entry = %{ + pid: self(), + ref: ref, + identifier: "MT-SRL", + issue: %Issue{id: issue_id, identifier: "MT-SRL", state: "In Progress"}, + started_at: recently + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + |> Map.put(:claimed, MapSet.new([issue_id])) + |> Map.put(:retry_attempts, %{}) + |> Map.put(:worker_rate_limits, state_rate_limits) + end) + + send(pid, {:DOWN, ref, :process, self(), :normal}) + Process.sleep(50) + state = :sys.get_state(pid) + + # The state-level rate limit has 90s reset, which exceeds min_retry_interval_ms (60s), + # so the delay should be at least 90s. + assert %{attempt: attempt, due_at_ms: due_at_ms} = state.retry_attempts[issue_id] + assert attempt >= 1 + assert_due_in_range(due_at_ms, 89_000, 91_000) + end + + test "rate limit info from worker updates is stored in the running entry" do + issue_id = "issue-rl-update" + orchestrator_name = Module.concat(__MODULE__, :RateLimitUpdateOrchestrator) + {:ok, pid} = Orchestrator.start_link(name: orchestrator_name) + + on_exit(fn -> + if Process.alive?(pid) do + Process.exit(pid, :normal) + end + end) + + initial_state = :sys.get_state(pid) + + running_entry = %{ + pid: self(), + ref: make_ref(), + identifier: "MT-RLU", + session_id: nil, + issue: %Issue{id: issue_id, identifier: "MT-RLU", state: "In Progress"}, + started_at: DateTime.utc_now(), + last_worker_message: nil, + last_worker_timestamp: nil, + last_worker_event: nil, + worker_pid: nil, + worker_input_tokens: 0, + worker_output_tokens: 0, + worker_total_tokens: 0, + worker_last_reported_input_tokens: 0, + worker_last_reported_output_tokens: 0, + worker_last_reported_total_tokens: 0, + turn_count: 0 + } + + :sys.replace_state(pid, fn _ -> + initial_state + |> Map.put(:running, %{issue_id => running_entry}) + end) + + # Send a worker update with rate limit info + rate_limits = %{ + "limit_id" => "anthropic", + "primary" => %{"remaining" => 5, "limit" => 100, "reset_in_seconds" => 30} + } + + send( + pid, + {:worker_update, issue_id, + %{ + event: :notification, + timestamp: DateTime.utc_now(), + payload: %{"rate_limits" => rate_limits} + }} + ) + + Process.sleep(50) + state = :sys.get_state(pid) + + # Rate limits should be stored in the running entry + updated_entry = state.running[issue_id] + assert updated_entry.last_rate_limits == rate_limits + end + + test "extract_retry_after_ms returns 0 when buckets are not exhausted" do + alias SymphonyElixir.Orchestrator.Retry + + rate_limits = %{ + "limit_id" => "anthropic", + "primary" => %{"remaining" => 50, "limit" => 100, "reset_in_seconds" => 30} + } + + assert Retry.extract_retry_after_ms(rate_limits) == 0 + assert Retry.extract_retry_after_ms(nil) == 0 + assert Retry.extract_retry_after_ms(%{}) == 0 + end + + test "extract_retry_after_ms returns reset time when a bucket is exhausted" do + alias SymphonyElixir.Orchestrator.Retry + + rate_limits = %{ + "limit_id" => "anthropic", + "primary" => %{"remaining" => 0, "limit" => 100, "reset_in_seconds" => 95}, + "secondary" => %{"remaining" => 0, "limit" => 60, "reset_in_seconds" => 45} + } + + # Should return the max of all exhausted buckets: 95s = 95_000ms + assert Retry.extract_retry_after_ms(rate_limits) == 95_000 + end + + test "extract_retry_after_ms handles atom keys" do + alias SymphonyElixir.Orchestrator.Retry + + rate_limits = %{ + limit_id: "anthropic", + primary: %{remaining: 0, limit: 100, reset_in_seconds: 60} + } + + assert Retry.extract_retry_after_ms(rate_limits) == 60_000 + end + test "stale retry timer messages do not consume newer retry entries" do issue_id = "issue-stale-retry" orchestrator_name = Module.concat(__MODULE__, :StaleRetryOrchestrator) diff --git a/orchestrator/elixir/test/symphony_elixir/orchestrator_status_test.exs b/orchestrator/elixir/test/symphony_elixir/orchestrator_status_test.exs index b0cf85b..e42f8e1 100644 --- a/orchestrator/elixir/test/symphony_elixir/orchestrator_status_test.exs +++ b/orchestrator/elixir/test/symphony_elixir/orchestrator_status_test.exs @@ -965,9 +965,11 @@ defmodule SymphonyElixir.OrchestratorStatusTest do end test "orchestrator restarts stalled workers with retry backoff" do + # Disable min_retry_interval so the raw exponential backoff is testable. write_workflow_file!(Workflow.workflow_file_path(), tracker_api_token: nil, - codex_stall_timeout_ms: 1_000 + codex_stall_timeout_ms: 1_000, + min_retry_interval_ms: 0 ) issue_id = "issue-stall"