-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmulti.js
More file actions
executable file
·2048 lines (1834 loc) · 77.2 KB
/
Copy pathmulti.js
File metadata and controls
executable file
·2048 lines (1834 loc) · 77.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env node
// bukowski multi-agent terminal - v1.1
const path = require('path');
const fs = require('fs');
// Bootstrap module
const {
SOCKETS_DIR,
SOCKET_DISCOVERY_FILE,
LEGACY_SOCKET_FILE,
FIPA_REMINDER,
findClaudePath,
findCodexPath,
createAgentTypes,
getFIPAPromptArgs,
resolveAgentType,
channelsEnabled,
loadQuotes,
showSplash,
parseArgs
} = require('./src/bootstrap');
const os = require('os');
const { Session } = require('./src/core/Session');
const { Agent } = require('./src/core/Agent');
const { ChatAgent } = require('./src/core/ChatAgent');
const { DashboardAgent } = require('./src/core/DashboardAgent');
const { LayoutManager } = require('./src/layout/LayoutManager');
const { Compositor } = require('./src/core/Compositor');
const { InputRouter } = require('./src/input/InputRouter');
const { IPCHub } = require('./src/ipc/IPCHub');
const { FIPAHub } = require('./src/acl/FIPAHub');
const { TabBar } = require('./src/ui/TabBar');
const { ChatPane } = require('./src/ui/ChatPane');
const { ConversationList } = require('./src/ui/ConversationList');
const { ConversationPicker } = require('./src/ui/ConversationPicker');
const { LayoutNode } = require('./src/layout/LayoutNode');
const { RegisterManager } = require('./src/input/RegisterManager');
const { findLatestSession } = require('./src/utils/agentSessions');
const { OverlayManager } = require('./src/ui/OverlayManager');
const { MCPServer } = require('./src/mcp/MCPServer');
const { PeerRegistry } = require('./src/federation/PeerRegistry');
const { FederationHub } = require('./src/federation/FederationHub');
const { FIPAMessage: FIPAMessageClass } = require('./src/acl/FIPAMessage');
const {
extractSelectedText,
extractLines,
extractWord,
extractToEndOfLine,
extractFromStartOfLine,
isWordChar,
moveWordForward,
moveWordEnd,
moveWordBackward,
findCharOnLine
} = require('./src/utils/bufferText');
const {
cellColFromCharIdx,
charIdxFromCellCol,
lineCellCount,
lastGraphemeCellCol,
stepGraphemeLeft,
stepGraphemeRight,
} = require('./src/utils/cellCoord');
const { TerminalManager } = require('./src/core/TerminalManager');
const { CommandExecutor } = require('./src/core/CommandExecutor');
const { ActionDispatcher } = require('./src/handlers');
// Locate the claude/codex CLIs and build the agent-type table from those paths.
// AGENT_TYPES is indexed by agent type (e.g. AGENT_TYPES.claude) and each entry
// is read below for its name, command and args.
const claudePath = findClaudePath();
const codexPath = findCodexPath();
const AGENT_TYPES = createAgentTypes(claudePath, codexPath);
// Module-level singletons, null until startup assigns them. peerRegistry and
// federationHub are created in the federation section of startup; chatPane and
// compositor are presumably assigned further down (not visible from here —
// confirm), which is why some callers null-check them before use.
let chatPane = null;
let compositor = null;
let peerRegistry = null;
let federationHub = null;
// Load quotes for splash screen (quotes.txt sits next to this script)
const quotesPath = path.join(__dirname, 'quotes.txt');
const QUOTES = loadQuotes(quotesPath);
// Parsed command line; this file reads .single, .restore, .sessionName and .agentArgs
const cliArgs = parseArgs();
// Optional debug logging to a file (captures console logs/errors) without polluting stdout
let logStream = null;
/**
 * Redirect console.log / console.error into an append-only log file.
 *
 * The path comes from BUKOWSKI_LOG_FILE (default 'bukowski.log', resolved
 * against the current working directory). Calling this more than once is
 * harmless: later calls are no-ops, so only one write stream is ever opened.
 */
