Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion orchestrator/elixir/lib/symphony_elixir/config/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ defmodule SymphonyElixir.Config.Schema do
field(:max_turns, :integer, default: 20)
field(:max_retry_backoff_ms, :integer, default: 300_000)
field(:max_retries, :integer, default: 10)
field(:min_retry_interval_ms, :integer, default: 60_000)
field(:short_run_threshold_ms, :integer, default: 60_000)
field(:max_concurrent_agents_by_state, :map, default: %{})
end

Expand All @@ -148,13 +150,23 @@ defmodule SymphonyElixir.Config.Schema do
schema
|> cast(
attrs,
[:max_concurrent_agents, :max_turns, :max_retry_backoff_ms, :max_retries, :max_concurrent_agents_by_state],
[
:max_concurrent_agents,
:max_turns,
:max_retry_backoff_ms,
:max_retries,
:min_retry_interval_ms,
:short_run_threshold_ms,
:max_concurrent_agents_by_state
],
empty_values: []
)
|> validate_number(:max_concurrent_agents, greater_than: 0)
|> validate_number(:max_turns, greater_than: 0)
|> validate_number(:max_retry_backoff_ms, greater_than: 0)
|> validate_number(:max_retries, greater_than: 0)
|> validate_number(:min_retry_interval_ms, greater_than_or_equal_to: 0)
|> validate_number(:short_run_threshold_ms, greater_than_or_equal_to: 0)
|> update_change(:max_concurrent_agents_by_state, &Schema.normalize_state_limits/1)
|> Schema.validate_state_limits(:max_concurrent_agents_by_state)
end
Expand Down
95 changes: 84 additions & 11 deletions orchestrator/elixir/lib/symphony_elixir/orchestrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ defmodule SymphonyElixir.Orchestrator do

running_entry ->
{updated_running_entry, token_delta} = Metrics.integrate_worker_update(running_entry, update)
updated_running_entry = apply_entry_rate_limits(updated_running_entry, update)

state =
state
Expand Down Expand Up @@ -267,18 +268,39 @@ defmodule SymphonyElixir.Orchestrator do
end

defp handle_agent_exit_reason(state, issue_id, running_entry, session_id, :normal) do
Logger.info("Agent task completed for issue_id=#{issue_id} session_id=#{session_id}; evaluating continuation policy")
if Retry.short_lived_exit?(running_entry) do
runtime_ms = Retry.running_entry_runtime_ms(running_entry)

updated_state =
state
|> complete_issue(issue_id)
|> reconcile_completed_issue_lifecycle(issue_id, running_entry)
|> maybe_schedule_continuation_retry(issue_id, running_entry)
Logger.warning(
"Short-lived normal exit for issue_id=#{issue_id} session_id=#{session_id}" <>
" runtime_ms=#{runtime_ms}; treating as transient failure with backoff"
)

if retry_scheduled?(updated_state, issue_id) do
{updated_state, "waiting", "continuation_retry_scheduled"}
updated_state =
state
|> complete_issue(issue_id)
|> reconcile_completed_issue_lifecycle(issue_id, running_entry)
|> maybe_schedule_short_lived_retry(issue_id, running_entry, runtime_ms)

if retry_scheduled?(updated_state, issue_id) do
{updated_state, "waiting", "short_lived_exit_retry_scheduled"}
else
{updated_state, "success", "short_lived_exit_no_retry"}
end
else
{updated_state, "success", "agent_task_completed"}
Logger.info("Agent task completed for issue_id=#{issue_id} session_id=#{session_id}; evaluating continuation policy")

updated_state =
state
|> complete_issue(issue_id)
|> reconcile_completed_issue_lifecycle(issue_id, running_entry)
|> maybe_schedule_continuation_retry(issue_id, running_entry)

if retry_scheduled?(updated_state, issue_id) do
{updated_state, "waiting", "continuation_retry_scheduled"}
else
{updated_state, "success", "agent_task_completed"}
end
end
end

