Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions graphindex/frontend/src/api/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,22 @@ export const api = {
extendedAsk: (opts) => postJSON('/api/extended_ask', opts),
}

// Long-polling only: the server runs Flask-SocketIO under Werkzeug's
// threading mode, which does not handle the websocket transport reliably
// (server-side returns 500 "write() before start_response"). Polling is the
// supported transport for this deployment.
const SOCKET_OPTS = { transports: ['polling'], upgrade: false }

export function connectSocket(onEvent, onHello) {
const socket = io(BASE || '/', { transports: ['polling'], upgrade: false })
const socket = io(BASE || '/', SOCKET_OPTS)
socket.on('index_event', onEvent)
if (onHello) socket.on('hello', onHello)
return socket
}

// Separate listener for extended_ask streaming (the "ext_event" channel).
export function connectExtSocket(onEvent) {
const socket = io(BASE || '/', { transports: ['polling'], upgrade: false })
const socket = io(BASE || '/', SOCKET_OPTS)
socket.on('ext_event', onEvent)
return socket
}
95 changes: 81 additions & 14 deletions graphindex/graphindex/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

from __future__ import annotations

import logging
import threading
import time
from pathlib import Path

from flask import Flask, send_from_directory
from flask_cors import CORS
from flask_socketio import SocketIO

log = logging.getLogger(__name__)

from ..config import Config
from ..pipeline.events import IndexEvent
from ..pipeline.orchestrator import Indexer
Expand Down Expand Up @@ -46,31 +50,81 @@ def _resolve_frontend_dist() -> Path | None:

_FRONTEND_DIST = _resolve_frontend_dist()

# Event coalescing: cap the number of events forwarded to SocketIO subscribers
# in any single batch, and the minimum interval between batches. Without this,
# a full re-index of a large repo will flood the message queue and the browser.
_BATCH_INTERVAL = 0.05 # seconds
_BATCH_MAX_EVENTS = 500 # events per emit


def create_app(cfg: Config):
app = Flask(__name__, static_folder=None)
CORS(app)
socketio = SocketIO(
app,
cors_allowed_origins="*",
async_mode="threading",
allow_upgrades=False,
transports=["polling"],
)
# ``async_mode="threading"`` + the plain Werkzeug dev server does not handle
# WebSocket transport reliably (causes the "write() before start_response"
# 500s seen in production logs). ``allow_upgrades=False`` blocks the
# polling -> websocket *upgrade*, and the connect handler below rejects any
# client that arrives directly on the websocket transport so engine.io
# never enters that broken code path. Long-polling is fully functional.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="threading",
allow_upgrades=False)
Comment on lines +69 to +70

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Force polling instead of only disabling upgrades

In the inspected WebUI socket paths, frontend/src/api/client.js still connects with transports: ['websocket', 'polling'], so the browser's first request is a direct /socket.io/?transport=websocket. allow_upgrades=False only removes websocket from the polling handshake's upgrade list; python-engineio still handles an initial transport == 'websocket' by entering the websocket request path. As a result, the Werkzeug WebSocket 500s this patch is trying to avoid can still occur for the bundled frontend; force polling with the Engine.IO/Socket.IO transports=['polling'] option on the server or update the client transport list.

Useful? React with 👍 / 👎.


state = AppState(cfg)
app.config["GRAPHINDEX_STATE"] = state
app.config["GRAPHINDEX_SOCKETIO"] = socketio

# Bridge the event bus -> SocketIO. Indexing events go on "index_event";
# extended_ask events (type starting with "ext_") also go on "ext_event".
# Bridge the event bus -> SocketIO with coalescing so a fast indexer cannot
# flood the WS channel / the browser. Events are buffered and flushed by a
# single background dispatcher thread at most every ``_BATCH_INTERVAL``.
pending_index: list[dict] = []
pending_ext: list[dict] = []
pending_lock = threading.Lock()
flush_event = threading.Event()