function enableFileLogging() {
  // Already active — opening another stream would orphan the first one.
  if (logStream) return;
  const logFilePath = process.env.BUKOWSKI_LOG_FILE || 'bukowski.log';
  logStream = fs.createWriteStream(path.resolve(logFilePath), { flags: 'a' });
  // Stream failures (e.g. an unwritable path) surface asynchronously as an
  // 'error' event, which the try/catch around write() cannot see. Without a
  // listener that event is thrown as an uncaught exception. Logging is
  // best-effort, so the error is deliberately dropped here.
  logStream.on('error', () => { /* ignore logging errors */ });
  const writeLine = (level, args) => {
    try {
      logStream.write(`${new Date().toISOString()} [${level}] ${args.join(' ')}\n`);
    } catch { /* ignore logging errors */ }
  };
  console.log = (...args) => writeLine('INFO', args);
  console.error = (...args) => writeLine('ERROR', args);
}
// Activate file logging so console.log/error go to the log file instead of the terminal.
// One call is all that is needed.
enableFileLogging();
// Single-pane mode: run single.js as a child process and exit with its outcome
if (cliArgs.single) {
  const singlePath = path.join(__dirname, 'single.js');
  const { spawnSync } = require('child_process');
  const result = spawnSync(process.execPath, [singlePath, ...cliArgs.agentArgs], {
    stdio: 'inherit',
    cwd: process.cwd(),
    env: process.env
  });
  // A launch failure is reported through result.error and a signal death
  // through result.signal; in both cases result.status is null, which must
  // not be turned into a "success" exit code.
  if (result.error) {
    process.stderr.write(`Failed to launch single-pane mode: ${result.error.message}\n`);
    process.exit(1);
  }
  process.exit(result.status ?? 1);
}
// Terminal manager - handles setup/cleanup and signal handlers.
// It is given both socket discovery file paths up front; startup later tells it
// the live socket path via terminal.setSocketPath(), and the exit path calls
// terminal.cleanup().
const terminal = new TerminalManager(SOCKET_DISCOVERY_FILE, LEGACY_SOCKET_FILE);
terminal.registerSignalHandlers();
// Main async startup
(async () => {
// Enter alt screen
process.stdout.write('\x1b[?1049h');
// Show splash
showSplash(QUOTES);
// BUKOWSKI_SPLASH overrides the splash duration in milliseconds. It is parsed
// as base 10, and 0 is honoured (skip the wait); anything unparsable or
// negative falls back to the 2000 ms default.
const splashMsFromEnv = Number.parseInt(process.env.BUKOWSKI_SPLASH ?? '', 10);
const SPLASH_DURATION = Number.isInteger(splashMsFromEnv) && splashMsFromEnv >= 0
  ? splashMsFromEnv
  : 2000;
// Wait for splash duration
await new Promise(resolve => setTimeout(resolve, SPLASH_DURATION));
// Continue with main initialization
let session;
let restoredSession = false;
let pendingSessionData = null; // Raw session data to restore after FIPAHub is created
// Try to restore session if requested
if (cliArgs.restore) {
  try {
    // Load session without conversations (will restore those after FIPAHub exists).
    // LayoutNode is the module-level import, so it is not required again here.
    session = cliArgs.restore === 'latest'
      ? await Session.loadLatest(Agent, LayoutNode)
      : await Session.loadByIdOrName(cliArgs.restore, Agent, LayoutNode);
    if (session) {
      restoredSession = true;
      // Re-read the raw session file: later startup steps pull the saved
      // conversations and the chat/dashboard agents out of it.
      const filepath = path.join(Session.getSessionDir(), `${session.id}.json`);
      try {
        pendingSessionData = JSON.parse(fs.readFileSync(filepath, 'utf-8'));
      } catch (err) {
        // Still best-effort (the session itself loaded), but record the reason:
        // without this data the conversations and virtual panes are not restored.
        console.error('Warning: could not re-read saved session data:', err.message);
      }
    }
  } catch (err) {
    // Failed to restore, will create new session
    // Log to stderr so it doesn't interfere with terminal
    process.stderr.write(`Failed to restore session: ${err.message}\n`);
  }
}
// Nothing was restored: start a fresh session holding a single Claude pane
if (!session) {
  session = new Session(cliArgs.sessionName || process.env.BUKOWSKI_SESSION || 'Main');
  // Launch args are the type's defaults, then the user's CLI args, then
  // whatever getFIPAPromptArgs() returns for this type and arg list.
  const { name: claudeName, command: claudeCommand, args: claudeDefaults } = AGENT_TYPES.claude;
  const initialArgs = [...claudeDefaults, ...cliArgs.agentArgs];
  const fipaArgs = getFIPAPromptArgs(AGENT_TYPES, 'claude', initialArgs);
  const claude = new Agent({
    id: 'claude-1',
    name: claudeName,
    type: 'claude',
    command: claudeCommand,
    args: [...initialArgs, ...fipaArgs],
    autostart: true
  });
  session.addAgent(claude);
  // The whole layout is one pane showing that agent
  const { Pane } = require('./src/layout/LayoutNode');
  session.layout = new Pane(claude.id);
}
// Initialize layout manager
const layoutManager = new LayoutManager(session);
// Focus: the saved pane of a restored session, otherwise the first pane (if any)
const savedFocusId = restoredSession ? session.focusedPaneId : null;
if (savedFocusId) {
  layoutManager.focusedPaneId = savedFocusId;
} else {
  const allPanes = layoutManager.getAllPanes();
  if (allPanes.length !== 0) {
    layoutManager.focusedPaneId = allPanes[0].id;
  }
}
// Create TabBar
const tabBar = new TabBar();
// Start IPC hub. The hub object is created unconditionally and is handed to
// FIPAHub and MCPServer below; only session.ipcHub depends on start() succeeding.
const ipcHub = new IPCHub(session);
try {
await ipcHub.start();
session.ipcHub = ipcHub;
} catch (err) {
// IPC is optional - continue without it (session.ipcHub stays unset)
console.error('Warning: IPC hub failed to start:', err.message);
}
// Chat error buffer (needs to exist before FIPAHub init so catch blocks can push).
// broadcastChatError() parks lines here while no chat agent exists, keeping at
// most maxPendingChatErrors of the newest lines; flushPendingChatErrors() drains it.
const pendingChatErrors = [];
const maxPendingChatErrors = 200;
// Removes SGR (colour/style, "...m") escape sequences only; other escape
// sequences are left in the text.
const stripAnsi = (value) => String(value).replace(/\x1b\[[0-9;]*m/g, '');
// Start FIPA Hub
const fipaHub = new FIPAHub(ipcHub);
try {
// Restore conversations from saved session if available
if (pendingSessionData?.conversations) {
fipaHub.conversations.restoreFromJSON(pendingSessionData.conversations);
}
// Restore chat agents from saved session (they were skipped during initial load
// because FIPAHub didn't exist yet)
if (pendingSessionData?.agents) {
for (const agentData of pendingSessionData.agents) {
// Only chat agents that are bound to a conversation are rebuilt here
if (agentData.type === 'chat' && agentData.conversationId) {
// Check if we don't already have this agent
if (!session.getAgent(agentData.id)) {
const chatAgent = ChatAgent.fromJSON(agentData, fipaHub.conversations, fipaHub);
// Include federated peers so @chat broadcasts reach them too.
// NOTE(review): getReachableAgents is declared further down, inside a later
// block, so at this point the typeof guard presumably selects the fallback
// (local non-chat agents only) — confirm that is the intended roster here.
chatAgent.setAvailableAgents(
(typeof getReachableAgents === 'function')
? getReachableAgents()
: session.getAllAgents().filter(a => a.type !== 'chat')
);
session.addAgent(chatAgent);
// A chat agent now exists, so any error lines buffered so far can be shown
flushPendingChatErrors();
}
}
}
}
} catch (err) {
// Any failure above is logged and startup continues with whatever was restored
console.error('Warning: FIPA hub failed to initialize:', err.message);
}
/**
 * Session agents of type 'chat' that can display error lines, i.e. that
 * expose an addErrorMessage() method.
 * @returns {Array<object>} matching agents, in session order
 */
function getChatAgents() {
  const chatCapable = [];
  for (const candidate of session.getAllAgents()) {
    if (candidate.type === 'chat' && typeof candidate.addErrorMessage === 'function') {
      chatCapable.push(candidate);
    }
  }
  return chatCapable;
}
/**
 * Deliver every buffered error line to all chat agents and empty the buffer.
 * Does nothing (and keeps the buffer) while there is no chat agent to show them.
 */
function flushPendingChatErrors() {
  if (pendingChatErrors.length === 0) return;
  const recipients = getChatAgents();
  if (recipients.length === 0) return;
  // Empty the buffer in one go, then fan each line out to every recipient
  const drained = pendingChatErrors.splice(0, pendingChatErrors.length);
  drained.forEach((line) => {
    recipients.forEach((agent) => {
      agent.addErrorMessage(line);
    });
  });
}
/**
 * Show an error text in every chat agent, one message per non-empty line.
 * Colour codes and carriage returns are removed and trailing whitespace is
 * trimmed. While no chat agent exists the lines are buffered instead, keeping
 * only the newest maxPendingChatErrors entries.
 * @param {string} text - raw error text, possibly multi-line
 */
function broadcastChatError(text) {
  const lines = [];
  for (const rawLine of stripAnsi(text || '').replace(/\r/g, '').split('\n')) {
    const trimmed = rawLine.trimEnd();
    if (trimmed) lines.push(trimmed);
  }
  if (lines.length === 0) return;
  const recipients = getChatAgents();
  if (recipients.length > 0) {
    for (const line of lines) {
      for (const agent of recipients) {
        agent.addErrorMessage(line);
      }
    }
    return;
  }
  // Nobody to show it to yet: park the lines and drop the oldest overflow
  pendingChatErrors.push(...lines);
  const overflow = pendingChatErrors.length - maxPendingChatErrors;
  if (overflow > 0) {
    pendingChatErrors.splice(0, overflow);
  }
}
/**
 * Post an informational/system announcement to every chat surface: the chat
 * pane (when it exists) and each chat agent that supports system messages,
 * then ask the compositor (when it exists) for a redraw.
 * @param {string} text - announcement; empty or missing text is ignored
 */
function broadcastSystemMessage(text) {
  if (!text) return;
  if (chatPane) chatPane.addSystemMessage(text);
  for (const agent of getChatAgents()) {
    if (typeof agent.addSystemMessage !== 'function') continue;
    agent.addSystemMessage(text);
  }
  if (compositor) compositor.scheduleDraw();
}
// Agent output logging to bukowski.log (only active when enableFileLogging() called)
const agentLogBuffers = new Map(); // agentId -> unterminated partial line
const agentLogQueue = []; // { agentId, line } entries waiting to be written
let agentLogScheduled = false; // true while a flush is queued via setImmediate
// CSI sequences. `?` is allowed among the parameter bytes so private-mode
// sequences (ESC[?25l, ESC[?2004h, ...) are stripped too; otherwise they leak
// into the log and make visually identical lines compare as different.
const ANSI_CSI_RE = /\x1b\[[0-9;?]*[A-Za-z]/g;
const ANSI_OSC_RE = /\x1b\][^\x07]*\x07/g;
const NEWLINE_RE = /\r?\n/;
// Per-agent dedup state: when a TUI repaints the same line many times in a
// row (status bars, test progress, "Cooking…" tickers) we collapse the runs
// into a single trailing "× N" marker emitted on the next distinct line.
// Without this, bukowski.log grows by GB on long sessions.
const agentLogLastLine = new Map(); // agentId -> last cleaned line written
const agentLogRepeats = new Map(); // agentId -> repeat count since last write
/**
 * Write all queued agent output lines to the log stream.
 *
 * Each line is stripped of CSI/OSC escape sequences; blank results are
 * skipped. A line equal to the previous one written for the same agent is
 * only counted, and the count is emitted as "(× N)" just before the next
 * different line. Does nothing when the queue is empty or logging is off.
 */
function flushAgentLogQueue() {
  agentLogScheduled = false;
  if (agentLogQueue.length === 0 || !logStream) return;
  const entries = agentLogQueue.splice(0, agentLogQueue.length);
  // One timestamp per flush: every line of this batch shares it
  const timestamp = new Date().toISOString();
  for (const { agentId, line } of entries) {
    try {
      const clean = line.replace(ANSI_CSI_RE, '').replace(ANSI_OSC_RE, '');
      if (!clean.trim()) continue;
      if (agentLogLastLine.get(agentId) === clean) {
        agentLogRepeats.set(agentId, (agentLogRepeats.get(agentId) || 0) + 1);
        continue;
      }
      const repeats = agentLogRepeats.get(agentId) || 0;
      if (repeats > 0) {
        // +1 so the marker counts the first occurrence as well as the repeats
        logStream.write(`${timestamp} [AGENT ${agentId}] (× ${repeats + 1})\n`);
        agentLogRepeats.set(agentId, 0);
      }
      logStream.write(`${timestamp} [AGENT ${agentId}] ${clean}\n`);
      agentLogLastLine.set(agentId, clean);
    } catch { /* ignore logging errors */ }
  }
}
/**
 * Feed a chunk of agent output into the log pipeline.
 *
 * The chunk is appended to the agent's carried-over partial line and split on
 * newlines; complete non-empty lines are queued and the unterminated tail is
 * kept for the next chunk. A flush is scheduled with setImmediate unless one
 * is already pending. No-op when the chunk is empty or logging is off.
 *
 * @param {string} agentId - agent the output belongs to
 * @param {string|Buffer} chunk - raw output
 */
function logAgentData(agentId, chunk) {
  if (!chunk || !logStream) return;
  const incoming = typeof chunk === 'string' ? chunk : chunk.toString();
  const carried = agentLogBuffers.get(agentId) || '';
  const pieces = (carried + incoming).split(NEWLINE_RE);
  // The final piece has no newline after it yet: hold it back
  const tail = pieces.pop();
  agentLogBuffers.set(agentId, tail || '');
  for (const line of pieces) {
    if (!line) continue;
    agentLogQueue.push({ agentId, line });
  }
  if (agentLogScheduled || agentLogQueue.length === 0) return;
  agentLogScheduled = true;
  setImmediate(flushAgentLogQueue);
}
/**
 * Respawn an agent once without resume args when a stale session ID causes
 * launch failure.
 *
 * Both command and args are rebuilt from the current AGENT_TYPES entry rather
 * than reused: the saved command may be stale (e.g. 'node' from a prior
 * install with a local claude entrypoint, while the current install uses
 * 'claude' from PATH).
 *
 * @param {object} agent - agent to restart; mutated in place
 * @returns {boolean} false when the agent has no pane or no known type config,
 *   true once it has been respawned
 */
function respawnAgentWithoutResume(agent) {
  const pane = layoutManager.findPaneByAgent(agent.id);
  const typeConfig = AGENT_TYPES[agent.type];
  if (!(pane && typeConfig)) return false;
  // Fresh arg list: type defaults (no resume args) plus the FIPA prompt args
  const freshArgs = [...(typeConfig.args || [])];
  const fipaArgs = getFIPAPromptArgs(AGENT_TYPES, agent.type, freshArgs);
  Object.assign(agent, {
    command: typeConfig.command,
    args: [...freshArgs, ...fipaArgs],
    agentSessionId: null,
    exitCode: null,
    status: 'stopped'
  });
  const { width, height } = pane.bounds;
  agent.spawn(width, height);
  setupAgentHandlers(agent);
  agent.terminal?.write('\r\n\x1b[33m[resume failed; respawned fresh without session]\x1b[0m\r\n');
  broadcastChatError(`${agent.id} resume failed; respawned fresh session`);
  return true;
}
/**
 * React to an agent's process exiting.
 *
 * Non-zero exit code: if the agent carries an agentSessionId and has not been
 * retried yet, respawn it once without resume args. Otherwise the pane is kept
 * open, the exit code is printed into the agent's terminal and the failure is
 * reported to chat (once per agent).
 *
 * Zero exit code: the agent's pane is closed and the agent removed from the
 * session. If it was the only pane, the whole process is shut down instead.
 *
 * NOTE(review): `compositor` is dereferenced here without a null check (unlike
 * broadcastSystemMessage) — assumes exits only arrive after it is created.
 *
 * @param {object} agent - the session agent whose process exited
 * @param {number} exitCode - exit code reported for the process
 */
function handleAgentExit(agent, exitCode) {
if (exitCode !== 0) {
// If resume failed due to stale session, retry once without resume args
if (agent.agentSessionId && !agent._retriedWithoutResume) {
// Flag set before the attempt so a second failure falls through to the error path
agent._retriedWithoutResume = true;
const restarted = respawnAgentWithoutResume(agent);
if (restarted) {
compositor.scheduleDraw();
return;
}
}
// Keep the pane open on errors so the user can see the failure.
const msg = `\r\n\x1b[31m[process exited with code ${exitCode}]\x1b[0m\r\n`;
agent.terminal?.write(msg);
// Report to chat only once per agent, even if this handler fires again
if (!agent._reportedExit) {
agent._reportedExit = true;
broadcastChatError(`${agent.id} exited with code ${exitCode}`);
}
compositor.scheduleDraw();
return;
}
// Clean exit from here on (exitCode is 0).
// Find and close the pane for this agent
const pane = layoutManager.findPaneByAgent(agent.id);
if (pane) {
// Focus this pane first so closePane() closes the right one
layoutManager.focusPane(pane.id);
const allPanes = layoutManager.getAllPanes();
if (allPanes.length === 1) {
// Last pane - quit entirely
terminal.cleanup();
if (ipcHub) ipcHub.stop();
session.destroy();
process.exit(exitCode);
} else {
// Close just this pane
// Capture the id before closing; it is needed for compositor cleanup afterwards
const paneId = pane.id;
layoutManager.closePane();
compositor.cleanupPane(paneId); // Clear reflow timers and state
// Announce agent departure in chat (skip virtual panes)
if (agent.type !== 'chat' && agent.type !== 'dashboard') {
broadcastSystemMessage(`${agent.name} (${agent.id}) left the session`);
}
// Stop the dashboard's auto-refresh timer when its pane closes.
if (agent.type === 'dashboard' && typeof agent.destroy === 'function') {
agent.destroy();
}
session.removeAgent(agent.id);
handleResize();
}
}
}
// Mirror everything written to stderr into the chat error feed, one complete
// line at a time, while still forwarding the original write unchanged.
const stderrWrite = process.stderr.write.bind(process.stderr);
let stderrBuffer = ''; // unterminated tail of the most recent write
process.stderr.write = (chunk, encoding, callback) => {
  try {
    // write(chunk, callback) is a valid call shape, so `encoding` may be a
    // function; only a string is usable as an encoding for toString().
    const textEncoding = typeof encoding === 'string' ? encoding : 'utf8';
    const text = typeof chunk === 'string' ? chunk : chunk.toString(textEncoding);
    stderrBuffer += text;
    const parts = stderrBuffer.split(/\r?\n/);
    stderrBuffer = parts.pop() || '';
    for (const line of parts) {
      broadcastChatError(line);
    }
  } catch {
    // Mirroring is best-effort and must never block the real stderr write.
    // It cannot be reported here: writing the failure to stderr would re-enter this hook.
  }
  return stderrWrite(chunk, encoding, callback);
};
// Project-dashboard store, created before the MCP server because the server
// constructor below takes it as an argument. The store
// is shared across instances on this box via ~/.bukowski/dashboard (re-read
// per op), so every instance gets one unless explicitly disabled
// (BUKOWSKI_NO_DASHBOARD=1) or the module fails to load/construct.
let dashboardStore = null;
if (process.env.BUKOWSKI_NO_DASHBOARD !== '1') {
try {
// Required lazily so a broken dashboard module only disables the dashboard
const { DashboardStore } = require('./src/dashboard/DashboardStore');
dashboardStore = new DashboardStore();
} catch (err) {
console.error('[dashboard] disabled:', err.message);
}
}
// Restore dashboard panes saved in the session (virtual, PTY-less; skipped
// during the initial agent load because the store didn't exist yet). The
// layout already holds the pane referencing `dashboard-N`; we just need the
// agent in the session so getAgent() resolves it.
// NOTE(review): dashboardStore may still be null here (disabled or failed
// above) and is passed through as-is — confirm DashboardAgent.fromJSON copes.
if (pendingSessionData?.agents) {
for (const agentData of pendingSessionData.agents) {
if (agentData.type === 'dashboard' && !session.getAgent(agentData.id)) {
session.addAgent(DashboardAgent.fromJSON(agentData, dashboardStore));
}
}
}
const mcpServer = new MCPServer(session, fipaHub, ipcHub, dashboardStore);
try {
const socketPath = await mcpServer.start();
// Set socket path in process.env so spawned agents inherit it
// This ensures agents connect to THIS instance's MCP server, not another instance's
process.env.BUKOWSKI_MCP_SOCKET = socketPath;
// Auto-register the bukowski MCP entry with whichever CLIs the user
// actually has (claude/codex/gemini), so a fresh checkout + `node
// multi.js` boots into a working FIPA setup without a separate
// `bukowski-mcp install` step. Idempotent — only writes when the
// entry is missing or its bridge path is stale, so subsequent
// startups are a cheap config read. Honors BUKOWSKI_NO_AUTO_INSTALL
// and the ~/.bukowski/.no-auto-install marker dropped by an
// explicit `bukowski-mcp uninstall`.
try {
const { autoInstallIfNeeded } = require('./src/mcp/install');
autoInstallIfNeeded();
} catch (err) {
console.error('mcp auto-install skipped:', err.message);
}
// Wire FIPAHub messages to MCP message queue and PTY injection
const fipaPromptDelayMs = parseInt(process.env.BUKOWSKI_FIPA_PROMPT_DELAY_MS, 10) || 100;
const fipaSubmitDelayMs = parseInt(process.env.BUKOWSKI_FIPA_SUBMIT_DELAY_MS, 10) || 80; // Legacy fixed delay
let fipaEchoTimeoutMs = parseInt(process.env.BUKOWSKI_FIPA_ECHO_TIMEOUT_MS, 10) || 1000; // Max wait for echo
const fipaQuietMs = parseInt(process.env.BUKOWSKI_FIPA_QUIET_MS, 10) || 250; // Required PTY-quiet window before injecting
const fipaQuietMaxWaitMs = parseInt(process.env.BUKOWSKI_FIPA_QUIET_MAX_WAIT_MS, 10) || 3000; // Max time to wait for quiet
const ANSI_STRIP_RE = /\x1b\[[0-9;?]*[A-Za-z]|\x1b\][^\x07]*\x07|[\x00-\x08\x0b-\x1f\x7f]/g;
// Per-agent FIFO to prevent overlapping injections from racing.
const fipaInjectQueues = new Map(); // agentId -> Promise chain
// Last time we saw bytes from each agent's PTY; used to detect quiet windows
// before injecting. Keyed by agent.id; populated via tapPtyForFipaQuiet().
const lastPtyDataAt = new Map();
const ptyTapped = new Set();
/**
 * Start recording when an agent's PTY last produced output, so injections can
 * wait for a quiet window. Registers one onData listener per agent id; later
 * calls for the same id do nothing, as do calls for agents without a pty.
 *
 * NOTE(review): the guard is keyed by agent.id, so if an agent is ever given a
 * new pty object under the same id the new pty would not be tapped — verify
 * against Agent.spawn().
 *
 * @param {object} agent - agent whose pty should be observed
 */
function tapPtyForFipaQuiet(agent) {
  const pty = agent?.pty;
  if (!pty || ptyTapped.has(agent.id)) return;
  ptyTapped.add(agent.id);
  const stampNow = () => { lastPtyDataAt.set(agent.id, Date.now()); };
  // Seed the timestamp so "never seen data" is not mistaken for "long quiet"
  stampNow();
  pty.onData(stampNow);
}
/**
 * Wait until an agent's PTY has been silent for `quietMs`, giving up after
 * `maxWaitMs`. Silence is measured from the timestamps in lastPtyDataAt; an
 * agent with no recorded timestamp counts as quiet immediately.
 *
 * @param {object} agent - agent whose id keys lastPtyDataAt
 * @param {number} quietMs - required length of the silent window
 * @param {number} maxWaitMs - upper bound on the total wait
 * @returns {Promise<boolean>} true if the quiet window was seen, false on timeout
 */
async function awaitPtyQuiet(agent, quietMs, maxWaitMs) {
  const startedAt = Date.now();
  for (;;) {
    const lastSeen = lastPtyDataAt.get(agent.id) || 0;
    if (Date.now() - lastSeen >= quietMs) return true;
    if (Date.now() - startedAt >= maxWaitMs) return false;
    // Poll every quietMs, clamped to the 20..50 ms range
    const pollMs = Math.max(20, Math.min(quietMs, 50));
    await new Promise((wake) => setTimeout(wake, pollMs));
  }
}
/**
 * Queue a prompt injection for an agent. Injections for the same agent id are
 * chained so they run strictly one after another; a failed injection is
 * swallowed so it cannot stall the ones queued behind it. Agents without a pty
 * are ignored.
 *
 * @param {object} agent - target agent
 * @param {string} prompt - text to type into the agent's PTY
 */
function enqueueFipaInject(agent, prompt) {
  if (!agent?.pty) return;
  const runInjection = () =>
    injectFipaWithEcho(agent, prompt).catch(() => { /* swallow to keep chain alive */ });
  const tail = fipaInjectQueues.get(agent.id) || Promise.resolve();
  // Same handler on both paths: run whether the previous link settled or rejected
  fipaInjectQueues.set(agent.id, tail.then(runInjection, runInjection));
}
// Inject text into an agent PTY: wait for a quiet window, write the prompt
// exactly once, then wait for the echo before sending Enter. On echo
// timeout, send Enter anyway — TUIs like Claude Code paint the input box
// with cursor moves so the literal prompt rarely lands as a contiguous
// run in the PTY data stream, but the bytes themselves did get there.
// Never write the prompt twice (would duplicate text in the input box).
//
// Parameters: agent — target agent (ignored when it has no pty);
//             prompt — the single-line text to type.
// Returns a promise that resolves once Enter has been sent, or immediately
// after the skip notice when the PTY never went quiet.
// NOTE(review): the Enter is written from a timer/data callback via agent.pty;
// this assumes agent.pty is still set when that callback runs — confirm for
// agents that exit mid-injection.
async function injectFipaWithEcho(agent, prompt) {
if (!agent?.pty) return;
tapPtyForFipaQuiet(agent);
const quiet = await awaitPtyQuiet(agent, fipaQuietMs, fipaQuietMaxWaitMs);
// If the TUI never went quiet (e.g. /compact spinner, animated tool
// output), skip injection entirely. The message is already in the MCP
// queue; Claude's hooks will surface it on the next turn boundary.
// Writing into a busy redraw produces scattered character ghosts in
// the input box and corrupts the user's view.
if (!quiet) {
console.log(`[fipa-autoinject] skip ${agent.id}: PTY never quiet (${fipaQuietMaxWaitMs}ms); message stays in queue`);
try {
broadcastSystemMessage(
`[fipa] PTY inject skipped for ${agent.id} (TUI busy ${fipaQuietMaxWaitMs}ms); message is in queue, ` +
`agent will see it via hook at next tool/turn boundary`
);
} catch { /* ignore */ }
return;
}
await new Promise((resolve) => {
// `done` makes settle() one-shot: whichever of echo/timeout comes first wins
let done = false;
let buffer = '';
// Rolling window of recent PTY output; bounded so it cannot grow without limit
const maxBuffer = Math.max(prompt.length * 4, 1024);
// Finish the injection: stop the timer and listener, send Enter, resolve.
const settle = (echoed, disposable, timer) => {
if (done) return;
done = true;
if (timer) clearTimeout(timer);
if (disposable?.dispose) disposable.dispose();
if (!echoed) {
console.log(`[fipa-autoinject] echo timeout for ${agent.id}; sending Enter (TUI may have repainted)`);
}
agent.pty.write('\r');
resolve();
};
// The timer callback refers to dataListener, which is declared just below;
// that is fine because the callback can only run after this function returns.
const timer = setTimeout(() => settle(false, dataListener, timer), fipaEchoTimeoutMs);
const dataListener = agent.pty.onData((data) => {
if (done) return;
buffer += data;
if (buffer.length > maxBuffer) buffer = buffer.slice(-maxBuffer);
// Strip ANSI/control bytes from the rolling buffer before substring
// search — TUIs repaint with cursor moves and color codes that
// otherwise prevent the prompt from ever appearing contiguous.
const flat = buffer.replace(ANSI_STRIP_RE, '');
if (flat.includes(prompt)) settle(true, dataListener, timer);
});
// Listener and timer are armed first so the echo of this write cannot be missed
agent.pty.write(prompt);
});
}
// Canonical "deliver a FIPA message to a local recipient" path. Used
// by both the local fipa:sent emitter and the federation `forward`
// handler. Without the federation branch, messages arriving from
// peer bukowskis hit IPCHub.injectFederatedMessage which can't reach
// MCP-bridge-based agents (they live in mcpServer.messageQueues, not
// ipcHub.agentSockets) — silent drop.
//
// Parameters: message — the FIPA message (reads sender.name and performative
//             here); to — recipient agent id. A missing message or recipient
//             makes this a no-op.
// Effects: always queues the message on the MCP server; additionally schedules
// a PTY prompt injection for PTY-backed recipients, as described below.
function deliverFipaToLocal(message, to) {
if (!to || !message) return;
mcpServer.queueMessage(to, message);
const agent = session.getAgent(to);
// Visibility for the "queue grew under an id no one will ever pull"
// case — happens when the sender targets a federated id that lives on
// another bukowski (we get fipa:sent locally as a side effect; the real
// delivery is the wire forward) or when the id is just wrong. Without
// this log a misrouted send looked exactly like success.
if (!agent && !mcpServer.externalAgents.has(to)) {
const fed = federationHub?.resolveRemote?.(to);
if (!fed) {
console.log(`[fipa] queueMessage(${to}) has no local target and no federation route`);
}
}
// PTY-backed coding agents need an in-pane nudge so they react to
// the message (Claude has hooks; codex/gemini get the formatted
// prompt typed into their input). ChatAgents are NOT poked here —
// they subscribe to ConversationManager.message:received and render
// the sender's name + content directly. Calling agent.write() on a
// ChatAgent would shove "[FIPA inform from X]" into the user's
// input buffer, which is what the previous version did wrong.
// claude is woken out-of-turn by the notifications/claude/channel push
// (queueMessage above), which injects the message as a <channel> block —
// so the primitive PTY keystroke nudge is suppressed for it (that injection
// is the whole point of channels). codex/gemini have no channel path, so
// they always get the PTY nudge. (If channels are disabled via
// BUKOWSKI_NO_CHANNELS, claude falls back to the PTY nudge too.)
const claudeViaChannel = agent?.type === 'claude' && channelsEnabled();
// Nudge only agents with a pty, never the sender itself, and not claude-via-channel
if (agent?.pty && message.sender?.name !== to && !claudeViaChannel) {
let prompt;
if (agent.type === 'claude') {
// claude gets a short pointer only; the content itself is not typed in
const sender = message.sender?.name || 'unknown';
const perf = message.performative || 'inform';
prompt = `[FIPA ${perf} from ${sender}] (see context)`;
} else {
prompt = formatFIPAForPTY(message);
if (agent.type === 'gemini') {
// '!' is replaced for gemini — presumably it is special in gemini's input; TODO confirm
prompt = prompt.replace(/!/g, '.');
}
}
// Delay the injection slightly; it then joins the agent's per-agent FIFO
setTimeout(() => {
enqueueFipaInject(agent, prompt);
}, fipaPromptDelayMs);
}
}
fipaHub.on('fipa:sent', ({ message, to }) => deliverFipaToLocal(message, to));
// Surface silent drops. `delivery:failed` and `error` events on the hubs used
// to have no listeners, so a message that could not be routed vanished without
// any signal to the user. _failNotice posts such failures to the chat
// surfaces, prefixed with "[fipa]". It is lightly debounced: a notice arriving
// less than 500 ms after the previous one shown is dropped.
const _failNotice = (() => {
  const MIN_GAP_MS = 500;
  let lastShownAt = 0;
  return function notifyFailure(text) {
    const now = Date.now();
    if (now - lastShownAt >= MIN_GAP_MS) {
      lastShownAt = now;
      broadcastSystemMessage(`[fipa] ${text}`);
    }
  };
})();
ipcHub.on('delivery:failed', ({ messageId, to, reason }) => {
_failNotice(`delivery failed: to=${to} reason=${reason} id=${messageId}`);
});
ipcHub.on('error', (err) => {
_failNotice(`ipcHub error: ${err?.message || err}`);
});
fipaHub.on('error', (err) => {
_failNotice(`fipaHub error: ${err?.message || err}`);
});
/**
 * Format a FIPA message as one line of text to type into an agent's PTY
 * (no trailing newline — Enter is sent separately).
 *
 * Short messages carry their full content; long ones carry an 80-character
 * preview plus a pointer to get_pending_messages.
 *
 * Because the result is typed into a terminal, every part of it is flattened
 * to a single inert line: CR, LF, CRLF and TAB each become one space and all
 * other control characters (ESC, Ctrl-C, ...) are removed. A stray CR would
 * otherwise act as Enter and submit a half-typed prompt.
 *
 * @param {object} message - FIPA message; reads sender.name, performative, content
 * @returns {string} single-line prompt text
 */
function formatFIPAForPTY(message) {
  const toInertLine = (value) => String(value)
    .replace(/\r\n|[\r\n\t]/g, ' ')
    .replace(/[\x00-\x1f\x7f]/g, '');
  const sender = toInertLine(message.sender?.name || 'unknown');
  const perf = toInertLine(message.performative || 'inform');
  // null/undefined content is legitimate (e.g. fipa_agree) — render as empty.
  let content;
  if (typeof message.content === 'string') {
    content = message.content;
  } else if (message.content == null) {
    content = '';
  } else {
    // JSON.stringify returns undefined for values it cannot represent
    // (functions, symbols); treat those as empty too.
    content = JSON.stringify(message.content, null, 2) ?? '';
  }
  const escaped = toInertLine(content);
  // Short messages: include full content
  // Long messages: just notify, tell them to check inbox
  const MAX_INLINE = 200;
  if (escaped.length <= MAX_INLINE) {
    return `[FIPA ${perf} from ${sender}]: ${escaped}`;
  }
  const preview = `${escaped.slice(0, 80)}...`;
  return `[FIPA ${perf} from ${sender}]: ${preview} (use get_pending_messages for full text)`;
}
// Write socket path to discovery files for MCP bridge
try {
  // Create sockets directory (recursive to also create .bukowski if needed)
  fs.mkdirSync(SOCKETS_DIR, { recursive: true });
  // Per-PID socket file for ancestor matching
  fs.writeFileSync(SOCKET_DISCOVERY_FILE, socketPath, 'utf-8');
  // Legacy discovery file for backwards compatibility
  fs.writeFileSync(LEGACY_SOCKET_FILE, socketPath, 'utf-8');
  // Env var for child agents (primary discovery method)
  process.env.BUKOWSKI_MCP_SOCKET = socketPath;
  // Track socket path for cleanup (only delete legacy if still ours)
  terminal.setSocketPath(socketPath);
} catch (err) {
  // Discovery files are optional, so startup carries on — but record why they
  // are missing, like the other optional startup steps do.
  console.error('Warning: could not write MCP socket discovery files:', err.message);
}
/**
 * Snapshot of every agent the chat pane can address: the session's non-chat
 * agents followed by the federated agents known to the federation hub (none
 * when there is no hub or it has no remoteAgents).
 *
 * Entries are plain objects rather than Agent instances so each one can carry
 * a `source` tag ('session' or 'federated'); the chat pane uses it to tell
 * @chat (local-only broadcast) from @swarm (federated broadcast).
 *
 * @returns {Array<{id: string, name: string, type: string, source: string}>}
 */
function getReachableAgents() {
  const reachable = [];
  for (const agent of session.getAllAgents()) {
    if (agent.type === 'chat') continue;
    reachable.push({ id: agent.id, name: agent.name, type: agent.type, source: 'session' });
  }
  const remoteAgents = federationHub?.remoteAgents;
  if (remoteAgents) {
    // A federated agent's id doubles as its display name
    for (const [fid, info] of remoteAgents.entries()) {
      reachable.push({ id: fid, name: fid, type: info.type, source: 'federated' });
    }
  }
  return reachable;
}
/**
 * Push the current reachable-agent roster to every chat agent that accepts
 * one. A chat agent that throws while being updated is skipped; the others
 * are still updated.
 */
function syncChatAgentsRoster() {
  const roster = getReachableAgents();
  session.getAllAgents()
    .filter((agent) => agent.type === 'chat' && typeof agent.setAvailableAgents === 'function')
    .forEach((chatAgent) => {
      try {
        chatAgent.setAvailableAgents(roster);
      } catch { /* ignore */ }
    });
}
// Peer discovery + federation transport + roster sync + routing.
// Phase 1: PeerRegistry advertises this bukowski under a host name
// derived from cwd and learns about siblings.
// Phase 2: FederationHub opens a listen socket and dials siblings.
// Phase 3: Local session agents are propagated via the roster; remote
// agents are kept in FederationHub.remoteAgents.
// Phase 4: IPCHub consults the federation when a target isn't local,
// and FederationHub.forward events deliver inbound back
// through IPCHub.injectFederatedMessage.
try {
// Phase 1: create the peer registry with this session's id and the two
// sockets siblings may need (IPC hub socket when the hub exposes one, MCP
// socket), then start it to obtain the host name it resolved for us.
peerRegistry = new PeerRegistry({
  sessionId: session.id,
  ipcSocket: ipcHub.getSocketPath?.() || null,
  mcpSocket: socketPath
});
const resolvedHost = peerRegistry.start();
// Publish the resolved host through the process environment
// (presumably for child processes — confirm against the consumers).
process.env.BUKOWSKI_HOST = resolvedHost;
// Surface sibling arrivals and departures as system messages.
const onPeerAppeared = ({ host, pid }) => {
  broadcastSystemMessage(`peer ${host} (pid ${pid}) joined`);
};
const onPeerGone = ({ host, pid }) => {
  broadcastSystemMessage(`peer ${host} (pid ${pid}) left`);
};
peerRegistry.on('peer:appeared', onPeerAppeared);
peerRegistry.on('peer:gone', onPeerGone);
// Which local agents get a federated identity, and what that identity is.
// Chat agents and anything without a pty are excluded: ChatAgents and
// external bridge clients aren't federated here (they stay confined to
// their connected bukowski).
const isFederatable = (agent) => {
  if (!agent) return false;
  if (agent.type === 'chat') return false;
  return Boolean(agent.pty);
};
// Federated id layout: <type>-<resolved host>-<local ordinal>.
const federatedIdFor = (agent) => {
  const { type } = agent;
  const ordinal = _localCount(agent);
  return `${type}-${resolvedHost}-${ordinal}`;
};
// Stable per-(type,host) numbering across the session's lifetime.
// We use the existing local id's trailing number when present,
// falling back to the order in which suffix-less agents are first seen.
const _typeCounters = new Map();
// Ordinals already minted for suffix-less agents, keyed by the agent
// object. Without this memo every call minted a NEW number, so the same
// agent got a different federated id each time one was computed (announce,
// roster snapshot, inbound `_federatedTo` matching, removal). A WeakMap
// lets entries disappear together with the agent object.
const _mintedCounts = new WeakMap();
/**
 * Ordinal used as the last segment of an agent's federated id.
 *
 * @param {{id?: string, type?: string}} agent - local session agent.
 * @returns {number} the numeric suffix of `agent.id` when it has one
 *   (e.g. 'claude-1' -> 1); otherwise a per-type counter value that is
 *   minted once per agent object and returned unchanged on later calls.
 */
function _localCount(agent) {
  // Prefer the numeric suffix the local Agent already has.
  const m = /-(\d+)$/.exec(agent.id || '');
  if (m) return parseInt(m[1], 10);
  // Only objects/functions can be WeakMap keys; anything else falls back
  // to minting on every call, as before.
  const cacheable = typeof agent === 'object' || typeof agent === 'function';
  if (cacheable && _mintedCounts.has(agent)) return _mintedCounts.get(agent);
  const n = (_typeCounters.get(agent.type) || 0) + 1;
  _typeCounters.set(agent.type, n);
  if (cacheable) _mintedCounts.set(agent, n);
  return n;
}
// Roster this instance advertises to peers: one { localId, type,
// federatedId } record per federatable session agent, followed by one per
// external bridge client.
function snapshotLocalRoster() {
  const roster = [];
  for (const agent of session.getAllAgents()) {
    if (!isFederatable(agent)) continue;
    roster.push({ localId: agent.id, type: agent.type, federatedId: federatedIdFor(agent) });
  }
  // External bridge clients (codex/gemini connecting purely through the
  // MCP bridge — no pty, no session pane) must federate too, else peers
  // can route TO them (their host learns our roster) but never learn of
  // THEM: list_agents omits them and replies fail "Unknown agent". Their
  // assigned id already embeds the host (e.g. codex-azra-agent-1) and is
  // both the externalAgents key and the messageQueue key, so localId and
  // federatedId are the same.
  const bridgeClients = mcpServer.getExternalAgents?.() || [];
  for (const client of bridgeClients) {
    roster.push({ localId: client.id, type: client.type, federatedId: client.id });
  }
  return roster;
}
// Phase 2: the hub is built over the peer registry and pulls the local
// roster on demand through snapshotLocalRoster.
federationHub = new FederationHub({
  host: resolvedHost,
  sessionId: session.id,
  peerRegistry,
  getLocalRoster: snapshotLocalRoster
});
// Report link establishment to the user.
federationHub.on('peer:connected', (link) => {
  const { host, direction } = link;
  broadcastSystemMessage(`federation: connected to ${host} (${direction})`);
});
// Report link loss, then fail FIPA requests still waiting on agents that
// lived on that peer instead of letting them sit on the default 30s
// timeout. Errors from that best-effort cleanup are swallowed.
federationHub.on('peer:disconnected', (link) => {
  const { host, reason } = link;
  broadcastSystemMessage(`federation: disconnected from ${host} (${reason})`);
  try {
    const failedCount = fipaHub.failPendingForHost(host);
    if (failedCount > 0) {
      broadcastSystemMessage(`federation: ${failedCount} in-flight request(s) to ${host} failed`);
    }
  } catch {
    /* ignore */
  }
});
// Inbound forwards from peers. `to` was already rewritten by the sender's
// FederationHub down to the recipient's local id. FIPA payloads (the
// common case) are rebuilt into a FIPAMessage, recorded in the
// conversation manager so ChatPane sees them, and handed to the same
// delivery helper local sends use — that queues on mcpServer for the
// agent's bridge to poll and fires the PTY wakeup nudge. Anything that is
// not FIPA falls back to IPCHub.
federationHub.on('forward', ({ payload }) => {
  if (!payload || !payload.ipcMessage) return;
  const ipcMsg = payload.ipcMessage;
  const fipaData = ipcMsg.payload?._fipaMessage;

  // Non-FIPA payload: plain IPC injection, failures logged only.
  if (!fipaData) {
    try {
      ipcHub.injectFederatedMessage(ipcMsg);
    } catch (err) {
      console.error('federation: inbound delivery failed:', err.message);
    }
    return;
  }

  let fipaMessage;
  try {
    fipaMessage = FIPAMessageClass.fromJSON(fipaData);
  } catch (err) {
    console.error('federation: failed to parse incoming FIPA:', err.message);
    return;
  }

  // Verify the forward was actually meant for an agent on THIS bukowski.
  // The wire's `to` is a bare local id ("claude-1") that matches more than
  // one bukowski; `_federatedTo` (set by the sender's FederationHub)
  // carries the unique federated id. If it doesn't resolve to a local
  // agent here, the sender mis-routed (e.g. two bukowskis raced on host
  // resolution and the registry didn't see the collision in time). Drop
  // instead of letting the wrong claude-1 claim a sibling's message.
  const federatedTo = ipcMsg._federatedTo;
  let targetLocalId = null;
  if (!federatedTo) {
    // Older sender without `_federatedTo` — fall back to the wire's `to`
    // and hope for the best.
    targetLocalId = ipcMsg.to;
  } else {
    const localMatch = session.getAllAgents().find(
      (candidate) => isFederatable(candidate) && federatedIdFor(candidate) === federatedTo
    );
    if (localMatch) {
      targetLocalId = localMatch.id;
    } else if (mcpServer.externalAgents.has(federatedTo)) {
      // External bridge agent: its federated id IS its local id and its
      // messageQueue key, so deliver straight to that queue (the bridge
      // polls get_pending_messages).
      targetLocalId = federatedTo;
    } else {
      broadcastSystemMessage(
        `federation: dropped misrouted forward (to ${federatedTo}; no local match)`
      );
      return;
    }
  }

  // Conversation tracking is best-effort; delivery proceeds regardless.
  try {
    fipaHub.conversations.handleMessage(fipaMessage);
  } catch {
    /* ignore */
  }
  deliverFipaToLocal(fipaMessage, targetLocalId);
});
// Coordination-event federation: fan locally-published events out to
// peers, and inject peer events into the local bus. Most useful
// subscriptions (a peer's deploy lifecycle, agent status, dashboard
// mutations) cross a federation boundary, so without this the bus only
// carries an instance's own echoes. 'published' fires for local-origin
// events only (injectRemote suppresses it), so this never bounces a
// received event back — belt-and-suspenders with the wire hop guard.
// Skipped entirely when the MCP server exposes no event bus.
if (mcpServer.eventBus) {
  const fanOutToPeers = (localEvent) => {
    try {
      federationHub.broadcastEvent(localEvent);
    } catch {
      /* advisory */
    }
  };
  // Inject only — no re-fan. The peer set is a full mesh (every pair has
  // one direct connection), so the origin's single broadcast already
  // reached every instance; relaying would just deliver duplicates. The
  // wire hop guard + injectRemote's no-re-emit remain as
  // belt-and-suspenders if the topology ever stops being complete.
  const injectFromPeer = ({ event: remoteEvent }) => {
    try {
      mcpServer.eventBus.injectRemote(remoteEvent);
    } catch {
      /* advisory */
    }
  };
  mcpServer.eventBus.on('published', fanOutToPeers);
  federationHub.on('event', injectFromPeer);
}
// Give IPCHub a federation router so targets that are not local travel
// over the wire instead of failing. resolveRemote and forwardIpcMessage
// delegate straight to the hub; federateSenderId maps a local sender id
// to its federated id when the sender is a federatable session agent.
ipcHub.attachFederation({
  resolveRemote(id) {
    return federationHub.resolveRemote(id);
  },
  forwardIpcMessage(msg) {
    return federationHub.forwardIpcMessage(msg);
  },
  federateSenderId(localId) {
    // Empty / missing ids pass through untouched.
    if (!localId) return localId;
    const sender = session.getAgent(localId);
    const federatable = sender && isFederatable(sender);
    // Otherwise the id is already federated, or the sender is non-federatable.
    return federatable ? federatedIdFor(sender) : localId;
  }
});
// Propagate local session changes to peers: federatable agents are
// announced (or withdrawn) on the hub, and chat panes get a fresh roster
// after every addition or removal, federatable or not.
const describeForPeers = (agent) => ({
  localId: agent.id,
  type: agent.type,
  federatedId: federatedIdFor(agent)
});
const onAgentAdded = (agent) => {
  if (isFederatable(agent)) {
    federationHub.announceLocalAgent(describeForPeers(agent));
  }
  syncChatAgentsRoster();
};
const onAgentRemoved = (agent) => {
  if (isFederatable(agent)) {
    federationHub.announceLocalRemoval(describeForPeers(agent));
  }
  syncChatAgentsRoster();
};
session.on('agent:added', onAgentAdded);
session.on('agent:removed', onAgentRemoved);
// Re-sync chat panes whenever the federated roster changes so
// their @chat broadcast (and target picker) sees peer agents.
federationHub.on('peer:connected', () => syncChatAgentsRoster());
federationHub.on('peer:disconnected', () => syncChatAgentsRoster());
federationHub.on('roster', () => syncChatAgentsRoster());
// Register teardown BEFORE the awaited start(). If start() rejects,
// control jumps to the enclosing catch; with the hook registered after the
// await, the already-started peer registry, the session listeners and the
// IPC federation router were never cleaned up on shutdown. Every step is
// individually guarded, so running it against a hub that never started
// (or an MCP server that never had the federation attached) is harmless.
terminal.onShutdown(() => {
  try { session.off('agent:added', onAgentAdded); } catch { /* ignore */ }
  try { session.off('agent:removed', onAgentRemoved); } catch { /* ignore */ }
  try { ipcHub.attachFederation(null); } catch { /* ignore */ }
  try { mcpServer.attachFederation(null); } catch { /* ignore */ }
  try { federationHub.stop(); } catch { /* ignore */ }
  try { peerRegistry.stop(); } catch { /* ignore */ }
});
const fedSocketPath = await federationHub.start();
// Re-advertise with fedSocket so siblings know how to dial us.
peerRegistry.update({ fedSocket: fedSocketPath });
// MCPServer surfaces federated agents in list_agents.
try { mcpServer.attachFederation(federationHub); } catch { /* ignore */ }
} catch (err) {
console.error('Warning: peer registry/federation failed to start:', err.message);
}
} catch (err) {
console.error('Warning: MCP server failed to start:', err.message);
}
// Create FIPA UI components. Both the conversation list and the chat pane
// are built over the same fipaHub.conversations object.
const conversationList = new ConversationList(fipaHub.conversations);
// peerRegistry is only assigned inside the federation try-block above, so
// it may still be unset here when that setup did not run or failed early;
// in that case the chat pane is created with a null localHost.
chatPane = new ChatPane(fipaHub.conversations, {
localHost: peerRegistry ? peerRegistry.getHost() : null
});
// Wire MCPServer agent connect notifications to chat: announce the join as
// a system message, then federate the bridge client when a hub exists.
mcpServer.on('external_agent:connected', ({ agentId, agentType }) => {
  broadcastSystemMessage(`${agentId} joined the session`);
  // No hub: nothing to announce to.
  if (!federationHub) return;
  // Federate the bridge client so peers can resolve + reply to it. Its
  // assigned id already embeds the host, so localId === federatedId. When
  // no type was reported, the id's first dash-separated segment is used.
  try {
    const type = agentType || agentId.split('-')[0];
    federationHub.announceLocalAgent({ localId: agentId, type, federatedId: agentId });
  } catch {
    /* federation optional */
  }
});
mcpServer.on('external_agent:disconnected', (agentId) => {