Expand Down Expand Up @@ -1187,6 +1209,45 @@ defmodule SymphonyElixir.Orchestrator do

defp maybe_schedule_continuation_retry(%State{} = state, _issue_id, _running_entry), do: state

defp maybe_schedule_short_lived_retry(%State{} = state, issue_id, running_entry, runtime_ms)
when is_binary(issue_id) and is_map(running_entry) do
next_attempt = Retry.next_retry_attempt_from_running(running_entry)
retry_after_ms = compute_retry_after_ms(running_entry, state)

metadata =
Map.merge(
%{
identifier: running_entry.identifier,
delay_type: :short_lived_exit,
worker_host: Map.get(running_entry, :worker_host),
workspace_path: Map.get(running_entry, :workspace_path),
runtime_ms: runtime_ms,
retry_after_ms: retry_after_ms
},
Retry.retry_runtime_metadata(running_entry)
)

case refresh_issue_for_continuation(issue_id) do
{:ok, %Issue{} = refreshed_issue} ->
if Dispatch.retry_candidate_issue?(refreshed_issue, Dispatch.terminal_state_set(), state.tracked) do
Retry.schedule_issue_retry(state, issue_id, next_attempt, metadata)
else
release_issue_claim(state, issue_id)
end

{:ok, :missing} ->
release_issue_claim(state, issue_id)

{:error, reason} ->
Logger.debug("Failed to refresh issue for short-lived retry issue_id=#{issue_id}: #{inspect(reason)}; scheduling retry with backoff")

Retry.schedule_issue_retry(state, issue_id, next_attempt, metadata)
end
end

defp maybe_schedule_short_lived_retry(%State{} = state, _issue_id, _running_entry, _runtime_ms),
do: state

defp handle_retry_issue(%State{} = state, issue_id, attempt, metadata) do
case Tracker.fetch_candidate_issues() do
{:ok, issues} ->
Expand Down Expand Up @@ -1658,8 +1719,6 @@ defmodule SymphonyElixir.Orchestrator do
)
end

defp emit_symphony_run_analytics(_running_entry, _status, _note, _metrics), do: :ok

defp retry_scheduled?(%State{} = state, issue_id) when is_binary(issue_id) do
Map.has_key?(state.retry_attempts, issue_id)
end
Expand Down Expand Up @@ -1760,6 +1819,20 @@ defmodule SymphonyElixir.Orchestrator do

defp apply_worker_token_delta(state, _token_delta), do: state

defp compute_retry_after_ms(running_entry, %State{} = state)
when is_map(running_entry) do
entry_retry_after = Retry.extract_retry_after_ms(Map.get(running_entry, :last_rate_limits))
state_retry_after = Retry.extract_retry_after_ms(state.worker_rate_limits)
max(entry_retry_after, state_retry_after)
Comment on lines +1824 to +1826
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Store an observed-at timestamp with rate-limit snapshots

reset_in_seconds is a relative countdown from when the worker update was emitted, but compute_retry_after_ms/2 reuses the raw snapshot with no observation time. If a run receives reset_in_seconds: 60, keeps working for another 45s, and then exits short-lived, we still schedule a fresh 60s delay instead of the ~15s remaining. Because state.worker_rate_limits is also reused globally, that stale floor can leak into later unrelated retries as well.

Useful? React with 👍 / 👎.

end

defp apply_entry_rate_limits(running_entry, update) when is_map(running_entry) and is_map(update) do
case Metrics.extract_rate_limits(update) do
%{} = rate_limits -> Map.put(running_entry, :last_rate_limits, rate_limits)
_ -> running_entry
end
end