def _forward(evt: IndexEvent) -> None:
d = evt.to_dict()
if evt.type.startswith("ext_"):
socketio.emit("ext_event", d)
else:
socketio.emit("index_event", d)

with pending_lock:
bucket = pending_ext if evt.type.startswith("ext_") else pending_index
bucket.append(d)
# cap unbounded growth if the dispatcher falls behind
if len(bucket) > _BATCH_MAX_EVENTS * 20:
del bucket[:-_BATCH_MAX_EVENTS * 10]
flush_event.set()

def _dispatcher() -> None:
# The dispatcher MUST keep running for the lifetime of the server.
# A single uncaught exception from socketio.emit() (e.g. transient
# network / client-disconnect / library error) would otherwise kill
# the thread and silently stop forwarding all future events.
while True:
try:
flush_event.wait()
flush_event.clear()
time.sleep(_BATCH_INTERVAL) # coalesce a burst
with pending_lock:
idx_batch = pending_index[:_BATCH_MAX_EVENTS]
del pending_index[:len(idx_batch)]
ext_batch = pending_ext[:_BATCH_MAX_EVENTS]
del pending_ext[:len(ext_batch)]
more = bool(pending_index or pending_ext)
for d in idx_batch:
try:
socketio.emit("index_event", d)
except Exception as exc:
log.warning("socketio.emit(index_event) failed: %s", exc)
for d in ext_batch:
try:
socketio.emit("ext_event", d)
except Exception as exc:
log.warning("socketio.emit(ext_event) failed: %s", exc)
if more:
flush_event.set() # keep draining
except Exception as exc:
log.exception("socketio dispatcher loop error: %s", exc)
time.sleep(1.0) # avoid hot-spin on a repeating failure

threading.Thread(target=_dispatcher, daemon=True,
name="socketio-dispatcher").start()
state.bus.subscribe(_forward)

# Background extended_ask runner (single-flight).
Expand Down Expand Up @@ -129,8 +183,21 @@ def _job():

@socketio.on("connect")
def _on_connect():
# Reject any client that arrived on the websocket transport directly
# (i.e. without going through polling first). ``allow_upgrades=False``
# already disables polling -> ws upgrades; this guard closes the
# remaining hole where a client whose transports list starts with
# 'websocket' would still hit the broken Werkzeug WS path.
from flask import request as _flask_request
try:
transport = _flask_request.args.get("transport", "")
except Exception:
transport = ""
if transport == "websocket":
return False # engine.io interprets False as "reject connection"
socketio.emit("hello", {"indexing": state.indexing,
"repo": str(cfg.repo_path)})
return None

# ---- frontend (SPA) ----
@app.get("/")
Expand Down
104 changes: 84 additions & 20 deletions graphindex/graphindex/api/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@

Holds the singletons the API/sockets operate on and (re)builds the read-side
components after an index run so freshly indexed data is served immediately.

