
Commit e5883ae

UI styling / Log fix (#261)
* match height of logs
* properly create timestamp in log emission
* added working test
* add clear index / set/cleanup logic for all ecs tests
* remove skip markers
* add ecs tests
* Enhance logging and debugging capabilities
  - Added a `--debug` flag to the logs command for enabling debug mode.
  - Updated logs command to display debug mode status.
  - Enhanced logging in the SqliteDatasetLoggerAdapter and SqliteEventBus for better traceability of events and errors.
  - Implemented debug logging in the WebSocketManager and LogsServer to provide detailed insights during WebSocket connections and broadcasts.
  - Enabled debug mode for all relevant loggers in the logs server system.
* Improve WebSocket connection handling
  - Updated the WebSocket connection logic to prevent multiple connections by checking for both OPEN and CONNECTING states.
  - This change addresses potential issues in React strict mode where multiple connection attempts could occur.
* Refactor SqliteEventBus for async event handling
  - Updated SqliteEventBus to use asyncio for cross-process event listening, replacing the previous threading implementation.
  - Changed the processed field in the database from CharField to BooleanField for better data integrity.
  - Adjusted event processing logic to accommodate the new async structure, ensuring events are handled correctly across processes.
  - Enhanced test cases to support async operations and validate cross-process event communication.
* fix tests
* increase timeout
1 parent 8421236 commit e5883ae

18 files changed: +592 −143 lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
@@ -110,6 +110,7 @@ jobs:
             --ignore=tests/pytest/test_svgbench.py \
             --ignore=tests/pytest/test_livesvgbench.py \
             --ignore=tests/remote_server/test_remote_fireworks_propagate_status.py \
+            --ignore=tests/logging/test_elasticsearch_direct_http_handler.py \
             --ignore=eval_protocol/benchmarks/ \
             --cov=eval_protocol --cov-append --cov-report=xml --cov-report=term-missing -v --durations=10

.github/workflows/remote-rollout-processor-propagate-status-test.yml renamed to .github/workflows/elasticsearch-tests.yml

Lines changed: 8 additions & 4 deletions
@@ -1,4 +1,4 @@
-name: RemoteRolloutProcessor Propagate Status Test
+name: Elasticsearch Tests
 
 on:
   push:
@@ -13,8 +13,8 @@ on:
   workflow_dispatch: # Allow manual triggering
 
 jobs:
-  remote-rollout-processor-propagate-status-smoke-test:
-    name: Fireworks Propagate Status Smoke Test
+  elasticsearch-tests:
+    name: Elasticsearch Integration Tests
     runs-on: ubuntu-latest
 
     steps:
@@ -36,10 +36,14 @@ jobs:
       - name: Install the project
        run: uv sync --locked --all-extras --dev
 
-      - name: Run RemoteRolloutProcessor Propagate Status Smoke Test
+      - name: Run Elasticsearch Tests
        env:
          FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }}
          PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning"
        run: |
+          # Run Elasticsearch direct HTTP handler tests
+          uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short
+
+          # Run RemoteRolloutProcessor Propagate Status Smoke Test (also uses Elasticsearch)
          uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \
            -v --tb=short

eval_protocol/cli.py

Lines changed: 1 addition & 0 deletions
@@ -300,6 +300,7 @@ def parse_args(args=None):
     # Logs command
     logs_parser = subparsers.add_parser("logs", help="Serve logs with file watching and real-time updates")
     logs_parser.add_argument("--port", type=int, default=8000, help="Port to bind to (default: 8000)")
+    logs_parser.add_argument("--debug", action="store_true", help="Enable debug mode")
 
     # Upload command
     upload_parser = subparsers.add_parser(

eval_protocol/cli_commands/logs.py

Lines changed: 2 additions & 1 deletion
@@ -16,6 +16,7 @@ def logs_command(args):
     print(f"🌐 URL: http://localhost:{port}")
     print(f"🔌 WebSocket: ws://localhost:{port}/ws")
     print(f"👀 Watching paths: {['current directory']}")
+    print(f"🔍 Debug mode: {args.debug}")
     print("Press Ctrl+C to stop the server")
     print("-" * 50)
 
@@ -25,7 +26,7 @@ def logs_command(args):
     elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
 
     try:
-        serve_logs(port=args.port, elasticsearch_config=elasticsearch_config)
+        serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
         return 0
     except KeyboardInterrupt:
         print("\n🛑 Server stopped by user")

eval_protocol/dataset_logger/sqlite_dataset_logger_adapter.py

Lines changed: 8 additions & 1 deletion
@@ -23,12 +23,19 @@ def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluati
 
     def log(self, row: "EvaluationRow") -> None:
         data = row.model_dump(exclude_none=True, mode="json")
+        rollout_id = data.get("execution_metadata", {}).get("rollout_id", "unknown")
+        logger.debug(f"[EVENT_BUS_EMIT] Starting to log row with rollout_id: {rollout_id}")
+
         self._store.upsert_row(data=data)
+        logger.debug(f"[EVENT_BUS_EMIT] Successfully stored row in database for rollout_id: {rollout_id}")
+
         try:
+            logger.debug(f"[EVENT_BUS_EMIT] Emitting event '{LOG_EVENT_TYPE}' for rollout_id: {rollout_id}")
             event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
+            logger.debug(f"[EVENT_BUS_EMIT] Successfully emitted event for rollout_id: {rollout_id}")
         except Exception as e:
             # Avoid breaking storage due to event emission issues
-            logger.error(f"Failed to emit row_upserted event: {e}")
+            logger.error(f"[EVENT_BUS_EMIT] Failed to emit row_upserted event for rollout_id {rollout_id}: {e}")
             pass
 
     def read(self, rollout_id: Optional[str] = None) -> List["EvaluationRow"]:
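The adapter keeps its persist-then-emit ordering: the row is written to the store first, and event emission is best-effort, so a broken event bus can never lose stored data. A tiny self-contained sketch of that pattern with toy stand-ins (FlakyBus and the stored list are placeholders, not the project's API):

    class FlakyBus:
        def emit(self, event_type, data):
            raise RuntimeError("event bus unavailable")

    stored = []

    def log(data, bus):
        stored.append(data)          # stands in for self._store.upsert_row(data=data)
        try:
            bus.emit("log", data)    # stands in for event_bus.emit(LOG_EVENT_TYPE, ...)
        except Exception as exc:
            print(f"emit failed but row is already stored: {exc}")

    log({"rollout_id": "r1"}, FlakyBus())
    print(stored)  # [{'rollout_id': 'r1'}]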
eval_protocol/event_bus/sqlite_event_bus.py

Lines changed: 67 additions & 50 deletions
@@ -1,3 +1,5 @@
+import asyncio
+import os
 import threading
 import time
 from typing import Any, Optional
@@ -16,17 +18,14 @@ def __init__(self, db_path: Optional[str] = None):
 
         # Use the same database as the evaluation row store
         if db_path is None:
-            import os
-
             from eval_protocol.directory_utils import find_eval_protocol_dir
 
             eval_protocol_dir = find_eval_protocol_dir()
             db_path = os.path.join(eval_protocol_dir, "logs.db")
 
-        self._db = SqliteEventBusDatabase(db_path)
+        self._db: SqliteEventBusDatabase = SqliteEventBusDatabase(db_path)
         self._running = False
-        self._listener_thread: Optional[threading.Thread] = None
-        self._process_id = str(uuid4())
+        self._process_id = str(os.getpid())
 
     def emit(self, event_type: str, data: Any) -> None:
         """Emit an event to all subscribers.
@@ -35,75 +34,93 @@ def emit(self, event_type: str, data: Any) -> None:
             event_type: Type of event (e.g., "log")
             data: Event data
         """
+        logger.debug(f"[CROSS_PROCESS_EMIT] Emitting event type: {event_type}")
+
         # Call local listeners immediately
+        logger.debug(f"[CROSS_PROCESS_EMIT] Calling {len(self._listeners)} local listeners")
         super().emit(event_type, data)
+        logger.debug("[CROSS_PROCESS_EMIT] Completed local listener calls")
 
         # Publish to cross-process subscribers
+        logger.debug("[CROSS_PROCESS_EMIT] Publishing to cross-process subscribers")
         self._publish_cross_process(event_type, data)
+        logger.debug("[CROSS_PROCESS_EMIT] Completed cross-process publish")
 
     def _publish_cross_process(self, event_type: str, data: Any) -> None:
         """Publish event to cross-process subscribers via database."""
-        self._db.publish_event(event_type, data, self._process_id)
+        logger.debug(f"[CROSS_PROCESS_PUBLISH] Publishing event {event_type} to database")
+        try:
+            self._db.publish_event(event_type, data, self._process_id)
+            logger.debug(f"[CROSS_PROCESS_PUBLISH] Successfully published event {event_type} to database")
+        except Exception as e:
+            logger.error(f"[CROSS_PROCESS_PUBLISH] Failed to publish event {event_type} to database: {e}")
 
     def start_listening(self) -> None:
         """Start listening for cross-process events."""
         if self._running:
+            logger.debug("[CROSS_PROCESS_LISTEN] Already listening, skipping start")
             return
 
+        logger.debug("[CROSS_PROCESS_LISTEN] Starting cross-process event listening")
         self._running = True
-        self._start_database_listener()
+        loop = asyncio.get_running_loop()
+        loop.create_task(self._database_listener_task())
+        logger.debug("[CROSS_PROCESS_LISTEN] Started async database listener task")
 
     def stop_listening(self) -> None:
         """Stop listening for cross-process events."""
+        logger.debug("[CROSS_PROCESS_LISTEN] Stopping cross-process event listening")
         self._running = False
-        if self._listener_thread and self._listener_thread.is_alive():
-            self._listener_thread.join(timeout=1)
-
-    def _start_database_listener(self) -> None:
-        """Start database-based event listener."""
-
-        def database_listener():
-            last_cleanup = time.time()
-
-            while self._running:
-                try:
-                    # Get unprocessed events from other processes
-                    events = self._db.get_unprocessed_events(self._process_id)
-
-                    for event in events:
-                        if not self._running:
-                            break
-
-                        try:
-                            # Handle the event
-                            self._handle_cross_process_event(event["event_type"], event["data"])
 
-                            # Mark as processed
-                            self._db.mark_event_processed(event["event_id"])
-
-                        except Exception as e:
-                            logger.debug(f"Failed to process event {event['event_id']}: {e}")
-
-                    # Clean up old events every hour
-                    current_time = time.time()
-                    if current_time - last_cleanup >= 3600:
-                        self._db.cleanup_old_events()
-                        last_cleanup = current_time
-
-                    # Small sleep to prevent busy waiting
-                    time.sleep(0.1)
-
-                except Exception as e:
-                    logger.debug(f"Database listener error: {e}")
-                    time.sleep(1)
+    async def _database_listener_task(self) -> None:
+        """Single database listener task that processes events and recreates itself."""
+        if not self._running:
+            # this should end the task loop
+            logger.debug("[CROSS_PROCESS_LISTENER] Stopping database listener task")
+            return
 
-        self._listener_thread = threading.Thread(target=database_listener, daemon=True)
-        self._listener_thread.start()
+        # Get unprocessed events from other processes
+        events = self._db.get_unprocessed_events(str(self._process_id))
+        if events:
+            logger.debug(f"[CROSS_PROCESS_LISTENER] Found {len(events)} unprocessed events")
+        else:
+            logger.debug(f"[CROSS_PROCESS_LISTENER] No unprocessed events found for process {self._process_id}")
+
+        for event in events:
+            logger.debug(
+                f"[CROSS_PROCESS_LISTENER] Processing event {event['event_id']} of type {event['event_type']}"
+            )
+            # Handle the event
+            self._handle_cross_process_event(event["event_type"], event["data"])
+            logger.debug(f"[CROSS_PROCESS_LISTENER] Successfully processed event {event['event_id']}")
+
+            # Mark as processed
+            self._db.mark_event_processed(event["event_id"])
+            logger.debug(f"[CROSS_PROCESS_LISTENER] Marked event {event['event_id']} as processed")
+
+        # Clean up old events every hour
+        current_time = time.time()
+        if not hasattr(self, "_last_cleanup"):
+            self._last_cleanup = current_time
+        elif current_time - self._last_cleanup >= 3600:
+            logger.debug("[CROSS_PROCESS_LISTENER] Cleaning up old events")
+            self._db.cleanup_old_events()
+            self._last_cleanup = current_time
+
+        # Schedule the next task if still running
+        await asyncio.sleep(1.0)
+        loop = asyncio.get_running_loop()
+        loop.create_task(self._database_listener_task())
 
     def _handle_cross_process_event(self, event_type: str, data: Any) -> None:
         """Handle events received from other processes."""
-        for listener in self._listeners:
+        logger.debug(f"[CROSS_PROCESS_HANDLE] Handling cross-process event type: {event_type}")
+        logger.debug(f"[CROSS_PROCESS_HANDLE] Calling {len(self._listeners)} listeners")
+
+        for i, listener in enumerate(self._listeners):
             try:
+                logger.debug(f"[CROSS_PROCESS_HANDLE] Calling listener {i}")
                 listener(event_type, data)
+                logger.debug(f"[CROSS_PROCESS_HANDLE] Successfully called listener {i}")
             except Exception as e:
-                logger.debug(f"Cross-process event listener failed for {event_type}: {e}")
+                logger.debug(f"[CROSS_PROCESS_HANDLE] Cross-process event listener {i} failed for {event_type}: {e}")

eval_protocol/event_bus/sqlite_event_bus_database.py

Lines changed: 6 additions & 8 deletions
@@ -2,7 +2,7 @@
 from typing import Any, List
 from uuid import uuid4
 
-from peewee import CharField, DateTimeField, Model, SqliteDatabase
+from peewee import BooleanField, CharField, DateTimeField, Model, SqliteDatabase
 from playhouse.sqlite_ext import JSONField
 
 from eval_protocol.event_bus.logger import logger
@@ -25,7 +25,7 @@ class Event(BaseModel):  # type: ignore
             data = JSONField()
             timestamp = DateTimeField()
             process_id = CharField()
-            processed = CharField(default="false")  # Track if event has been processed
+            processed = BooleanField(default=False)  # Track if event has been processed
 
         self._Event = Event
         self._db.connect()
@@ -46,7 +46,7 @@ def publish_event(self, event_type: str, data: Any, process_id: str) -> None:
                 data=serialized_data,
                 timestamp=time.time(),
                 process_id=process_id,
-                processed="false",
+                processed=False,
             )
         except Exception as e:
             logger.warning(f"Failed to publish event to database: {e}")
@@ -56,7 +56,7 @@ def get_unprocessed_events(self, process_id: str) -> List[dict]:
         try:
             query = (
                 self._Event.select()
-                .where((self._Event.process_id != process_id) & (self._Event.processed == "false"))
+                .where((self._Event.process_id != process_id) & (~self._Event.processed))
                 .order_by(self._Event.timestamp)
             )
 
@@ -80,16 +80,14 @@ def get_unprocessed_events(self, process_id: str) -> List[dict]:
     def mark_event_processed(self, event_id: str) -> None:
         """Mark an event as processed."""
         try:
-            self._Event.update(processed="true").where(self._Event.event_id == event_id).execute()
+            self._Event.update(processed=True).where(self._Event.event_id == event_id).execute()
         except Exception as e:
             logger.debug(f"Failed to mark event as processed: {e}")
 
     def cleanup_old_events(self, max_age_hours: int = 24) -> None:
         """Clean up old processed events."""
         try:
             cutoff_time = time.time() - (max_age_hours * 3600)
-            self._Event.delete().where(
-                (self._Event.processed == "true") & (self._Event.timestamp < cutoff_time)
-            ).execute()
+            self._Event.delete().where((self._Event.processed) & (self._Event.timestamp < cutoff_time)).execute()
         except Exception as e:
             logger.debug(f"Failed to cleanup old events: {e}")

eval_protocol/log_utils/elasticsearch_client.py

Lines changed: 19 additions & 0 deletions
@@ -100,6 +100,25 @@ def delete_index(self) -> bool:
         except Exception:
             return False
 
+    def clear_index(self) -> bool:
+        """Clear all documents from the index.
+
+        Returns:
+            bool: True if successful, False otherwise
+        """
+        try:
+            # Delete all documents by query
+            response = self._make_request(
+                "POST", f"{self.index_url}/_delete_by_query", json_data={"query": {"match_all": {}}}
+            )
+            if response.status_code == 200:
+                # Refresh the index to ensure changes are visible
+                refresh_response = self._make_request("POST", f"{self.index_url}/_refresh")
+                return refresh_response.status_code == 200
+            return False
+        except Exception:
+            return False
+
     def get_mapping(self) -> Optional[Dict[str, Any]]:
         """Get the index mapping.
 
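`clear_index` relies on two standard Elasticsearch endpoints: `_delete_by_query` with a `match_all` query, followed by `_refresh` so the deletion is immediately visible to searches. A rough standalone equivalent using requests, with a placeholder index URL (the real client routes these calls through `_make_request` with its own configuration):

    import requests

    index_url = "http://localhost:9200/eval-protocol-logs"  # placeholder index URL for illustration

    # Remove every document in the index
    resp = requests.post(f"{index_url}/_delete_by_query", json={"query": {"match_all": {}}})
    if resp.status_code == 200:
        # Force a refresh so subsequent searches see the empty index
        requests.post(f"{index_url}/_refresh")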

eval_protocol/log_utils/elasticsearch_direct_http_handler.py

Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@
 import os
 from concurrent.futures import ThreadPoolExecutor
 from typing import Optional, Any, Dict
-from datetime import datetime
+from datetime import datetime, timezone
 
 from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig
 from .elasticsearch_client import ElasticsearchClient
@@ -36,8 +36,8 @@ def configure(self, elasticsearch_config: ElasticsearchConfig) -> None:
     def emit(self, record: logging.LogRecord) -> None:
         """Emit a log record by scheduling it for async transmission."""
         try:
-            # Create proper ISO 8601 timestamp
-            timestamp = datetime.fromtimestamp(record.created).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+            # Create proper ISO 8601 timestamp in UTC
+            timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
 
             rollout_id = self._get_rollout_id(record)
             logger.debug(f"Emitting log record: {record.getMessage()} with rollout_id: {rollout_id}")
