Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 169 additions & 39 deletions agents/Aevatar.GAgents.ChannelRuntime/AgentBuilderTool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,23 @@ public sealed class AgentBuilderTool : IAgentTool
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<AgentBuilderTool>? _logger;

public AgentBuilderTool(IServiceProvider serviceProvider, ILogger<AgentBuilderTool>? logger = null)
// Per-instance polling budget for actor -> projector -> document store
// propagation. Defaults to ProjectionWaitDefaults (15 s); tests inject
// shrunk values via the constructor instead of mutating a process-global,
// which would race other tests if the test surface ever parallelizes.
private readonly int _projectionWaitAttempts;
private readonly int _projectionWaitDelayMilliseconds;

public AgentBuilderTool(
IServiceProvider serviceProvider,
ILogger<AgentBuilderTool>? logger = null,
int projectionWaitAttempts = ProjectionWaitDefaults.Attempts,
int projectionWaitDelayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds)
{
_serviceProvider = serviceProvider;
_logger = logger;
_projectionWaitAttempts = projectionWaitAttempts;
_projectionWaitDelayMilliseconds = projectionWaitDelayMilliseconds;
}

public string Name => "agent_builder";
Expand Down Expand Up @@ -252,6 +264,12 @@ private async Task<string> CreateDailyReportAgentAsync(
?? await actorRuntime.CreateAsync<SkillRunnerGAgent>(agentId, ct);

var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1;

// Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for
// the rationale. A late prime can't recover an event the projector
// already missed.
await EnsureUserAgentCatalogProjectionAsync(ct);

var deliveryTarget = ResolveDeliveryTarget(conversationId, agentId);
var initialize = new InitializeSkillRunnerCommand
{
Expand Down Expand Up @@ -288,7 +306,6 @@ await actor.HandleEventAsync(
BuildDirectEnvelope(actor.Id, new TriggerSkillRunnerExecutionCommand { Reason = "create_agent" }),
ct);

await EnsureUserAgentCatalogProjectionAsync(ct);
var confirmed = await WaitForCreatedAgentAsync(
queryPort,
agentId,
Expand Down Expand Up @@ -398,6 +415,12 @@ private async Task<string> CreateSocialMediaAgentAsync(
?? await actorRuntime.CreateAsync<WorkflowAgentGAgent>(agentId, ct);

var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1;

// Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for
// the rationale. A late prime can't recover an event the projector
// already missed.
await EnsureUserAgentCatalogProjectionAsync(ct);

var deliveryTarget = ResolveDeliveryTarget(conversationId, agentId);
var initialize = new InitializeWorkflowAgentCommand
{
Expand All @@ -422,7 +445,6 @@ private async Task<string> CreateSocialMediaAgentAsync(

await actor.HandleEventAsync(BuildDirectEnvelope(actor.Id, initialize), ct);

await EnsureUserAgentCatalogProjectionAsync(ct);
var confirmed = await WaitForCreatedAgentAsync(
queryPort,
agentId,
Expand Down Expand Up @@ -514,6 +536,18 @@ private async Task<string> DeleteAgentAsync(
});
}

// Capture the read-model version before issuing tombstone so the wait can
// distinguish "projection caught up" from "projector did not run yet".
var versionBefore = await queryPort.GetStateVersionAsync(entry.AgentId, ct) ?? -1;

// Prime the projection scope BEFORE any dispatch. If we primed after
// HandleEventAsync, an idle-deactivated projection grain would have
// already missed the published event and a late activation could not
// recover it (the activation contract is "be alive when the event
// arrives", not "replay missed events"). Activating up front costs at
// most one extra warm-grain round trip.
await EnsureUserAgentCatalogProjectionAsync(ct);

var disableResult = await DispatchAgentLifecycleAsync(entry, actorRuntime, "delete_agent", LifecycleAction.Disable, null, ct);
if (disableResult.error != null)
return disableResult.error;
Expand All @@ -527,42 +561,41 @@ await registryActor.HandleEventAsync(
BuildDirectEnvelope(registryActor.Id, new UserAgentCatalogTombstoneCommand { AgentId = entry.AgentId }),
ct);

for (var attempt = 0; attempt < 10; attempt++)
{
if (attempt > 0)
await Task.Delay(500, ct);
var deleted = await WaitForTombstoneReflectedAsync(
queryPort,
entry.AgentId,
versionBefore,
ct,
_projectionWaitAttempts,
_projectionWaitDelayMilliseconds);

if (await queryPort.GetAsync(entry.AgentId, ct) == null)
var ownerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId)
? entry.OwnerNyxUserId
: await ResolveCurrentUserIdAsync(nyxClient, token, ct);
var agents = await QueryAgentsForOwnerAsync(queryPort, ownerFilter, ct);

if (deleted)
{
return JsonSerializer.Serialize(new
{
var ownerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId)
? entry.OwnerNyxUserId
: await ResolveCurrentUserIdAsync(nyxClient, token, ct);
var agents = await QueryAgentsForOwnerAsync(queryPort, ownerFilter, ct);
return JsonSerializer.Serialize(new
{
status = "deleted",
agent_id = entry.AgentId,
revoked_api_key_id = entry.ApiKeyId,
delete_notice = $"Deleted agent `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.",
agents,
total = agents.Length,
});
}
status = "deleted",
agent_id = entry.AgentId,
revoked_api_key_id = entry.ApiKeyId,
delete_notice = $"Deleted agent `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.",
agents,
total = agents.Length,
});
}

var acceptedOwnerFilter = !string.IsNullOrWhiteSpace(entry.OwnerNyxUserId)
? entry.OwnerNyxUserId
: await ResolveCurrentUserIdAsync(nyxClient, token, ct);
var acceptedAgents = await QueryAgentsForOwnerAsync(queryPort, acceptedOwnerFilter, ct);
return JsonSerializer.Serialize(new
{
status = "accepted",
agent_id = entry.AgentId,
revoked_api_key_id = entry.ApiKeyId,
delete_notice = $"Delete submitted for `{entry.AgentId}`. Revoked API key: `{entry.ApiKeyId ?? "n/a"}`.",
agents = acceptedAgents,
total = acceptedAgents.Length,
note = "Delete was submitted but registry tombstone is not yet reflected.",
agents,
total = agents.Length,
note = "Tombstone is propagating. Run /agents in a few seconds to confirm the agent is gone.",
});
}

Expand Down Expand Up @@ -617,12 +650,31 @@ private async Task<string> DisableAgentAsync(
string.Equals(entry.value.Status, WorkflowAgentDefaults.StatusDisabled, StringComparison.Ordinal))
return SerializeAgentStatus(entry.value, "Agent is already disabled.");

// Capture baseline version BEFORE dispatch so the wait can distinguish
// "projection has materialized this disable" from "stale read replica
// happens to surface a historical disabled status". Capture must
// precede dispatch — capturing inside the wait helper would race
// against a fast projection that already advanced the version.
var versionBefore = await queryPort.GetStateVersionAsync(entry.value.AgentId, ct) ?? -1;

// Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for
// the rationale. A late prime can't recover an event the projector
// already missed.
await EnsureUserAgentCatalogProjectionAsync(ct);

var dispatch = await DispatchAgentLifecycleAsync(entry.value, actorRuntime, "disable_agent", LifecycleAction.Disable, null, ct);
if (dispatch.error != null)
return dispatch.error;

var after = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, SkillRunnerDefaults.StatusDisabled, ct) ?? entry.value;
return SerializeAgentStatus(after, "Agent disabled. Scheduling paused.");
var observation = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, versionBefore, SkillRunnerDefaults.StatusDisabled, ct);
if (observation.Confirmed)
return SerializeAgentStatus(observation.Entry!, "Agent disabled. Scheduling paused.");

