Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 80 additions & 11 deletions dotnet/src/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable
/// </summary>
private const int MinProtocolVersion = 3;
private static readonly TimeSpan s_stderrPumpShutdownTimeout = TimeSpan.FromSeconds(5);
private static readonly TimeSpan s_runtimeShutdownTimeout = TimeSpan.FromSeconds(10);

/// <summary>
/// Provides a thread-safe collection of active Copilot sessions, indexed by session identifier.
Expand Down Expand Up @@ -291,7 +292,7 @@ async Task<Connection> StartCoreAsync(CancellationToken ct)

if (connection is not null)
{
await CleanupConnectionAsync(connection, errors: null);
await CleanupConnectionAsync(connection, errors: null, gracefulRuntimeShutdown: false);
}
else if (cliProcess is not null)
{
Expand All @@ -312,6 +313,7 @@ async Task<Connection> StartCoreAsync(CancellationToken ct)
/// This method performs graceful cleanup:
/// <list type="number">
/// <item>Closes all active sessions (releases in-memory resources)</item>
/// <item>Requests runtime shutdown for SDK-owned CLI processes</item>
/// <item>Closes the JSON-RPC connection</item>
/// <item>Terminates the CLI server process (if spawned by this client)</item>
/// </list>
Expand Down Expand Up @@ -346,7 +348,7 @@ public async Task StopAsync()

_sessions.Clear();

await CleanupConnectionAsync(errors);
await CleanupConnectionAsync(errors, gracefulRuntimeShutdown: true);

ThrowErrors(errors);
}
Expand Down Expand Up @@ -378,7 +380,7 @@ public async Task ForceStopAsync()
_sessions.Clear();

var errors = new List<Exception>();
await CleanupConnectionAsync(errors);
await CleanupConnectionAsync(errors, gracefulRuntimeShutdown: false);
ThrowErrors(errors);
}

Expand All @@ -398,7 +400,7 @@ private static void ThrowErrors(List<Exception>? errors)
}
}

private async Task CleanupConnectionAsync(List<Exception>? errors)
private async Task CleanupConnectionAsync(List<Exception>? errors, bool gracefulRuntimeShutdown)
{
var connectionTask = _connectionTask;
if (connectionTask is null)
Expand All @@ -419,11 +421,36 @@ private async Task CleanupConnectionAsync(List<Exception>? errors)
return;
}

await CleanupConnectionAsync(ctx, errors);
await CleanupConnectionAsync(ctx, errors, gracefulRuntimeShutdown);
}

private async Task CleanupConnectionAsync(Connection ctx, List<Exception>? errors)
private async Task CleanupConnectionAsync(Connection ctx, List<Exception>? errors, bool gracefulRuntimeShutdown)
{
var runtimeShutdownCompleted = false;
if (gracefulRuntimeShutdown && ctx.CliProcess is not null)
{
var runtimeShutdownTimestamp = Stopwatch.GetTimestamp();
try
{
using var cancellation = new CancellationTokenSource(s_runtimeShutdownTimeout);
await ctx.Server.Runtime.ShutdownAsync(cancellation.Token);
runtimeShutdownCompleted = true;
LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null,
"CopilotClient.StopAsync runtime shutdown complete. Elapsed={Elapsed}",
runtimeShutdownTimestamp);
}
catch (Exception ex) when (ex is OperationCanceledException
or InvalidOperationException
or ObjectDisposedException
or IOException
or SocketException)
{
LoggingHelpers.LogTiming(_logger, LogLevel.Debug, ex,
"CopilotClient.StopAsync runtime shutdown failed. Elapsed={Elapsed}",
runtimeShutdownTimestamp);
}
}

try { ctx.Rpc.Dispose(); }
catch (Exception ex) { AddCleanupError(errors, ex, _logger); }

Expand All @@ -439,22 +466,62 @@ private async Task CleanupConnectionAsync(Connection ctx, List<Exception>? error

if (ctx.CliProcess is { } childProcess)
{
await CleanupCliProcessAsync(childProcess, ctx.StderrPump, errors, _logger);
await CleanupCliProcessAsync(childProcess, ctx.StderrPump, errors, _logger, runtimeShutdownCompleted);
}
}

