Skip to content

Commit e1da45a

Browse files
committed
watcher to set status to stopped for certain tests
1 parent f755623 commit e1da45a

File tree

2 files changed

+101
-2
lines changed

2 files changed

+101
-2
lines changed

eval_protocol/dataset_logger/sqlite_dataset_logger_adapter.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import os
2-
from turtle import st
3-
from typing import TYPE_CHECKING, List, Optional
2+
from typing import List, Optional
43

54
from eval_protocol.dataset_logger.dataset_logger import LOG_EVENT_TYPE, DatasetLogger
65
from eval_protocol.dataset_logger.sqlite_evaluation_row_store import SqliteEvaluationRowStore

eval_protocol/utils/logs_server.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from queue import Queue
99
from typing import TYPE_CHECKING, Any, List, Optional
1010

11+
import psutil
1112
import uvicorn
1213
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
1314

@@ -108,6 +109,97 @@ def stop_broadcast_loop(self):
108109
self._broadcast_task.cancel()
109110

110111

112+
class EvaluationWatcher:
113+
"""Monitors running evaluations and updates their status when processes stop."""
114+
115+
def __init__(self, websocket_manager: "WebSocketManager"):
116+
self.websocket_manager = websocket_manager
117+
self._running = False
118+
self._thread: Optional[threading.Thread] = None
119+
self._stop_event = threading.Event()
120+
121+
def start(self):
122+
"""Start the evaluation watcher thread."""
123+
if self._running:
124+
return
125+
126+
self._running = True
127+
self._stop_event.clear()
128+
self._thread = threading.Thread(target=self._watch_loop, daemon=True)
129+
self._thread.start()
130+
logger.info("Evaluation watcher started")
131+
132+
def stop(self):
133+
"""Stop the evaluation watcher thread."""
134+
if not self._running:
135+
return
136+
137+
self._running = False
138+
self._stop_event.set()
139+
if self._thread and self._thread.is_alive():
140+
self._thread.join(timeout=5)
141+
logger.info("Evaluation watcher stopped")
142+
143+
def _watch_loop(self):
144+
"""Main loop that checks for stopped processes every 2 seconds."""
145+
while self._running and not self._stop_event.is_set():
146+
try:
147+
self._check_running_evaluations()
148+
# Wait 2 seconds before next check
149+
self._stop_event.wait(2)
150+
except Exception as e:
151+
logger.error(f"Error in evaluation watcher loop: {e}")
152+
# Continue running even if there's an error
153+
time.sleep(1)
154+
155+
def _check_running_evaluations(self):
156+
"""Check all running evaluations and update status for stopped processes."""
157+
try:
158+
logs = default_logger.read()
159+
updated_rows = []
160+
161+
for row in logs:
162+
if self._should_update_status(row):
163+
logger.info(f"Updating status to 'stopped' for row {row.input_metadata.row_id} (PID {row.pid})")
164+
if row.eval_metadata is not None:
165+
row.eval_metadata.status = "stopped"
166+
updated_rows.append(row)
167+
168+
# Log all updated rows
169+
for row in updated_rows:
170+
default_logger.log(row)
171+
# Broadcast the update to connected clients
172+
self.websocket_manager.broadcast_row_upserted(row)
173+
174+
except Exception as e:
175+
logger.error(f"Error checking running evaluations: {e}")
176+
177+
def _should_update_status(self, row: "EvaluationRow") -> bool:
178+
"""Check if a row's status should be updated to 'stopped'."""
179+
# Check if the row has running status and a PID
180+
if row.eval_metadata and row.eval_metadata.status == "running" and row.pid is not None:
181+
182+
# Check if the process is still running
183+
try:
184+
process = psutil.Process(row.pid)
185+
# Check if process is still running
186+
if not process.is_running():
187+
return True
188+
except psutil.NoSuchProcess:
189+
# Process no longer exists
190+
return True
191+
except psutil.AccessDenied:
192+
# Can't access process info, assume it's stopped
193+
logger.warning(f"Access denied to process {row.pid}, assuming stopped")
194+
return True
195+
except Exception as e:
196+
logger.error(f"Error checking process {row.pid}: {e}")
197+
# On error, assume process is still running to be safe
198+
return False
199+
200+
return False
201+
202+
111203
class LogsServer(ViteServer):
112204
"""
113205
Enhanced server for serving Vite-built SPA with file watching and WebSocket support.
@@ -131,6 +223,9 @@ def __init__(
131223

132224
super().__init__(build_dir, host, port, index_file)
133225

226+
# Initialize evaluation watcher
227+
self.evaluation_watcher = EvaluationWatcher(self.websocket_manager)
228+
134229
# Add WebSocket endpoint
135230
self._setup_websocket_routes()
136231

@@ -191,6 +286,9 @@ async def run_async(self):
191286
# Start the broadcast loop
192287
self.websocket_manager.start_broadcast_loop()
193288

289+
# Start the evaluation watcher
290+
self.evaluation_watcher.start()
291+
194292
config = uvicorn.Config(
195293
self.app,
196294
host=self.host,
@@ -204,6 +302,8 @@ async def run_async(self):
204302
except KeyboardInterrupt:
205303
logger.info("Shutting down LogsServer...")
206304
finally:
305+
# Clean up evaluation watcher
306+
self.evaluation_watcher.stop()
207307
# Clean up broadcast loop
208308
self.websocket_manager.stop_broadcast_loop()
209309

0 commit comments

Comments
 (0)