// Dual gate never passed — the disable was dispatched but the read
// model has not confirmed the lifecycle change within the wait
// budget. Surface the pre-dispatch entry with an honest propagating
// note so the caller (LLM/user) does not assume the agent is paused.
return SerializeAgentStatus(entry.value, "Disable submitted. Run /agent-status in a few seconds to confirm the agent is paused.");
}

private async Task<string> EnableAgentAsync(
Expand All @@ -639,12 +691,25 @@ private async Task<string> EnableAgentAsync(
string.Equals(entry.value.Status, WorkflowAgentDefaults.StatusRunning, StringComparison.Ordinal))
return SerializeAgentStatus(entry.value, "Agent is already enabled.");

// See DisableAgentAsync for why versionBefore is captured here (before
// any dispatch) and not inside WaitForAgentStatusAsync.
var versionBefore = await queryPort.GetStateVersionAsync(entry.value.AgentId, ct) ?? -1;

// Prime the projection scope BEFORE dispatch — see DeleteAgentAsync for
// the rationale. A late prime can't recover an event the projector
// already missed.
await EnsureUserAgentCatalogProjectionAsync(ct);

var dispatch = await DispatchAgentLifecycleAsync(entry.value, actorRuntime, "enable_agent", LifecycleAction.Enable, null, ct);
if (dispatch.error != null)
return dispatch.error;

var after = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, SkillRunnerDefaults.StatusRunning, ct) ?? entry.value;
return SerializeAgentStatus(after, "Agent enabled. Scheduling resumed.");
var observation = await WaitForAgentStatusAsync(queryPort, entry.value.AgentId, versionBefore, SkillRunnerDefaults.StatusRunning, ct);
if (observation.Confirmed)
return SerializeAgentStatus(observation.Entry!, "Agent enabled. Scheduling resumed.");