private static async Task CleanupCliProcessAsync(Process childProcess, ProcessStderrPump? stderrPump, List<Exception>? errors, ILogger? logger)
private static async Task CleanupCliProcessAsync(Process childProcess, ProcessStderrPump? stderrPump, List<Exception>? errors, ILogger? logger, bool waitForGracefulExit = false)
{
stderrPump?.Cancel();

try
{
if (!childProcess.HasExited)
{
if (waitForGracefulExit)
{
var shutdownWaitTimestamp = Stopwatch.GetTimestamp();
try
{
await childProcess.WaitForExitAsync().WaitAsync(s_runtimeShutdownTimeout);
}
catch (TimeoutException ex)
{
if (logger is not null)
{
LoggingHelpers.LogTiming(logger, LogLevel.Debug, ex,
"Timed out waiting for runtime process to exit after graceful shutdown. Elapsed={Elapsed}, Timeout={Timeout}",
shutdownWaitTimestamp,
s_runtimeShutdownTimeout);
}
}
}

if (childProcess.HasExited)
{
return;
}

childProcess.Kill(entireProcessTree: true);
// Kill is asynchronous; wait for the root CLI process to exit so cleanup callers
// do not observe StopAsync/DisposeAsync completion while it is still tearing down.
await childProcess.WaitForExitAsync();
var killWaitTimestamp = Stopwatch.GetTimestamp();
try
{
await childProcess.WaitForExitAsync().WaitAsync(s_runtimeShutdownTimeout);
}
catch (TimeoutException ex)
{
if (logger is not null)
{
LoggingHelpers.LogTiming(logger, LogLevel.Debug, ex,
"Timed out waiting for runtime process to exit after kill. Elapsed={Elapsed}, Timeout={Timeout}",
killWaitTimestamp,
s_runtimeShutdownTimeout);
}

AddCleanupError(errors, ex, logger);
}
}
}
catch (Exception ex)
Expand Down Expand Up @@ -2002,9 +2069,10 @@ private async Task<Connection> ConnectToServerAsync(Process? cliProcess, string?
"CopilotClient.ConnectToServerAsync transport setup complete. Elapsed={Elapsed}",
setupTimestamp);

_serverRpc = new ServerRpc(rpc);
var connection = new Connection(rpc, cliProcess, networkStream, stderrPump);
_serverRpc = connection.Server;

return new Connection(rpc, cliProcess, networkStream, stderrPump);
return connection;
}
catch
{
Expand Down Expand Up @@ -2208,6 +2276,7 @@ private class Connection(
{
public Process? CliProcess => cliProcess;
public JsonRpc Rpc => rpc;
public ServerRpc Server => field ?? Interlocked.CompareExchange(ref field, new(rpc), null) ?? field;
public NetworkStream? NetworkStream => networkStream;
public ProcessStderrPump? StderrPump => stderrPump;
public StringBuilder? StderrBuffer => stderrPump?.Buffer;
Expand Down
50 changes: 12 additions & 38 deletions dotnet/test/E2E/TelemetryExportE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ public async Task Should_Export_File_Telemetry_For_Sdk_Interactions()
await session.DisposeAsync();
await client.StopAsync();

var entries = await ReadTelemetryEntriesAsync(
telemetryPath,
entries => entries.Any(entry => GetTypeName(entry) == "span" &&
GetStringAttribute(entry, "gen_ai.operation.name") == "invoke_agent"));
var entries = await ReadTelemetryEntriesAsync(telemetryPath);
var spans = entries.Where(entry => GetTypeName(entry) == "span").ToList();

Assert.NotEmpty(spans);
Expand Down Expand Up @@ -89,46 +86,23 @@ public async Task Should_Export_File_Telemetry_For_Sdk_Interactions()
static string EchoTelemetryMarker(string value) => value;
}

private static async Task<IReadOnlyList<JsonElement>> ReadTelemetryEntriesAsync(
string path,
Func<IReadOnlyList<JsonElement>, bool> isComplete)
private static async Task<IReadOnlyList<JsonElement>> ReadTelemetryEntriesAsync(string path)
{
IReadOnlyList<JsonElement> entries = [];
await TestHelper.WaitForConditionAsync(
async () =>
{
entries = await ReadTelemetryEntriesOnceAsync(path);
return entries.Count > 0 && isComplete(entries);
},
timeout: TimeSpan.FromSeconds(30),
timeoutMessage: $"Timed out waiting for telemetry records in '{path}'.",
transientExceptionFilter: exception => TestHelper.IsTransientFileSystemException(exception) || exception is JsonException);

return entries;

static async Task<IReadOnlyList<JsonElement>> ReadTelemetryEntriesOnceAsync(string path)
var entries = new List<JsonElement>();
using var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
using var reader = new StreamReader(stream);
while (await reader.ReadLineAsync() is { } line)
{
if (!File.Exists(path) || new FileInfo(path).Length == 0)
if (string.IsNullOrWhiteSpace(line))
{
return [];
continue;
}

var entries = new List<JsonElement>();
using var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete);
using var reader = new StreamReader(stream);
while (await reader.ReadLineAsync() is { } line)
{
if (string.IsNullOrWhiteSpace(line))
{
continue;
}

using var document = JsonDocument.Parse(line);
entries.Add(document.RootElement.Clone());
}

return entries;
using var document = JsonDocument.Parse(line);
entries.Add(document.RootElement.Clone());
}

return entries;
}

private static string? GetTraceId(JsonElement entry) => GetStringProperty(entry, "traceId");
Expand Down
Loading
Loading