defp apply_worker_rate_limits(%State{} = state, update) when is_map(update) do
case Metrics.extract_rate_limits(update) do
%{} = rate_limits ->
Expand Down
115 changes: 112 additions & 3 deletions orchestrator/elixir/lib/symphony_elixir/orchestrator/retry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,45 @@ defmodule SymphonyElixir.Orchestrator.Retry do
@spec retry_delay(pos_integer(), map()) :: pos_integer()
@doc """
Calculate retry delay based on attempt number and metadata.

Delay types:
- `:continuation` — worker ran long enough and exited normally; short delay
- `:short_lived_exit` — worker exited normally but ran below the short-run
threshold (e.g. hit a rate limit on the first API call); uses failure backoff
with `min_retry_interval_ms` floor
- anything else — failure backoff with `min_retry_interval_ms` floor
"""
def retry_delay(attempt, metadata) when is_integer(attempt) and attempt > 0 and is_map(metadata) do
if metadata[:delay_type] == :continuation and attempt == 1 do
@continuation_retry_delay_ms
base_delay =
if metadata[:delay_type] == :continuation and attempt == 1 do
@continuation_retry_delay_ms
else
failure_retry_delay(attempt)
end

enforce_minimum_interval(base_delay, metadata)
end

@spec enforce_minimum_interval(pos_integer(), map()) :: pos_integer()
@doc """
Apply the configured minimum retry interval as a floor.

Genuine continuation retries (long-running workers that completed some work)
are exempt from the floor — they should resume quickly. Short-lived exits
and failure retries always respect the floor.

When rate limit information is present in `metadata[:retry_after_ms]`, it is
used as an additional floor alongside `min_retry_interval_ms`. The final
delay is the maximum of the base delay, the config floor, and the
rate-limit-derived floor.
"""
def enforce_minimum_interval(delay_ms, metadata) when is_integer(delay_ms) and is_map(metadata) do
if metadata[:delay_type] == :continuation do
delay_ms
else
failure_retry_delay(attempt)
config_floor = Config.settings!().agent.min_retry_interval_ms
rate_limit_floor = Map.get(metadata, :retry_after_ms, 0)
Comment on lines +215 to +219
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve delay metadata when retries are re-queued

This new floor logic only works while metadata[:delay_type] and metadata[:retry_after_ms] survive in the queued retry entry, but schedule_issue_retry/4/pop_retry_attempt_state/3 still persist only the older identifier/error/path fields. After a continuation retry wakes up and handle_active_retry/4 re-queues it because slots are full (or a retry poll fails), the next attempt is reclassified as a generic failure and picks up the 60s floor; any rate-limit-derived floor is also dropped on subsequent requeues.

Useful? React with 👍 / 👎.

Enum.max([delay_ms, config_floor, rate_limit_floor])
end
end

Expand All @@ -197,6 +230,54 @@ defmodule SymphonyElixir.Orchestrator.Retry do
min(@failure_retry_base_ms * (1 <<< max_delay_power), Config.settings!().agent.max_retry_backoff_ms)
end

@spec short_lived_exit?(map()) :: boolean()
@doc """
Determine whether a running entry represents a short-lived run.

A run is short-lived when it completed in less time than the configured
`short_run_threshold_ms`. Short-lived normal exits are treated as transient
failures (e.g. rate limits) rather than genuine work completions.
"""
def short_lived_exit?(running_entry) when is_map(running_entry) do
threshold_ms = Config.settings!().agent.short_run_threshold_ms

case running_entry_runtime_ms(running_entry) do
ms when is_integer(ms) and ms < threshold_ms -> true
_ -> false
end
end

@spec running_entry_runtime_ms(map()) :: non_neg_integer() | nil
@doc """
Compute the runtime in milliseconds for a running entry.
"""
def running_entry_runtime_ms(%{started_at: %DateTime{} = started_at}) do
max(0, DateTime.diff(DateTime.utc_now(), started_at, :millisecond))
end

def running_entry_runtime_ms(_running_entry), do: nil