// See DisableAgentAsync for the rationale on the un-confirmed branch.
return SerializeAgentStatus(entry.value, "Enable submitted. Run /agent-status in a few seconds to confirm the agent is running.");
}

private static EventEnvelope BuildDirectEnvelope(string targetActorId, IMessage payload)
Expand Down Expand Up @@ -776,23 +841,88 @@ private async Task EnsureUserAgentCatalogProjectionAsync(CancellationToken ct)
await projectionPort.EnsureProjectionForActorAsync(UserAgentCatalogGAgent.WellKnownId, ct);
}

private async Task<UserAgentCatalogEntry?> WaitForAgentStatusAsync(
private async Task<(bool Confirmed, UserAgentCatalogEntry? Entry)> WaitForAgentStatusAsync(
IUserAgentCatalogQueryPort queryPort,
string agentId,
long versionBefore,
string expectedStatus,
CancellationToken ct)
{
for (var attempt = 0; attempt < 10; attempt++)
// Status + version dual-condition (mirrors WaitForCreatedAgentAsync):
// wait until the read model both advances past the caller-captured
// baseline AND surfaces the expected status. Status alone is not
// enough — a stale replica can hold an expected-looking historical
// status (e.g., a previous disable→enable→disable cycle) and pass a
// status-only check while the actor has not yet processed *this*
// dispatch. Conversely, version alone is not enough either — an
// unrelated state event could advance the version without changing
// status. Both conditions together pin "this specific lifecycle
// event has materialized in the read model". Caller must capture
// versionBefore *before* dispatch, otherwise a fast projection that
// already advanced the version would make versionAfter == versionBefore
// and burn the entire budget. Projection scope priming also happens
// in the caller before dispatch (see DisableAgentAsync /
// EnableAgentAsync) — a late prime here cannot recover an event the
// projector already missed.
for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++)
{
if (attempt > 0)
await Task.Delay(500, ct);
await Task.Delay(_projectionWaitDelayMilliseconds, ct);

var versionAfter = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1;
if (versionAfter <= versionBefore)
continue;

var entry = await queryPort.GetAsync(agentId, ct);
if (entry != null && string.Equals(entry.Status, expectedStatus, StringComparison.Ordinal))
return entry;
return (Confirmed: true, Entry: entry);
}

return await queryPort.GetAsync(agentId, ct);
// Budget exhausted: the dual gate never passed. Do NOT fall back to an
// un-gated GetAsync read — that would surface a stale-but-expected-
// looking entry and let callers report success despite the contract
// not being satisfied. Callers must surface honest "submitted /
// propagating" copy when Confirmed is false.
return (Confirmed: false, Entry: null);
}

/// <summary>
/// Waits, within a bounded polling budget, for a tombstone to become visible in
/// the read model. Per the read-model contract, a tombstoned entry makes
/// <see cref="UserAgentCatalogProjector"/> dispatch <c>DeleteAsync</c>, so a
/// missing document is treated as the authoritative "deleted" signal.
/// </summary>
/// <param name="queryPort">Read-model port that is polled.</param>
/// <param name="agentId">Identifier of the agent whose document should disappear.</param>
/// <param name="versionBefore">Baseline state version; a poll that observes a version at or below it is treated as "not caught up yet".</param>
/// <param name="ct">Token passed to the inter-poll delay and to every read-model query.</param>
/// <param name="maxAttempts">Upper bound on the number of polls; nothing is polled when this is zero or negative.</param>
/// <param name="delayMilliseconds">Pause, in milliseconds, inserted before every poll except the first.</param>
/// <returns><c>true</c> as soon as the document is observed missing; <c>false</c> when the budget runs out first.</returns>
private static async Task<bool> WaitForTombstoneReflectedAsync(
    IUserAgentCatalogQueryPort queryPort,
    string agentId,
    long versionBefore,
    CancellationToken ct,
    int maxAttempts = ProjectionWaitDefaults.Attempts,
    int delayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds)
{
    var pollsStarted = 0;
    while (pollsStarted < maxAttempts)
    {
        // Back off between polls, but never before the very first one.
        if (pollsStarted != 0)
        {
            await Task.Delay(delayMilliseconds, ct);
        }

        pollsStarted++;

        // The state version is read from the same document, so "no version"
        // means the projector has already removed the document.
        var currentVersion = await queryPort.GetStateVersionAsync(agentId, ct);
        if (!currentVersion.HasValue)
        {
            return true;
        }

        // The document is only re-read once the version has moved past the
        // baseline. If that read still returns an entry (an interleaving
        // upsert or a stale read replica), the loop keeps waiting.
        var movedPastBaseline = currentVersion.Value > versionBefore;
        if (movedPastBaseline && await queryPort.GetAsync(agentId, ct) == null)
        {
            return true;
        }
    }

    // Budget exhausted without ever observing the document as missing.
    return false;
}

