diff --git a/agents/Aevatar.GAgents.ChannelRuntime/AgentBuilderTool.cs b/agents/Aevatar.GAgents.ChannelRuntime/AgentBuilderTool.cs index 6bc01fd7..623f8ad9 100644 --- a/agents/Aevatar.GAgents.ChannelRuntime/AgentBuilderTool.cs +++ b/agents/Aevatar.GAgents.ChannelRuntime/AgentBuilderTool.cs @@ -19,11 +19,23 @@ public sealed class AgentBuilderTool : IAgentTool { private readonly IServiceProvider _serviceProvider; private readonly ILogger? _logger; - - public AgentBuilderTool(IServiceProvider serviceProvider, ILogger? logger = null) + // Per-instance polling budget for actor -> projector -> document store + // propagation. Defaults to ProjectionWaitDefaults (15 s); tests inject + // shrunk values via the constructor instead of mutating a process-global, + // which would race other tests if the test surface ever parallelizes. + private readonly int _projectionWaitAttempts; + private readonly int _projectionWaitDelayMilliseconds; + + public AgentBuilderTool( + IServiceProvider serviceProvider, + ILogger? logger = null, + int projectionWaitAttempts = ProjectionWaitDefaults.Attempts, + int projectionWaitDelayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds) { _serviceProvider = serviceProvider; _logger = logger; + _projectionWaitAttempts = projectionWaitAttempts; + _projectionWaitDelayMilliseconds = projectionWaitDelayMilliseconds; } public string Name => "agent_builder"; @@ -252,6 +264,12 @@ private async Task CreateDailyReportAgentAsync( ?? await actorRuntime.CreateAsync(agentId, ct); var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1; + + // Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for + // the rationale. A late prime can't recover an event the projector + // already missed. + await EnsureUserAgentCatalogProjectionAsync(ct); + var deliveryTarget = ResolveDeliveryTarget(conversationId, agentId); var initialize = new InitializeSkillRunnerCommand { @@ -288,7 +306,6 @@ await actor.HandleEventAsync( BuildDirectEnvelope(actor.Id, new TriggerSkillRunnerExecutionCommand { Reason = "create_agent" }), ct); - await EnsureUserAgentCatalogProjectionAsync(ct); var confirmed = await WaitForCreatedAgentAsync( queryPort, agentId, @@ -398,6 +415,12 @@ private async Task CreateSocialMediaAgentAsync( ?? await actorRuntime.CreateAsync(agentId, ct); var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1; + + // Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for + // the rationale. A late prime can't recover an event the projector + // already missed. + await EnsureUserAgentCatalogProjectionAsync(ct); + var deliveryTarget = ResolveDeliveryTarget(conversationId, agentId); var initialize = new InitializeWorkflowAgentCommand { @@ -422,7 +445,6 @@ private async Task CreateSocialMediaAgentAsync( await actor.HandleEventAsync(BuildDirectEnvelope(actor.Id, initialize), ct); - await EnsureUserAgentCatalogProjectionAsync(ct); var confirmed = await WaitForCreatedAgentAsync( queryPort, agentId, @@ -514,6 +536,18 @@ private async Task DeleteAgentAsync( }); } + // Capture the read-model version before issuing tombstone so the wait can + // distinguish "projection caught up" from "projector did not run yet". + var versionBefore = await queryPort.GetStateVersionAsync(entry.AgentId, ct) ?? -1; + + // Prime the projection scope BEFORE any dispatch. If we primed after + // HandleEventAsync, an idle-deactivated projection grain would have + // already missed the published event and a late activation could not + // recover it (the activation contract is "be alive when the event + // arrives", not "replay missed events"). Activating up front costs at + // most one extra warm-grain round trip. + await EnsureUserAgentCatalogProjectionAsync(ct); + var disableResult = await DispatchAgentLifecycleAsync(entry, actorRuntime, "delete_agent", LifecycleAction.Disable, null, ct); if (disableResult.error != null) return disableResult.error; @@ -527,42 +561,41 @@ await registryActor.HandleEventAsync( BuildDirectEnvelope(registryActor.Id, new UserAgentCatalogTombstoneCommand { AgentId = entry.AgentId }), ct); - for (var attempt = 0; attempt < 10; attempt++) - { - if (attempt > 0) - await Task.Delay(500, ct); + var deleted = await WaitForTombstoneReflectedAsync( + queryPort, + entry.AgentId, + versionBefore, + ct, + _projectionWaitAttempts, + _projectionWaitDelayMilliseconds); - if (await queryPort.GetAsync(entry.AgentId, ct) == null) + var ownerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId) + ? entry.OwnerNyxUserId + : await ResolveCurrentUserIdAsync(nyxClient, token, ct); + var agents = await QueryAgentsForOwnerAsync(queryPort, ownerFilter, ct); + + if (deleted) + { + return JsonSerializer.Serialize(new { - var ownerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId) - ? entry.OwnerNyxUserId - : await ResolveCurrentUserIdAsync(nyxClient, token, ct); - var agents = await QueryAgentsForOwnerAsync(queryPort, ownerFilter, ct); - return JsonSerializer.Serialize(new - { - status = "deleted", - agent_id = entry.AgentId, - revoked_api_key_id = entry.ApiKeyId, - delete_notice = $"Deleted agent `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.", - agents, - total = agents.Length, - }); - } + status = "deleted", + agent_id = entry.AgentId, + revoked_api_key_id = entry.ApiKeyId, + delete_notice = $"Deleted agent `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.", + agents, + total = agents.Length, + }); } - var acceptedOwnerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId) - ? entry.OwnerNyxUserId - : await ResolveCurrentUserIdAsync(nyxClient, token, ct); - var acceptedAgents = await QueryAgentsForOwnerAsync(queryPort, acceptedOwnerFilter, ct); return JsonSerializer.Serialize(new { status = "accepted", agent_id = entry.AgentId, revoked_api_key_id = entry.ApiKeyId, delete_notice = $"Delete submitted for `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.", - agents = acceptedAgents, - total = acceptedAgents.Length, - note = "Delete was submitted but registry tombstone is not yet reflected.", + agents, + total = agents.Length, + note = "Tombstone is propagating. Run /agents in a few seconds to confirm the agent is gone.", }); } @@ -617,12 +650,31 @@ private async Task DisableAgentAsync( string.Equals(entry.value.Status, WorkflowAgentDefaults.StatusDisabled, StringComparison.Ordinal)) return SerializeAgentStatus(entry.value, "Agent is already disabled."); + // Capture baseline version BEFORE dispatch so the wait can distinguish + // "projection has materialized this disable" from "stale read replica + // happens to surface a historical disabled status". Capture must + // precede dispatch — capturing inside the wait helper would race + // against a fast projection that already advanced the version. + var versionBefore = await queryPort.GetStateVersionAsync(entry.value.AgentId, ct) ?? -1; + + // Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for + // the rationale. A late prime can't recover an event the projector + // already missed. + await EnsureUserAgentCatalogProjectionAsync(ct); + var dispatch = await DispatchAgentLifecycleAsync(entry.value, actorRuntime, "disable_agent", LifecycleAction.Disable, null, ct); if (dispatch.error != null) return dispatch.error; - var after = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, SkillRunnerDefaults.StatusDisabled, ct) ?? entry.value; - return SerializeAgentStatus(after, "Agent disabled. Scheduling paused."); + var observation = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, versionBefore, SkillRunnerDefaults.StatusDisabled, ct); + if (observation.Confirmed) + return SerializeAgentStatus(observation.Entry!, "Agent disabled. Scheduling paused."); + + // Dual gate never passed — the disable was dispatched but the read + // model has not confirmed the lifecycle change within the wait + // budget. Surface the pre-dispatch entry with an honest propagating + // note so the caller (LLM/user) does not assume the agent is paused. + return SerializeAgentStatus(entry.value, "Disable submitted. Run /agent-status in a few seconds to confirm the agent is paused."); } private async Task EnableAgentAsync( @@ -639,12 +691,25 @@ private async Task EnableAgentAsync( string.Equals(entry.value.Status, WorkflowAgentDefaults.StatusRunning, StringComparison.Ordinal)) return SerializeAgentStatus(entry.value, "Agent is already enabled."); + // See DisableAgentAsync for why versionBefore is captured here (before + // any dispatch) and not inside WaitForAgentStatusAsync. + var versionBefore = await queryPort.GetStateVersionAsync(entry.value.AgentId, ct) ?? -1; + + // Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for + // the rationale. A late prime can't recover an event the projector + // already missed. + await EnsureUserAgentCatalogProjectionAsync(ct); + var dispatch = await DispatchAgentLifecycleAsync(entry.value, actorRuntime, "enable_agent", LifecycleAction.Enable, null, ct); if (dispatch.error != null) return dispatch.error; - var after = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, SkillRunnerDefaults.StatusRunning, ct) ?? entry.value; - return SerializeAgentStatus(after, "Agent enabled. Scheduling resumed."); + var observation = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, versionBefore, SkillRunnerDefaults.StatusRunning, ct); + if (observation.Confirmed) + return SerializeAgentStatus(observation.Entry!, "Agent enabled. Scheduling resumed."); + + // See DisableAgentAsync for the rationale on the un-confirmed branch. + return SerializeAgentStatus(entry.value, "Enable submitted. Run /agent-status in a few seconds to confirm the agent is running."); } private static EventEnvelope BuildDirectEnvelope(string targetActorId, IMessage payload) @@ -776,23 +841,88 @@ private async Task EnsureUserAgentCatalogProjectionAsync(CancellationToken ct) await projectionPort.EnsureProjectionForActorAsync(UserAgentCatalogGAgent.WellKnownId, ct); } - private async Task WaitForAgentStatusAsync( + private async Task<(bool Confirmed, UserAgentCatalogEntry? Entry)> WaitForAgentStatusAsync( IUserAgentCatalogQueryPort queryPort, string agentId, + long versionBefore, string expectedStatus, CancellationToken ct) { - for (var attempt = 0; attempt < 10; attempt++) + // Status + version dual-condition (mirrors WaitForCreatedAgentAsync): + // wait until the read model both advances past the caller-captured + // baseline AND surfaces the expected status. Status alone is not + // enough — a stale replica can hold an expected-looking historical + // status (e.g., a previous disable→enable→disable cycle) and pass a + // status-only check while the actor has not yet processed *this* + // dispatch. Conversely, version alone is not enough either — an + // unrelated state event could advance the version without changing + // status. Both conditions together pin "this specific lifecycle + // event has materialized in the read model". Caller must capture + // versionBefore *before* dispatch, otherwise a fast projection that + // already advanced the version would make versionAfter == versionBefore + // and burn the entire budget. Projection scope priming also happens + // in the caller before dispatch (see DisableAgentAsync / + // EnableAgentAsync) — a late prime here cannot recover an event the + // projector already missed. + for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++) { if (attempt > 0) - await Task.Delay(500, ct); + await Task.Delay(_projectionWaitDelayMilliseconds, ct); + + var versionAfter = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1; + if (versionAfter <= versionBefore) + continue; var entry = await queryPort.GetAsync(agentId, ct); if (entry != null && string.Equals(entry.Status, expectedStatus, StringComparison.Ordinal)) - return entry; + return (Confirmed: true, Entry: entry); } - return await queryPort.GetAsync(agentId, ct); + // Budget exhausted: the dual gate never passed. Do NOT fall back to an + // un-gated GetAsync read — that would surface a stale-but-expected- + // looking entry and let callers report success despite the contract + // not being satisfied. Callers must surface honest "submitted / + // propagating" copy when Confirmed is false. + return (Confirmed: false, Entry: null); + } + + /// + /// Polls the read model until the agent's tombstoned state is reflected as a + /// document deletion. The read-model contract guarantees that a tombstoned + /// entry causes to dispatch + /// DeleteAsync; document absence is therefore the authoritative signal. + /// + private static async Task WaitForTombstoneReflectedAsync( + IUserAgentCatalogQueryPort queryPort, + string agentId, + long versionBefore, + CancellationToken ct, + int maxAttempts = ProjectionWaitDefaults.Attempts, + int delayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds) + { + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + if (attempt > 0) + await Task.Delay(delayMilliseconds, ct); + + // GetStateVersionAsync reads the same document; if it is null the + // document has been deleted by the projector. + var versionAfter = await queryPort.GetStateVersionAsync(agentId, ct); + if (versionAfter == null) + return true; + + if (versionAfter.Value <= versionBefore) + continue; + + // Version advanced (a fresh state event reached the projector) but the + // document still exists; if it is the tombstoned entry the projector + // would have deleted it on the same advance, so a non-null entry means + // either an interleaving upsert or a stale read replica - keep waiting. + if (await queryPort.GetAsync(agentId, ct) == null) + return true; + } + + return false; } private async Task<(bool success, string? error)> DispatchAgentLifecycleAsync( diff --git a/agents/Aevatar.GAgents.ChannelRuntime/AgentDeliveryTargetTool.cs b/agents/Aevatar.GAgents.ChannelRuntime/AgentDeliveryTargetTool.cs index 785a682f..effa80fc 100644 --- a/agents/Aevatar.GAgents.ChannelRuntime/AgentDeliveryTargetTool.cs +++ b/agents/Aevatar.GAgents.ChannelRuntime/AgentDeliveryTargetTool.cs @@ -14,10 +14,20 @@ namespace Aevatar.GAgents.ChannelRuntime; public sealed class AgentDeliveryTargetTool : IAgentTool { private readonly IServiceProvider _serviceProvider; - - public AgentDeliveryTargetTool(IServiceProvider serviceProvider) + // Per-instance polling budget (see ProjectionWaitDefaults). Tests inject + // shrunk values via the constructor to exercise the budget-exhausted + // branch without burning the production 15 s. + private readonly int _projectionWaitAttempts; + private readonly int _projectionWaitDelayMilliseconds; + + public AgentDeliveryTargetTool( + IServiceProvider serviceProvider, + int projectionWaitAttempts = ProjectionWaitDefaults.Attempts, + int projectionWaitDelayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds) { _serviceProvider = serviceProvider; + _projectionWaitAttempts = projectionWaitAttempts; + _projectionWaitDelayMilliseconds = projectionWaitDelayMilliseconds; } public string Name => "agent_delivery_targets"; @@ -216,10 +226,10 @@ private async Task UpsertAsync( await actor.HandleEventAsync(envelope); var confirmed = false; - for (var attempt = 0; attempt < 10; attempt++) + for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++) { if (attempt > 0) - await Task.Delay(500, ct); + await Task.Delay(_projectionWaitDelayMilliseconds, ct); var versionAfter = await queryPort.GetStateVersionAsync(agentId.value!, ct) ?? -1; if (versionAfter <= versionBefore) @@ -289,6 +299,15 @@ private async Task DeleteAsync( }); } + // Capture version + ensure projection scope is alive (matches the Upsert path + // above). Without priming, an idle-deactivated projection grain leaves the + // tombstone enqueued with no consumer and the document persists indefinitely. + var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1; + + var projectionPort = _serviceProvider.GetService(); + if (projectionPort != null) + await projectionPort.EnsureProjectionForActorAsync(UserAgentCatalogGAgent.WellKnownId, ct); + var actor = await actorRuntime.GetAsync(UserAgentCatalogGAgent.WellKnownId) ?? await actorRuntime.CreateAsync(UserAgentCatalogGAgent.WellKnownId); @@ -310,12 +329,24 @@ private async Task DeleteAsync( // Tombstone triggers IProjectionWriteDispatcher.DeleteAsync (Channel RFC §7.1.1), // which also removes the document's projected StateVersion. Gate confirmation - // purely on document absence — versionAfter would be null after the delete lands. + // on either document absence or a state-version advance that materializes the + // delete — the prior absence-only check returned false negatives whenever the + // 5 s budget lost the race to projection lag. var confirmed = false; - for (var attempt = 0; attempt < 10; attempt++) + for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++) { if (attempt > 0) - await Task.Delay(500, ct); + await Task.Delay(_projectionWaitDelayMilliseconds, ct); + + var versionAfter = await queryPort.GetStateVersionAsync(agentId, ct); + if (versionAfter == null) + { + confirmed = true; + break; + } + + if (versionAfter.Value <= versionBefore) + continue; if (await queryPort.GetAsync(agentId, ct) == null) { @@ -329,7 +360,7 @@ private async Task DeleteAsync( status = confirmed ? "deleted" : "accepted", agent_id = agentId, delivery_target_id = agentId, - note = confirmed ? "" : "Delete submitted but projection not yet confirmed. Try 'list' after a few seconds.", + note = confirmed ? "" : "Tombstone is propagating. Try 'list' in a few seconds to confirm the delivery target is gone.", }); } diff --git a/agents/Aevatar.GAgents.ChannelRuntime/ProjectionWaitDefaults.cs b/agents/Aevatar.GAgents.ChannelRuntime/ProjectionWaitDefaults.cs new file mode 100644 index 00000000..51b7c9db --- /dev/null +++ b/agents/Aevatar.GAgents.ChannelRuntime/ProjectionWaitDefaults.cs @@ -0,0 +1,12 @@ +namespace Aevatar.GAgents.ChannelRuntime; + +/// +/// Shared default polling budget for tools that wait on the read model after +/// dispatching a write to the user agent catalog actor. 30 attempts × 500 ms +/// = 15 s — covers the production projection lag the prior 5 s budget lost to. +/// +internal static class ProjectionWaitDefaults +{ + public const int Attempts = 30; + public const int DelayMilliseconds = 500; +} diff --git a/test/Aevatar.GAgents.ChannelRuntime.Tests/AgentBuilderToolTests.cs b/test/Aevatar.GAgents.ChannelRuntime.Tests/AgentBuilderToolTests.cs index 3d19f988..97405d87 100644 --- a/test/Aevatar.GAgents.ChannelRuntime.Tests/AgentBuilderToolTests.cs +++ b/test/Aevatar.GAgents.ChannelRuntime.Tests/AgentBuilderToolTests.cs @@ -1508,6 +1508,102 @@ await registryActor.Received(1).HandleEventAsync( } } + [Fact] + public async Task ExecuteAsync_DeleteAgent_ReturnsAcceptedWithPropagatingHint_WhenTombstoneDoesNotReflectWithinBudget() + { + // Production bug class: with the old 5 s polling budget, /delete-agent + // routinely returned "accepted" + "tombstone is not yet reflected" while + // the document was still visible to /agents minutes later. This guard + // proves that when the read model legitimately stays behind, the user- + // facing payload now nudges the user to retry rather than implying the + // delete might not have landed at all. + var queryPort = Substitute.For(); + queryPort.GetAsync("skill-runner-stuck", Arg.Any()) + .Returns(Task.FromResult(new UserAgentCatalogEntry + { + AgentId = "skill-runner-stuck", + AgentType = SkillRunnerDefaults.AgentType, + TemplateName = "daily_report", + ApiKeyId = "key-stuck", + OwnerNyxUserId = "user-1", + })); + // Read-model lags forever in this test: GetStateVersionAsync keeps + // returning the same version (the projector never advances past it), + // and GetAsync keeps surfacing the entry. + queryPort.GetStateVersionAsync("skill-runner-stuck", Arg.Any()) + .Returns(Task.FromResult(7L)); + queryPort.QueryAllAsync(Arg.Any()) + .Returns(Task.FromResult>( + [new UserAgentCatalogEntry { AgentId = "skill-runner-stuck", OwnerNyxUserId = "user-1" }])); + + var skillRunnerActor = Substitute.For(); + skillRunnerActor.Id.Returns("skill-runner-stuck"); + var registryActor = Substitute.For(); + registryActor.Id.Returns(UserAgentCatalogGAgent.WellKnownId); + + var actorRuntime = Substitute.For(); + actorRuntime.GetAsync("skill-runner-stuck").Returns(Task.FromResult(skillRunnerActor)); + actorRuntime.GetAsync(UserAgentCatalogGAgent.WellKnownId).Returns(Task.FromResult(registryActor)); + + var handler = new RoutingJsonHandler(); + handler.Add(HttpMethod.Delete, "/api/v1/api-keys/key-stuck", """{"ok":true}"""); + var nyxClient = new NyxIdApiClient( + new NyxIdToolOptions { BaseUrl = "https://nyx.example.com" }, + new HttpClient(handler) { BaseAddress = new Uri("https://nyx.example.com") }); + + var services = new ServiceCollection(); + services.AddSingleton(queryPort); + services.AddSingleton(actorRuntime); + services.AddSingleton(nyxClient); + // Inject a shrunk wait budget per-instance (3 attempts × 1 ms) so the + // not-reflected branch fires in <100 ms instead of the production + // 15 s. Per-instance state replaces the earlier mutable-static + // approach (codex review r3141706856) so concurrent test classes + // that exercise other AgentBuilderTool paths cannot be poisoned by + // shrunk values leaking through process-global state. + var tool = new AgentBuilderTool( + services.BuildServiceProvider(), + projectionWaitAttempts: 3, + projectionWaitDelayMilliseconds: 1); + + AgentToolRequestContext.CurrentMetadata = new Dictionary + { + [LLMRequestMetadataKeys.NyxIdAccessToken] = "session-token", + }; + try + { + var result = await tool.ExecuteAsync(""" + { + "action": "delete_agent", + "agent_id": "skill-runner-stuck", + "confirm": true + } + """); + + using var doc = JsonDocument.Parse(result); + doc.RootElement.GetProperty("status").GetString().Should().Be("accepted"); + doc.RootElement.GetProperty("revoked_api_key_id").GetString().Should().Be("key-stuck"); + doc.RootElement.GetProperty("delete_notice").GetString() + .Should().Contain("Delete submitted for"); + // The new copy must point users at /agents to verify rather than + // implying the tombstone did not land. + doc.RootElement.GetProperty("note").GetString() + .Should().Contain("propagating") + .And.Contain("/agents"); + + await registryActor.Received(1).HandleEventAsync( + Arg.Is(e => + e.Payload != null && + e.Payload.Is(UserAgentCatalogTombstoneCommand.Descriptor) && + e.Payload.Unpack().AgentId == "skill-runner-stuck"), + Arg.Any()); + } + finally + { + AgentToolRequestContext.CurrentMetadata = null; + } + } + [Fact] public async Task ExecuteAsync_RunAgent_DispatchesManualTrigger() { @@ -1682,6 +1778,205 @@ await workflowAgentActor.Received(1).HandleEventAsync( } } + [Fact] + public async Task ExecuteAsync_DisableAgent_ReturnsStatusFast_WhenProjectionAdvancesOnFirstPoll() + { + // Pins the new version+status dual-gate fast-exit contract: when the + // caller-captured baseline is X and the read model advances to X+1 + // with status==expected on the very first post-dispatch poll, the + // wait helper must exit immediately (<1 s) instead of running the + // full 15 s budget. This guards against two regressions: + // + // 1. Re-introducing a status-only check (codex P3 in this PR's + // thread): would accept a stale replica that already happens to + // hold the expected historical status, returning before the + // dispatch is actually materialized. + // + // 2. Re-introducing the *helper-side* baseline capture (codex P2 in + // PR #413's first review pass): would capture versionBefore + // after dispatch, so a fast projection that already advanced + // the version would make versionAfter == versionBefore on every + // poll and burn the full budget. + // + // Both regressions make this test fail (case 1 by accepting before + // the dispatch, case 2 by deadlocking past the 1 s ceiling). + var queryPort = Substitute.For(); + queryPort.GetAsync("skill-runner-fast", Arg.Any()) + .Returns( + // RequireManagedAgentAsync's existence check sees the pre-disable status. + Task.FromResult(new UserAgentCatalogEntry + { + AgentId = "skill-runner-fast", + AgentType = SkillRunnerDefaults.AgentType, + TemplateName = "daily_report", + Status = SkillRunnerDefaults.StatusRunning, + }), + // Wait helper's first poll sees the materialized disable. + Task.FromResult(new UserAgentCatalogEntry + { + AgentId = "skill-runner-fast", + AgentType = SkillRunnerDefaults.AgentType, + TemplateName = "daily_report", + Status = SkillRunnerDefaults.StatusDisabled, + })); + // Caller's pre-dispatch baseline read returns 42; helper's post- + // dispatch poll sees 43 (the projection materialized the disable on + // the very next state event). Both checks pass on the first iteration. + queryPort.GetStateVersionAsync("skill-runner-fast", Arg.Any()) + .Returns( + Task.FromResult(42L), + Task.FromResult(43L)); + + var skillRunnerActor = Substitute.For(); + skillRunnerActor.Id.Returns("skill-runner-fast"); + var actorRuntime = Substitute.For(); + actorRuntime.GetAsync("skill-runner-fast").Returns(Task.FromResult(skillRunnerActor)); + + var services = new ServiceCollection(); + services.AddSingleton(queryPort); + services.AddSingleton(actorRuntime); + services.AddSingleton(new NyxIdApiClient( + new NyxIdToolOptions { BaseUrl = "https://nyx.example.com" }, + new HttpClient(new RoutingJsonHandler()) + { + BaseAddress = new Uri("https://nyx.example.com"), + })); + var tool = new AgentBuilderTool(services.BuildServiceProvider()); + + AgentToolRequestContext.CurrentMetadata = new Dictionary + { + [LLMRequestMetadataKeys.NyxIdAccessToken] = "session-token", + }; + try + { + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + var result = await tool.ExecuteAsync(""" + { + "action": "disable_agent", + "agent_id": "skill-runner-fast" + } + """); + stopwatch.Stop(); + + using var doc = JsonDocument.Parse(result); + doc.RootElement.GetProperty("status").GetString().Should().Be(SkillRunnerDefaults.StatusDisabled); + // 1 s ceiling: any regression that prevents a dual-gate first-poll + // exit would burn the full ProjectionWaitAttempts × + // ProjectionWaitDelayMilliseconds budget (15 s by default). + stopwatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(1)); + } + finally + { + AgentToolRequestContext.CurrentMetadata = null; + } + } + + [Fact] + public async Task ExecuteAsync_DisableAgent_KeepsWaitingWhenStatusMatchesButVersionStale() + { + // Stale-replica defense: a read replica can surface a historically + // expected status (e.g., a previous disable→enable→disable cycle + // left the entry's last-projected status as Disabled in some replica) + // while the current actor has not yet processed *this* dispatch. + // Status-only polling would accept this replica and return prematurely + // before the dispatch materializes. The dual gate keeps waiting + // until version advances. + var queryPort = Substitute.For(); + queryPort.GetAsync("skill-runner-stale", Arg.Any()) + .Returns( + // RequireManagedAgentAsync sees the canonical Running state + // because that is what the caller observed when issuing the + // disable. (A different replica surfaces stale Disabled below.) + Task.FromResult(new UserAgentCatalogEntry + { + AgentId = "skill-runner-stale", + AgentType = SkillRunnerDefaults.AgentType, + TemplateName = "daily_report", + Status = SkillRunnerDefaults.StatusRunning, + }), + // Helper's terminal fallback (after budget exhausts) returns + // a stale-but-expected-looking Disabled. With status-only + // polling the wait would have returned this entry on the + // first iteration. With the dual gate the version stays at + // baseline, so the version check short-circuits before the + // status check is even reached. + Task.FromResult(new UserAgentCatalogEntry + { + AgentId = "skill-runner-stale", + AgentType = SkillRunnerDefaults.AgentType, + TemplateName = "daily_report", + Status = SkillRunnerDefaults.StatusDisabled, + })); + // Caller baseline = 7; replica's view never advances past 7. Helper + // must keep iterating; we shrink the budget so the test finishes fast. + queryPort.GetStateVersionAsync("skill-runner-stale", Arg.Any()) + .Returns(Task.FromResult(7L)); + + var skillRunnerActor = Substitute.For(); + skillRunnerActor.Id.Returns("skill-runner-stale"); + var actorRuntime = Substitute.For(); + actorRuntime.GetAsync("skill-runner-stale").Returns(Task.FromResult(skillRunnerActor)); + + var services = new ServiceCollection(); + services.AddSingleton(queryPort); + services.AddSingleton(actorRuntime); + services.AddSingleton(new NyxIdApiClient( + new NyxIdToolOptions { BaseUrl = "https://nyx.example.com" }, + new HttpClient(new RoutingJsonHandler()) + { + BaseAddress = new Uri("https://nyx.example.com"), + })); + // Shrunk budget so the version-stale path finishes in <100 ms. + var tool = new AgentBuilderTool( + services.BuildServiceProvider(), + projectionWaitAttempts: 3, + projectionWaitDelayMilliseconds: 1); + + AgentToolRequestContext.CurrentMetadata = new Dictionary + { + [LLMRequestMetadataKeys.NyxIdAccessToken] = "session-token", + }; + try + { + var result = await tool.ExecuteAsync(""" + { + "action": "disable_agent", + "agent_id": "skill-runner-stale" + } + """); + + using var doc = JsonDocument.Parse(result); + + // Path-level assertion: the helper exhausted the injected + // 3-attempt budget instead of returning on the first status + // match: 1 caller baseline + 3 helper iterations = 4 calls. + // With status-only polling the helper would have returned on + // iteration 0 without ever calling GetStateVersionAsync, so + // total would be 1. Tightly coupled to the injected budget by + // design — that is what pins the contract. + await queryPort.Received(4).GetStateVersionAsync("skill-runner-stale", Arg.Any()); + + // Outcome-level assertion: when the dual gate never passes, the + // user-facing payload must NOT claim success. The wait helper + // returns Confirmed=false (no un-gated GetAsync fallback), and + // DisableAgentAsync surfaces the pre-dispatch entry plus an + // honest "submitted / propagating" note. A regression that + // re-introduces the un-gated final read OR drops the + // confirmed/unconfirmed branching makes this test fail by + // surfacing "Scheduling paused" + status=Disabled despite the + // dual gate having been violated. + doc.RootElement.GetProperty("status").GetString().Should().Be(SkillRunnerDefaults.StatusRunning); + var note = doc.RootElement.GetProperty("note").GetString(); + note.Should().Contain("Disable submitted") + .And.Contain("/agent-status") + .And.NotContain("Scheduling paused"); + } + finally + { + AgentToolRequestContext.CurrentMetadata = null; + } + } + [Fact] public async Task ExecuteAsync_DisableAgent_DispatchesDisableAndReturnsStatus() { @@ -1706,6 +2001,12 @@ public async Task ExecuteAsync_DisableAgent_DispatchesDisableAndReturnsStatus() ScheduleCron = "0 9 * * *", ScheduleTimezone = "UTC", })); + // Caller's pre-dispatch baseline read returns 5; helper's post-dispatch + // poll sees 6, satisfying the new version+status dual gate. + queryPort.GetStateVersionAsync("skill-runner-1", Arg.Any()) + .Returns( + Task.FromResult(5L), + Task.FromResult(6L)); var skillRunnerActor = Substitute.For(); skillRunnerActor.Id.Returns("skill-runner-1"); @@ -1778,6 +2079,12 @@ public async Task ExecuteAsync_EnableAgent_DispatchesEnableAndReturnsStatus() ScheduleCron = "0 9 * * *", ScheduleTimezone = "UTC", })); + // Caller's pre-dispatch baseline read returns 5; helper's post-dispatch + // poll sees 6, satisfying the new version+status dual gate. + queryPort.GetStateVersionAsync("skill-runner-1", Arg.Any()) + .Returns( + Task.FromResult(5L), + Task.FromResult(6L)); var skillRunnerActor = Substitute.For(); skillRunnerActor.Id.Returns("skill-runner-1"); @@ -1850,6 +2157,12 @@ public async Task ExecuteAsync_DisableAgent_DispatchesWorkflowDisableAndReturnsS ScheduleCron = "0 9 * * *", ScheduleTimezone = "UTC", })); + // Caller's pre-dispatch baseline read returns 5; helper's post-dispatch + // poll sees 6, satisfying the new version+status dual gate. + queryPort.GetStateVersionAsync("workflow-agent-1", Arg.Any()) + .Returns( + Task.FromResult(5L), + Task.FromResult(6L)); var workflowAgentActor = Substitute.For(); workflowAgentActor.Id.Returns("workflow-agent-1");