@spec extract_retry_after_ms(map() | nil) :: non_neg_integer()
@doc """
Extract a retry-after floor (in milliseconds) from rate limit data.

Inspects the `primary` and `secondary` buckets for `reset_in_seconds`
values. When a bucket's `remaining` count is zero (rate limited), its
`reset_in_seconds` is converted to milliseconds. The maximum across
all exhausted buckets is returned.

Returns `0` when no actionable rate limit data is present.
"""
def extract_retry_after_ms(rate_limits) when is_map(rate_limits) do
Enum.max([
bucket_retry_after_ms(rate_limits, ["primary", :primary]),
bucket_retry_after_ms(rate_limits, ["secondary", :secondary]),
0
])
end

def extract_retry_after_ms(_rate_limits), do: 0

@spec normalize_retry_attempt(integer() | any()) :: non_neg_integer()
@doc """
Normalize retry attempt to valid range.
Expand Down Expand Up @@ -229,6 +310,34 @@ defmodule SymphonyElixir.Orchestrator.Retry do
}
end

# Private helpers for rate-limit-aware backoff

defp bucket_retry_after_ms(rate_limits, keys) when is_map(rate_limits) and is_list(keys) do
bucket = Enum.find_value(keys, fn key -> Map.get(rate_limits, key) end)
bucket_reset_ms(bucket)
end

defp bucket_reset_ms(bucket) when is_map(bucket) do
remaining = bucket_remaining(bucket)
reset_seconds = bucket_reset_seconds(bucket)

if is_number(remaining) and remaining <= 0 and is_number(reset_seconds) and reset_seconds > 0 do
round(reset_seconds * 1_000)
else
0
end
end

defp bucket_reset_ms(_bucket), do: 0

defp bucket_remaining(bucket) do
Map.get(bucket, "remaining") || Map.get(bucket, :remaining)
end

defp bucket_reset_seconds(bucket) do
Map.get(bucket, "reset_in_seconds") || Map.get(bucket, :reset_in_seconds)
end

# Private helper functions for picking retry values
defp pick_retry_identifier(issue_id, previous_retry, metadata) do
metadata[:identifier] || Map.get(previous_retry, :identifier) || issue_id
Expand Down
6 changes: 6 additions & 0 deletions orchestrator/elixir/test/support/test_support.exs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ defmodule SymphonyElixir.TestSupport do
max_turns: 20,
max_retry_backoff_ms: 300_000,
max_retries: 10,
min_retry_interval_ms: 60_000,
short_run_threshold_ms: 60_000,
max_concurrent_agents_by_state: %{},
codex_command: "codex app-server",
codex_approval_policy: %{reject: %{sandbox_approval: true, rules: true, mcp_elicitations: true}},
Expand Down Expand Up @@ -215,6 +217,8 @@ defmodule SymphonyElixir.TestSupport do
max_turns = Keyword.get(config, :max_turns)
max_retry_backoff_ms = Keyword.get(config, :max_retry_backoff_ms)
max_retries = Keyword.get(config, :max_retries)
min_retry_interval_ms = Keyword.get(config, :min_retry_interval_ms)
short_run_threshold_ms = Keyword.get(config, :short_run_threshold_ms)
max_concurrent_agents_by_state = Keyword.get(config, :max_concurrent_agents_by_state)
codex_command = Keyword.get(config, :codex_command)
codex_approval_policy = Keyword.get(config, :codex_approval_policy)
Expand Down Expand Up @@ -305,6 +309,8 @@ defmodule SymphonyElixir.TestSupport do
" max_turns: #{yaml_value(max_turns)}",
" max_retry_backoff_ms: #{yaml_value(max_retry_backoff_ms)}",
" max_retries: #{yaml_value(max_retries)}",
" min_retry_interval_ms: #{yaml_value(min_retry_interval_ms)}",
" short_run_threshold_ms: #{yaml_value(short_run_threshold_ms)}",
" max_concurrent_agents_by_state: #{yaml_value(max_concurrent_agents_by_state)}",
"codex:",
" command: #{yaml_value(codex_command)}",
Expand Down
Loading
Loading