private async Task<(bool success, string? error)> DispatchAgentLifecycleAsync(
Expand Down
47 changes: 39 additions & 8 deletions agents/Aevatar.GAgents.ChannelRuntime/AgentDeliveryTargetTool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@ namespace Aevatar.GAgents.ChannelRuntime;
public sealed class AgentDeliveryTargetTool : IAgentTool
{
private readonly IServiceProvider _serviceProvider;

public AgentDeliveryTargetTool(IServiceProvider serviceProvider)
// Per-instance polling budget (see ProjectionWaitDefaults). Tests inject
// shrunk values via the constructor to exercise the budget-exhausted
// branch without burning the production 15 s.
private readonly int _projectionWaitAttempts;
private readonly int _projectionWaitDelayMilliseconds;

public AgentDeliveryTargetTool(
IServiceProvider serviceProvider,
int projectionWaitAttempts = ProjectionWaitDefaults.Attempts,
int projectionWaitDelayMilliseconds = ProjectionWaitDefaults.DelayMilliseconds)
{
_serviceProvider = serviceProvider;
_projectionWaitAttempts = projectionWaitAttempts;
_projectionWaitDelayMilliseconds = projectionWaitDelayMilliseconds;
}

public string Name => "agent_delivery_targets";
Expand Down Expand Up @@ -216,10 +226,10 @@ private async Task<string> UpsertAsync(
await actor.HandleEventAsync(envelope);

var confirmed = false;
for (var attempt = 0; attempt < 10; attempt++)
for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++)
{
if (attempt > 0)
await Task.Delay(500, ct);
await Task.Delay(_projectionWaitDelayMilliseconds, ct);

var versionAfter = await queryPort.GetStateVersionAsync(agentId.value!, ct) ?? -1;
if (versionAfter <= versionBefore)
Expand Down Expand Up @@ -289,6 +299,15 @@ private async Task<string> DeleteAsync(
});
}

// Capture version + ensure projection scope is alive (matches the Upsert path
// above). Without priming, an idle-deactivated projection grain leaves the
// tombstone enqueued with no consumer and the document persists indefinitely.
var versionBefore = await queryPort.GetStateVersionAsync(agentId, ct) ?? -1;

var projectionPort = _serviceProvider.GetService<UserAgentCatalogProjectionPort>();
if (projectionPort != null)
await projectionPort.EnsureProjectionForActorAsync(UserAgentCatalogGAgent.WellKnownId, ct);

var actor = await actorRuntime.GetAsync(UserAgentCatalogGAgent.WellKnownId)
?? await actorRuntime.CreateAsync<UserAgentCatalogGAgent>(UserAgentCatalogGAgent.WellKnownId);

Expand All @@ -310,12 +329,24 @@ private async Task<string> DeleteAsync(

// Tombstone triggers IProjectionWriteDispatcher.DeleteAsync (Channel RFC §7.1.1),
// which also removes the document's projected StateVersion. Gate confirmation
// purely on document absence — versionAfter would be null after the delete lands.
// on either document absence or a state-version advance that materializes the
// delete — the prior absence-only check returned false negatives whenever the
// 5 s budget lost the race to projection lag.
var confirmed = false;
for (var attempt = 0; attempt < 10; attempt++)
for (var attempt = 0; attempt < _projectionWaitAttempts; attempt++)
{
if (attempt > 0)
await Task.Delay(500, ct);
await Task.Delay(_projectionWaitDelayMilliseconds, ct);

var versionAfter = await queryPort.GetStateVersionAsync(agentId, ct);
if (versionAfter == null)
{
confirmed = true;
break;
}

if (versionAfter.Value <= versionBefore)
continue;

if (await queryPort.GetAsync(agentId, ct) == null)
{
Expand All @@ -329,7 +360,7 @@ private async Task<string> DeleteAsync(
status = confirmed ? "deleted" : "accepted",
agent_id = agentId,
delivery_target_id = agentId,
note = confirmed ? "" : "Delete submitted but projection not yet confirmed. Try 'list' after a few seconds.",
note = confirmed ? "" : "Tombstone is propagating. Try 'list' in a few seconds to confirm the delivery target is gone.",
});
}

Expand Down
Loading
Loading