The read-side components (db / vectors / embedder / search_engine / chat /
ask_engine) are exposed as a single immutable snapshot
(:class:`_ReadState`), referenced atomically by ``self._state``. Public
attributes ``db``, ``vectors``, ... proxy to the current snapshot via
``__getattr__``, so a request thread that captured one of them at the start
of a request keeps using a consistent set even if :meth:`reload` swaps in a
new snapshot mid-request.
"""

from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import Any

from ..config import Config
from ..embedding import get_embedder
Expand All @@ -18,31 +28,62 @@
from ..storage.compsrc import CompSrc


@dataclass(frozen=True)
class _ReadState:
"""Immutable bundle of read-side components built together."""
db: GraphDB
vectors: VectorStore
embedder: Any
search_engine: SearchEngine
chat: Any
ask_engine: AskEngine


class AppState:
def __init__(self, cfg: Config):
self.cfg = cfg
cfg.ensure_dirs()
self.bus = EventBus()
self.index_lock = threading.Lock()
self.ask_lock = threading.Lock()
# Guards the *swap* of self._state (writer-side only). Readers do not
# take this lock; they capture self._state into a local snapshot
# (which is just a pointer assignment — atomic under the GIL).
self._reload_lock = threading.Lock()
self.indexing = False
self.asking = False
self.watcher = None
self.last_extended = None
self.compsrc = CompSrc(cfg.repo_path)
self._open()

def _open(self) -> None:
self.db = GraphDB(self.cfg.db_path)
dim = self.db.get_meta("embed_dim", self.cfg.embed_dim) or self.cfg.embed_dim
self.vectors = VectorStore(self.cfg.vectors_path, dim)
# Embedder for query-time semantic search (lazy/real or fallback).
self.embedder = get_embedder(self.cfg)
self.search_engine = SearchEngine(self.db, self.vectors, self.embedder)
# Chat model is loaded lazily on first ask; building the engine is cheap.
self.chat = get_chat(self.cfg)
self.ask_engine = AskEngine(self.cfg, self.db, self.vectors,
self.embedder, chat=self.chat)
self._state: _ReadState = self._build()

# ---- public attribute proxy ------------------------------------------
def __getattr__(self, name: str) -> Any:
# Only invoked for attributes not found on the instance — i.e. the
# read-side fields, which live on the current snapshot. Note: each
# access reads the *current* snapshot. Callers that need a consistent
# view across multiple fields should call :meth:`snapshot` once.
if name in _ReadState.__dataclass_fields__:
return getattr(self._state, name)
raise AttributeError(name)

def snapshot(self) -> _ReadState:
"""Return the current read-side snapshot (consistent set of components)."""
return self._state

# ---- construction ----------------------------------------------------
def _build(self) -> _ReadState:
"""Construct a fresh set of read-side components. Pure: no self mutation."""
db = GraphDB(self.cfg.db_path)
dim = db.get_meta("embed_dim", self.cfg.embed_dim) or self.cfg.embed_dim
vectors = VectorStore(self.cfg.vectors_path, dim)
embedder = get_embedder(self.cfg)
search_engine = SearchEngine(db, vectors, embedder)
chat = get_chat(self.cfg)
ask_engine = AskEngine(self.cfg, db, vectors, embedder, chat=chat)
return _ReadState(db=db, vectors=vectors, embedder=embedder,
search_engine=search_engine, chat=chat,
ask_engine=ask_engine)

def ensure_chat(self):
"""Retry chat-model discovery and keep AskEngine wired to it."""
Expand All @@ -53,20 +94,43 @@ def ensure_chat(self):
return self.chat
Comment on lines 88 to 94

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid direct writes to proxied read-side fields in ensure_chat.

Line 91 and Line 92 assign self.chat / self.ask_engine on the instance, which shadows __getattr__-proxied snapshot fields and breaks atomic snapshot semantics after the first call. This path can also mix generations by reading self.db, self.vectors, and self.embedder via separate lookups during a concurrent reload.

🔧 Proposed fix
-from dataclasses import dataclass
+from dataclasses import dataclass, replace
@@
     def ensure_chat(self):
-        """Retry chat-model discovery and keep AskEngine wired to it."""
-        if self.chat is None:
-            self.chat = get_chat(self.cfg)
-            self.ask_engine = AskEngine(self.cfg, self.db, self.vectors,
-                                        self.embedder, chat=self.chat)
-        return self.chat
+        """Retry chat-model discovery and atomically publish updated snapshot."""
+        snap = self._state
+        if snap.chat is not None:
+            return snap.chat
+        chat = get_chat(self.cfg)
+        with self._reload_lock:
+            cur = self._state
+            if cur.chat is None:
+                self._state = replace(
+                    cur,
+                    chat=chat,
+                    ask_engine=AskEngine(
+                        self.cfg, cur.db, cur.vectors, cur.embedder, chat=chat
+                    ),
+                )
+            return self._state.chat
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@graphindex/graphindex/api/state.py` around lines 88 - 94, The ensure_chat
method currently writes directly to proxied read-side fields (self.chat,
self.ask_engine) which shadows __getattr__-proxied snapshot fields and can mix
generations; fix by avoiding instance attribute assignments: read dependent
fields once into locals (e.g., db = self.db; vectors = self.vectors; embedder =
self.embedder), create chat = get_chat(self.cfg) and ask_engine =
AskEngine(self.cfg, db, vectors, embedder) locally, then publish them back into
the read-side snapshot using the component's atomic update/publish mechanism
(instead of setting self.chat/self.ask_engine directly) so the update is atomic
and does not break proxy semantics.


def build_extended(self, opts: dict, bus) -> ExtendedAsk:
"""Construct an ExtendedAsk orchestrator with caps from the request."""
self.ensure_chat()
"""Construct an ExtendedAsk orchestrator with caps from the request.

