Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions packages/happy-cli/src/claude/utils/OutgoingMessageQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { OutgoingMessageQueue } from './OutgoingMessageQueue';

describe('OutgoingMessageQueue', () => {
let sentMessages: any[];
let queue: OutgoingMessageQueue;

beforeEach(() => {
sentMessages = [];
queue = new OutgoingMessageQueue((msg) => sentMessages.push(msg));
});

afterEach(() => {
queue.destroy();
});

// Small delay to let the async lock chain and setTimeout(0) scheduling resolve
const tick = (ms = 50) => new Promise(r => setTimeout(r, ms));

it('should send non-delayed messages immediately', async () => {
queue.enqueue({ type: 'text', content: 'hello' });
await tick();

expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('hello');
});

it('should send multiple non-delayed messages in order', async () => {
queue.enqueue({ type: 'text', content: 'first' });
queue.enqueue({ type: 'text', content: 'second' });
queue.enqueue({ type: 'text', content: 'third' });
await tick();

expect(sentMessages).toHaveLength(3);
expect(sentMessages[0].content).toBe('first');
expect(sentMessages[1].content).toBe('second');
expect(sentMessages[2].content).toBe('third');
});

it('should delay messages with delay option', async () => {
queue.enqueue({ type: 'text', content: 'delayed' }, { delay: 100 });
await tick();
expect(sentMessages).toHaveLength(0);

// Wait for delay to expire
await tick(150);

expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('delayed');
});

it('should NOT block released messages behind unreleased ones', async () => {
// This is the core fix for head-of-line blocking (#639)
queue.enqueue({ type: 'text', content: 'delayed-tool-call' }, {
delay: 200,
toolCallIds: ['tool-1']
});
queue.enqueue({ type: 'text', content: 'immediate-result' });
await tick();

// The immediate message should be sent even though delayed one is in queue
expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('immediate-result');

// After delay expires, the delayed message should also be sent
await tick(250);

expect(sentMessages).toHaveLength(2);
expect(sentMessages[1].content).toBe('delayed-tool-call');
});

it('should release delayed messages via releaseToolCall', async () => {
queue.enqueue({ type: 'text', content: 'tool-call-msg' }, {
delay: 500,
toolCallIds: ['tool-1']
});
await tick();
expect(sentMessages).toHaveLength(0);

// Release via tool call ID (before delay expires)
await queue.releaseToolCall('tool-1');
await tick();

expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('tool-call-msg');
});

it('should not send system type messages', async () => {
queue.enqueue({ type: 'system', content: 'internal' });
await tick();

expect(sentMessages).toHaveLength(0);
});

it('should flush all messages immediately', async () => {
queue.enqueue({ type: 'text', content: 'delayed1' }, { delay: 500 });
queue.enqueue({ type: 'text', content: 'delayed2' }, { delay: 500 });
queue.enqueue({ type: 'text', content: 'immediate' });
await tick();

// Only immediate should have been sent (delayed ones skipped)
expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('immediate');

await queue.flush();
await tick();

expect(sentMessages).toHaveLength(3);
});

it('should handle interleaved delayed and immediate messages', async () => {
queue.enqueue({ type: 'text', content: 'tool-call-1' }, { delay: 200, toolCallIds: ['t1'] });
queue.enqueue({ type: 'text', content: 'sidechain-result' });
queue.enqueue({ type: 'text', content: 'tool-call-2' }, { delay: 200, toolCallIds: ['t2'] });
await tick();

// The sidechain result should be sent immediately
expect(sentMessages).toHaveLength(1);
expect(sentMessages[0].content).toBe('sidechain-result');

// After 200ms both tool calls should be released
await tick(250);

expect(sentMessages).toHaveLength(3);
});
});
28 changes: 16 additions & 12 deletions packages/happy-cli/src/claude/utils/OutgoingMessageQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,36 @@ export class OutgoingMessageQueue {
}

/**
* Process queue - send messages in ID order that are released
* Process queue - send released messages in ID order, skipping
* unreleased items so they don't block later ready messages.
* (Internal implementation without lock)
*/
private processQueueInternal(): void {
// Sort by ID to ensure order
this.queue.sort((a, b) => a.id - b.id);

// Process from front of queue
while (this.queue.length > 0) {
const item = this.queue[0];

// If not released yet, stop processing (maintain order)

// Send all released items, skipping unreleased ones
let i = 0;
while (i < this.queue.length) {
const item = this.queue[i];

// Skip unreleased items — they'll be sent when released
if (!item.released) {
break;
i++;
continue;
}

// Send if not already sent
if (!item.sent) {
if (item.logMessage.type !== 'system') {
this.sendFunction(item.logMessage);
}
item.sent = true;
}

// Remove from queue
this.queue.shift();

// Remove sent item from queue
this.queue.splice(i, 1);
// Don't increment i since splice shifts remaining items
}
}

Expand Down