diff --git a/go/internal/e2e/compaction_e2e_test.go b/go/internal/e2e/compaction_e2e_test.go index e09a33b4f..2740c5542 100644 --- a/go/internal/e2e/compaction_e2e_test.go +++ b/go/internal/e2e/compaction_e2e_test.go @@ -1,15 +1,16 @@ package e2e import ( + "errors" "strings" "testing" + "time" copilot "github.com/github/copilot-sdk/go" "github.com/github/copilot-sdk/go/internal/e2e/testharness" ) func TestCompactionE2E(t *testing.T) { - t.Skip("Compaction tests are skipped due to flakiness — re-enable once stabilized") ctx := testharness.NewTestContext(t) client := ctx.NewClient() t.Cleanup(func() { client.ForceStop() }) @@ -33,19 +34,43 @@ func TestCompactionE2E(t *testing.T) { t.Fatalf("Failed to create session: %v", err) } - var compactionStartEvents []copilot.SessionEvent - var compactionCompleteEvents []copilot.SessionEvent - - session.On(func(event copilot.SessionEvent) { - switch event.Data.(type) { + // The first prompt leaves the session below the compaction processor's minimum + // message count. The second prompt is therefore the first deterministic point + // at which low thresholds can trigger compaction. Subscribe before any prompts + // are sent so we never miss the events. The complete-event subscription filters + // for Success==true so any transient failed compaction event the daemon may emit + // before a successful retry is ignored (mirrors the dotnet/rust references). + startCh := make(chan copilot.SessionEvent, 1) + completeCh := make(chan copilot.SessionEvent, 1) + errCh := make(chan error, 1) + unsubscribe := session.On(func(event copilot.SessionEvent) { + switch d := event.Data.(type) { case *copilot.SessionCompactionStartData: - compactionStartEvents = append(compactionStartEvents, event) + select { + case startCh <- event: + default: + } case *copilot.SessionCompactionCompleteData: - compactionCompleteEvents = append(compactionCompleteEvents, event) + if !d.Success { + return + } + select { + case completeCh <- event: + default: + } + case *copilot.SessionErrorData: + msg := d.Message + if msg == "" { + msg = "session error" + } + select { + case errCh <- errors.New(msg): + default: + } } }) + defer unsubscribe() - // Send multiple messages to fill up the context window _, err = session.SendAndWait(t.Context(), copilot.MessageOptions{Prompt: "Tell me a story about a dragon. Be detailed."}) if err != nil { t.Fatalf("Failed to send first message: %v", err) @@ -56,29 +81,64 @@ func TestCompactionE2E(t *testing.T) { t.Fatalf("Failed to send second message: %v", err) } - _, err = session.SendAndWait(t.Context(), copilot.MessageOptions{Prompt: "Now describe the dragon's treasure in great detail."}) - if err != nil { - t.Fatalf("Failed to send third message: %v", err) + const compactionTimeout = 60 * time.Second + + var startEvent copilot.SessionEvent + select { + case startEvent = <-startCh: + case err := <-errCh: + t.Fatalf("Session error waiting for session.compaction_start event: %v", err) + case <-time.After(compactionTimeout): + t.Fatalf("Timed out waiting for session.compaction_start event") + } + + var completeEvent copilot.SessionEvent + select { + case completeEvent = <-completeCh: + case err := <-errCh: + t.Fatalf("Session error waiting for session.compaction_complete event: %v", err) + case <-time.After(compactionTimeout): + t.Fatalf("Timed out waiting for session.compaction_complete event") } - // Should have triggered compaction at least once - if len(compactionStartEvents) < 1 { - t.Errorf("Expected at least 1 compaction_start event, got %d", len(compactionStartEvents)) + startData, ok := startEvent.Data.(*copilot.SessionCompactionStartData) + if !ok { + t.Fatalf("Expected SessionCompactionStartData, got %T", startEvent.Data) } - if len(compactionCompleteEvents) < 1 { - t.Errorf("Expected at least 1 compaction_complete event, got %d", len(compactionCompleteEvents)) + if startData.ConversationTokens == nil || *startData.ConversationTokens <= 0 { + t.Errorf("Expected compaction to report conversation tokens at start, got %v", startData.ConversationTokens) } - // Compaction should have succeeded - if len(compactionCompleteEvents) > 0 { - lastComplete := compactionCompleteEvents[len(compactionCompleteEvents)-1] - d, ok := lastComplete.Data.(*copilot.SessionCompactionCompleteData) - if !ok || !d.Success { - t.Errorf("Expected compaction to succeed") - } - if ok && d.TokensRemoved != nil && *d.TokensRemoved <= 0 { - t.Errorf("Expected tokensRemoved > 0, got %v", *d.TokensRemoved) - } + completeData, ok := completeEvent.Data.(*copilot.SessionCompactionCompleteData) + if !ok { + t.Fatalf("Expected SessionCompactionCompleteData, got %T", completeEvent.Data) + } + if !completeData.Success { + t.Errorf("Expected compaction to succeed, error=%v", completeData.Error) + } + if completeData.CompactionTokensUsed == nil { + t.Errorf("Expected compaction tokens-used data") + } else if completeData.CompactionTokensUsed.InputTokens == nil || *completeData.CompactionTokensUsed.InputTokens <= 0 { + t.Errorf("Expected compaction call to consume input tokens, got %v", completeData.CompactionTokensUsed.InputTokens) + } + summary := "" + if completeData.SummaryContent != nil { + summary = *completeData.SummaryContent + } + summary = strings.ToLower(summary) + if !strings.Contains(summary, "") { + t.Errorf("Expected summary to contain , got: %q", summary) + } + if !strings.Contains(summary, "") { + t.Errorf("Expected summary to contain , got: %q", summary) + } + if !strings.Contains(summary, "") { + t.Errorf("Expected summary to contain , got: %q", summary) + } + + _, err = session.SendAndWait(t.Context(), copilot.MessageOptions{Prompt: "Now describe the dragon's treasure in great detail."}) + if err != nil { + t.Fatalf("Failed to send third message: %v", err) } // Verify session still works after compaction @@ -86,8 +146,17 @@ func TestCompactionE2E(t *testing.T) { if err != nil { t.Fatalf("Failed to send verification message: %v", err) } - if ad, ok := answer.Data.(*copilot.AssistantMessageData); !ok || !strings.Contains(strings.ToLower(ad.Content), "dragon") { - t.Errorf("Expected answer to contain 'dragon', got %v", answer.Data) + ad, ok := answer.Data.(*copilot.AssistantMessageData) + if !ok { + t.Fatalf("Expected assistant message data, got %T", answer.Data) + } + content := strings.ToLower(ad.Content) + // Should remember it was about a dragon (context preserved via summary) + if !strings.Contains(content, "kaedrith") { + t.Errorf("Expected answer to mention 'Kaedrith', got: %q", ad.Content) + } + if !strings.Contains(content, "dragon") { + t.Errorf("Expected answer to mention 'dragon', got: %q", ad.Content) } }) diff --git a/nodejs/test/e2e/compaction.e2e.test.ts b/nodejs/test/e2e/compaction.e2e.test.ts index 02e14470f..7c07d2f0e 100644 --- a/nodejs/test/e2e/compaction.e2e.test.ts +++ b/nodejs/test/e2e/compaction.e2e.test.ts @@ -1,9 +1,40 @@ import { describe, expect, it } from "vitest"; -import { SessionEvent, approveAll } from "../../src/index.js"; +import { approveAll, type CopilotSession, type SessionEvent } from "../../src/index.js"; import { createSdkTestContext } from "./harness/sdkTestContext.js"; -// TODO: Compaction tests are skipped due to flakiness — re-enable once stabilized -describe.skip("Compaction", async () => { +const compactionTimeoutMs = 60_000; + +function getNextSessionEvent( + session: CopilotSession, + eventType: TEventType, + description: string, + predicate: (event: Extract) => boolean = () => true +): Promise> { + return new Promise((resolve, reject) => { + let unsubscribe: () => void = () => {}; + const timeout = setTimeout(() => { + unsubscribe(); + reject(new Error(`Timed out waiting for ${description}`)); + }, compactionTimeoutMs); + + unsubscribe = session.on((event) => { + if (event.type === eventType) { + const typedEvent = event as Extract; + if (predicate(typedEvent)) { + clearTimeout(timeout); + unsubscribe(); + resolve(typedEvent); + } + } else if (event.type === "session.error") { + clearTimeout(timeout); + unsubscribe(); + reject(new Error(`${event.data.message}\n${event.data.stack}`)); + } + }); + }); +} + +describe("Compaction", async () => { const { copilotClient: client } = await createSdkTestContext(); it("should trigger compaction with low threshold and emit events", async () => { @@ -19,48 +50,56 @@ describe.skip("Compaction", async () => { }, }); - const events: SessionEvent[] = []; - session.on((event) => { - events.push(event); - }); + // The first prompt leaves the session below the compaction processor's minimum + // message count. The second prompt is therefore the first deterministic point + // at which low thresholds can trigger compaction. Register event waiters before + // any prompts are sent so we never miss the events. + const compactionStartedP = getNextSessionEvent( + session, + "session.compaction_start", + "session.compaction_start" + ); + // Wait specifically for a *successful* compaction_complete so that any transient + // failed compaction event the daemon may emit before a successful retry is ignored + // (mirrors the dotnet/rust references). + const compactionCompletedP = getNextSessionEvent( + session, + "session.compaction_complete", + "successful session.compaction_complete", + (event) => event.data.success + ); - // Send multiple messages to fill up the context window - // With such low thresholds, even a few messages should trigger compaction await session.sendAndWait({ prompt: "Tell me a story about a dragon. Be detailed.", }); await session.sendAndWait({ prompt: "Continue the story with more details about the dragon's castle.", }); - await session.sendAndWait({ - prompt: "Now describe the dragon's treasure in great detail.", - }); - // Check for compaction events - const compactionStartEvents = events.filter((e) => e.type === "session.compaction_start"); - const compactionCompleteEvents = events.filter( - (e) => e.type === "session.compaction_complete" - ); + const [startEvent, completeEvent] = await Promise.all([ + compactionStartedP, + compactionCompletedP, + ]); - // Should have triggered compaction at least once - expect(compactionStartEvents.length).toBeGreaterThanOrEqual(1); - expect(compactionCompleteEvents.length).toBeGreaterThanOrEqual(1); + expect(startEvent.data.conversationTokens ?? 0).toBeGreaterThan(0); + expect(completeEvent.data.success).toBe(true); + expect(completeEvent.data.compactionTokensUsed).toBeDefined(); + expect(completeEvent.data.compactionTokensUsed?.inputTokens ?? 0).toBeGreaterThan(0); + const summary = (completeEvent.data.summaryContent ?? "").toLowerCase(); + expect(summary).toContain(""); + expect(summary).toContain(""); + expect(summary).toContain(""); - // Compaction should have succeeded - const lastCompactionComplete = - compactionCompleteEvents[compactionCompleteEvents.length - 1]; - expect(lastCompactionComplete.data.success).toBe(true); - - // Should have removed some tokens - if (lastCompactionComplete.data.tokensRemoved !== undefined) { - expect(lastCompactionComplete.data.tokensRemoved).toBeGreaterThan(0); - } + await session.sendAndWait({ + prompt: "Now describe the dragon's treasure in great detail.", + }); // Verify the session still works after compaction const answer = await session.sendAndWait({ prompt: "What was the story about?" }); - expect(answer?.data.content).toBeDefined(); + const content = (answer?.data.content ?? "").toLowerCase(); // Should remember it was about a dragon (context preserved via summary) - expect(answer?.data.content?.toLowerCase()).toContain("dragon"); + expect(content).toContain("kaedrith"); + expect(content).toContain("dragon"); }, 120000); it("should not emit compaction events when infinite sessions disabled", async () => { diff --git a/python/e2e/test_compaction_e2e.py b/python/e2e/test_compaction_e2e.py index b06a0312f..85af017ae 100644 --- a/python/e2e/test_compaction_e2e.py +++ b/python/e2e/test_compaction_e2e.py @@ -1,22 +1,26 @@ """E2E Compaction Tests""" +import asyncio + import pytest -from copilot.generated.session_events import SessionEventType +from copilot.generated.session_events import ( + SessionCompactionCompleteData, + SessionCompactionStartData, + SessionErrorData, + SessionEventType, +) from copilot.session import PermissionHandler from .testharness import E2ETestContext pytestmark = [ pytest.mark.asyncio(loop_scope="module"), - pytest.mark.skip( - reason="Compaction tests are skipped due to flakiness — re-enable once stabilized" - ), ] class TestCompaction: - @pytest.mark.timeout(120) + @pytest.mark.timeout(180) async def test_should_trigger_compaction_with_low_threshold_and_emit_events( self, ctx: E2ETestContext ): @@ -32,42 +36,86 @@ async def test_should_trigger_compaction_with_low_threshold_and_emit_events( }, ) - compaction_start_events = [] - compaction_complete_events = [] - - def on_event(event): - if event.type == SessionEventType.SESSION_COMPACTION_START: - compaction_start_events.append(event) - if event.type == SessionEventType.SESSION_COMPACTION_COMPLETE: - compaction_complete_events.append(event) - - session.on(on_event) - - # Send multiple messages to fill up the context window - await session.send_and_wait("Tell me a story about a dragon. Be detailed.") - await session.send_and_wait( - "Continue the story with more details about the dragon's castle." + # The first prompt leaves the session below the compaction processor's minimum + # message count. The second prompt is therefore the first deterministic point + # at which low thresholds can trigger compaction. Register event waiters before + # any prompts are sent so we never miss the events. + loop = asyncio.get_event_loop() + compaction_started_future: asyncio.Future = loop.create_future() + # Wait specifically for a *successful* compaction_complete so that any transient + # failed compaction event the daemon may emit before a successful retry is ignored + # (mirrors the dotnet/rust references). + compaction_completed_future: asyncio.Future = loop.create_future() + + def _on_compaction_event(event): + if ( + not compaction_started_future.done() + and event.type == SessionEventType.SESSION_COMPACTION_START + and isinstance(event.data, SessionCompactionStartData) + ): + compaction_started_future.set_result(event) + elif ( + not compaction_completed_future.done() + and event.type == SessionEventType.SESSION_COMPACTION_COMPLETE + and isinstance(event.data, SessionCompactionCompleteData) + and event.data.success + ): + compaction_completed_future.set_result(event) + elif isinstance(event.data, SessionErrorData): + msg = event.data.message or "session error" + if not compaction_started_future.done(): + compaction_started_future.set_exception(RuntimeError(msg)) + if not compaction_completed_future.done(): + compaction_completed_future.set_exception(RuntimeError(msg)) + + unsubscribe_compaction = session.on(_on_compaction_event) + + try: + await session.send_and_wait("Tell me a story about a dragon. Be detailed.") + await session.send_and_wait( + "Continue the story with more details about the dragon's castle." + ) + + start_event = await asyncio.wait_for(compaction_started_future, timeout=60.0) + complete_event = await asyncio.wait_for(compaction_completed_future, timeout=60.0) + except BaseException: + if not compaction_started_future.done(): + compaction_started_future.cancel() + if not compaction_completed_future.done(): + compaction_completed_future.cancel() + raise + finally: + unsubscribe_compaction() + + assert start_event.type == SessionEventType.SESSION_COMPACTION_START + assert isinstance(start_event.data, SessionCompactionStartData) + assert (start_event.data.conversation_tokens or 0) > 0, ( + "Expected compaction to report conversation tokens at start" ) - await session.send_and_wait("Now describe the dragon's treasure in great detail.") - - # Should have triggered compaction at least once - assert len(compaction_start_events) >= 1, "Expected at least 1 compaction_start event" - assert len(compaction_complete_events) >= 1, "Expected at least 1 compaction_complete event" - # Compaction should have succeeded - last_complete = compaction_complete_events[-1] - assert last_complete.data.success is True, "Expected compaction to succeed" + assert complete_event.type == SessionEventType.SESSION_COMPACTION_COMPLETE + assert isinstance(complete_event.data, SessionCompactionCompleteData) + assert complete_event.data.success is True, "Expected compaction to succeed" + assert complete_event.data.compaction_tokens_used is not None, ( + "Expected compaction tokens-used data" + ) + assert (complete_event.data.compaction_tokens_used.input_tokens or 0) > 0, ( + "Expected compaction call to consume input tokens" + ) + summary = (complete_event.data.summary_content or "").lower() + assert "" in summary, "Expected summary to contain " + assert "" in summary, "Expected summary to contain " + assert "" in summary, "Expected summary to contain " - # Should have removed some tokens - if last_complete.data.tokens_removed is not None: - assert last_complete.data.tokens_removed > 0, "Expected tokensRemoved > 0" + await session.send_and_wait("Now describe the dragon's treasure in great detail.") # Verify the session still works after compaction answer = await session.send_and_wait("What was the story about?") assert answer is not None - assert answer.data.content is not None + content = (answer.data.content or "").lower() # Should remember it was about a dragon (context preserved via summary) - assert "dragon" in answer.data.content.lower() + assert "kaedrith" in content, f"Expected answer to mention 'Kaedrith', got: {content!r}" + assert "dragon" in content, f"Expected answer to mention 'dragon', got: {content!r}" async def test_should_not_emit_compaction_events_when_infinite_sessions_disabled( self, ctx: E2ETestContext