Reads from a single snapshot to avoid mixing old + new components if
a reload is racing this call.
"""
snap = self._state
return ExtendedAsk(
self.cfg, self.db, self.vectors, self.embedder, chat=self.chat, bus=bus,
self.cfg, snap.db, snap.vectors, snap.embedder, chat=snap.chat, bus=bus,
keyword_rounds=int(opts.get("keyword_rounds", 2)),
keywords_per_round=int(opts.get("keywords_per_round", 4)),
agents_per_round=int(opts.get("agents_per_round", 3)),
max_rounds=int(opts.get("max_rounds", 10)),
)

def reload(self) -> None:
"""Re-open read components (call after an index run completes)."""
"""Re-open read components after an index run.

Builds the new snapshot OUTSIDE the swap lock (so request threads
aren't blocked while we load the embedder / chat model), then takes
the lock just to perform the atomic pointer swap and capture the
old snapshot. The old DB connection is closed on a delayed daemon
timer so any in-flight request that already captured it can finish.
"""
new_state = self._build() # heavy work, no lock
with self._reload_lock: # short critical section
old_state = self._state
self._state = new_state
try:
self.db.close()
timer = threading.Timer(2.0, lambda: _safe_close(old_state.db))
timer.daemon = True # don't block interpreter shutdown
timer.start()
except Exception:
pass
self._open()
_safe_close(old_state.db)
Comment on lines +125 to +129

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Replace fixed-delay DB close with usage-aware lifecycle.

Line 125 uses a hardcoded 2-second delayed close for old_state.db. Long-running in-flight work can exceed that window and then fail on a closed DB handle. This is a real race under concurrent reload + request execution.

🧰 Tools
🪛 Ruff (0.15.15)

[warning] 128-128: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@graphindex/graphindex/api/state.py` around lines 125 - 129, The current logic
uses a fixed 2s delayed close (threading.Timer + _safe_close(old_state.db))
which can close the DB while in-flight requests still use it; instead implement
a usage-aware lifecycle: add a reference count or an "in_use" tracking API on
the state/db (e.g., acquire()/release() or increment/decrement a counter on
old_state/db) and defer calling _safe_close(old_state.db) until the usage count
reaches zero (or wait on an Event that release() sets), removing the fixed
Timer; update callers to call acquire() when they begin using state.db and
release() when done so reload can safely close the old_state only when no users
remain.



def _safe_close(db) -> None:
try:
db.close()
except Exception:
pass
5 changes: 4 additions & 1 deletion graphindex/graphindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ def serve(repo, host, port, backend, watch):
if watch:
from graphindex.watcher import RepoWatcher
state = app.config["GRAPHINDEX_STATE"]
w = RepoWatcher(cfg, bus=state.bus)
# Share the API's single-flight index_lock so the watcher and the
# foreground /api/index runner never collide on the same DB/vectors.
w = RepoWatcher(cfg, bus=state.bus, index_lock=state.index_lock,
on_reload=state.reload)
w.start()
state.watcher = w
url = f"http://{cfg.host}:{cfg.port}"
Expand Down
Loading
Loading