
Commit 3700834

Fix onboarding flow (#35)
* Don't just log and continue anymore; the user should know.
* Add PEP 440 versioning support
  - Introduced a new module `get_pep440_version.py` to generate PEP 440 compliant version strings based on git information, caching results to minimize repeated calls.
  - Updated `EvalMetadata` in `models.py` to use the new versioning function for the version field, replacing the previous method of using commit hashes.
  - Removed dependency on `versioneer` in tests, streamlining version retrieval for evaluations.
* Enhance default_single_turn_rollout_processor to log messages
  - Added logging functionality using `default_logger` to track processed messages in `default_single_turn_rollout_processor`.
  - Updated the return structure to include the modified row with messages instead of creating a new `EvaluationRow` instance.
  - Ensured the dataset is returned as a list after processing all rows concurrently.
* Add pytest as a dependency in pyproject.toml
  - Included `pytest>=6.0.0` in the main dependencies section to ensure compatibility with testing requirements.
  - Removed `pytest>=6.0.0` from the dev dependencies to streamline the development environment.
* Remove unused imports in utils.py to clean up the codebase.
* Add directory utility functions for finding and creating evaluation protocol directories
  - Introduced `find_eval_protocol_dir` and `find_eval_protocol_datasets_dir` functions to streamline the discovery and creation of the `.eval_protocol` directory and its `datasets` subdirectory.
  - Updated `LocalFSDatasetLoggerAdapter` to utilize these new utility functions, simplifying the initialization process for logging directories.
* Add PID field to EvaluationRow model
  - Introduced a new optional field `pid` in the `EvaluationRow` model to store the process ID of the evaluation creator. This addition aids the evaluation watcher in detecting stopped evaluations.
* Add 'stopped' status to evaluation protocol and update StatusIndicator component
  - Extended the `status` enum in `eval-protocol.ts` to include a new 'stopped' state, enhancing evaluation status tracking.
  - Updated the `StatusIndicator` component to handle the new 'stopped' status, providing appropriate visual feedback with updated colors and text.
* Update uv.lock to modify pytest dependency and revision number
  - Changed the revision number from 3 to 2.
  - Added `pytest` to the main dependencies section.
  - Removed `pytest` from the dev dependencies while retaining its version specification.
* Ensure evaluation watcher is running at the start of evaluation tests
* Add optional PID field to EvaluationRowSchema
  - Introduced a new optional field `pid` in the `EvaluationRowSchema` to store the process ID of the evaluation creator. This supports the evaluation watcher in detecting stopped evaluations, improving tracking and management of evaluation processes.
* Enhance evaluation logging and error handling
  - Updated the `load_jsonl` function to include error handling for JSON parsing, logging the line number of any errors encountered.
  - Modified the `status` field in `EvalMetadata` to be optional, allowing for more flexible evaluation states.
  - Improved the `LocalFSDatasetLoggerAdapter` to check for existing rows across multiple JSONL files before appending new entries, ensuring no duplicates are logged.
  - Increased the `word_count` parameter in the `generate_id` function to 5 for more diverse ID generation.
  - Introduced a new `eval_watcher.py` script to monitor evaluation processes, updating their status if the associated process has terminated.
* Refactor eval_watcher.py to use structured logging
  - Replaced print statements with structured logging using the `get_logger` utility for improved log management and consistency.
  - Enhanced error handling and status updates within the evaluation watcher, ensuring better tracking of evaluation processes and clearer output during execution.
* Add logging utilities for eval_protocol package
  - Introduced a new module `logging_utils.py` to provide centralized logging configuration and utilities.
  - Implemented functions for setting up loggers, logging evaluation events, performance metrics, and errors with context.
  - Enhanced logging consistency across the package by utilizing structured logging practices.
* Enhance LocalFSDatasetLoggerAdapter to prevent duplicate row IDs
  - Updated the `read` method to ensure that no duplicate row IDs are returned when reading from JSONL files in the datasets directory. This improvement enhances data integrity and consistency in the evaluation logging process.
* Add singleton lock functionality for process management
  - Introduced a new module `singleton_lock.py` that implements file-based singleton lock management to ensure only one instance of a process can run at a time.
  - Added functions for acquiring, releasing, and checking the status of locks, along with mechanisms for handling stale locks.
  - Implemented tests in `test_singleton_lock.py` and `test_singleton_lock_multiprocessing.py` to validate the lock behavior under various scenarios, including concurrent access and cleanup of stale locks.
* works
* Enhance JSON line error handling in load_jsonl function
  - Added regex-based extraction of "row_id" to provide more context in error messages when JSON parsing fails. This aids debugging by including the problematic row ID in the raised ValueError.
* works!
* Refactor evaluation_test.py to ensure singleton watcher is initialized
  - Moved the call to `ensure_singleton_watcher()` into the `evaluation_test` function so the evaluation watcher is running before processing begins. This change improves the reliability of the evaluation process by ensuring the watcher is active during execution.
* Update CI workflow to ignore test_eval_watcher.py in coverage reports
  - Added an ignore rule for `tests/test_eval_watcher.py` in the coverage command to streamline coverage reporting and focus on relevant tests.
* Add signal handler to manage zombie processes in eval_watcher.py
  - Implemented a signal handler to automatically reap zombie child processes, preventing accumulation and potential resource leaks.
  - Enhanced process management by setting up the signal handler for SIGCHLD if available, ensuring better stability during evaluation execution.
* move
* Enhance singleton lock functionality and file locking in LocalFSDatasetLoggerAdapter
  - Updated `is_process_running` to include a timeout parameter, allowing for more flexible process monitoring.
  - Implemented file locking mechanisms in `LocalFSDatasetLoggerAdapter` to prevent race conditions during logging operations, ensuring data integrity when multiple processes access log files.
  - Added methods for acquiring and releasing file locks, improving the robustness of the logging process.
* build
* Add script alias for eval_protocol CLI in pyproject.toml
* Fix import path for braintrust adapters in eval_protocol module
* Update broadcast_file_update method to restrict broadcasting to .jsonl files only, preventing unnecessary updates for .lock files.
* Fix broadcast_file_update logic to ensure only .jsonl files are broadcast, preventing unnecessary updates for .lock files.
* Remove a bunch of stuff
* Remove coverage ignore rule for a test that doesn't exist
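The singleton-lock bullets above describe a common file-based pattern: atomically create a lock file containing the holder's PID, and treat the lock as stale when that PID is no longer alive. A minimal, self-contained sketch of that pattern (hypothetical names — the actual `singleton_lock.py` API is not shown in this diff):

```python
import os


def try_acquire_lock(lock_path: str) -> bool:
    """Try to acquire a file-based singleton lock.

    Atomically creates the lock file with our PID inside; if the file
    already exists, checks whether the owning process is still alive and
    steals the lock if it is stale.
    """
    try:
        # O_CREAT | O_EXCL makes creation atomic: only one process wins the race
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except FileExistsError:
        with open(lock_path) as f:
            pid = int(f.read().strip() or "0")
        try:
            os.kill(pid, 0)  # signal 0: existence check only, no signal delivered
            return False  # holder is alive, lock stays with them
        except (ProcessLookupError, ValueError):
            # stale lock: holder is gone, remove it and retry
            os.remove(lock_path)
            return try_acquire_lock(lock_path)


def release_lock(lock_path: str) -> None:
    """Release the lock by removing the lock file."""
    if os.path.exists(lock_path):
        os.remove(lock_path)
```

`os.open` with `O_CREAT | O_EXCL` is what makes acquisition race-free: two processes creating the file concurrently cannot both succeed, and the loser falls into the liveness check.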
1 parent 7b252d3 commit 3700834

21 files changed, +482 −117 lines

eval_protocol/__init__.py

Lines changed: 4 additions & 3 deletions
@@ -10,15 +10,16 @@
 import warnings

-from .adapters.braintrust import reward_fn_to_scorer, scorer_to_reward_fn
+from eval_protocol.adapters.braintrust import reward_fn_to_scorer, scorer_to_reward_fn
+
 from .auth import get_fireworks_account_id, get_fireworks_api_key
 from .common_utils import load_jsonl
 from .config import RewardKitConfig, get_config, load_config
 from .mcp_env import (
     AnthropicPolicy,
-    OpenAIPolicy,
-    LiteLLMPolicy,
     FireworksPolicy,
+    LiteLLMPolicy,
+    OpenAIPolicy,
     make,
     rollout,
     test_mcp,

eval_protocol/common_utils.py

Lines changed: 14 additions & 20 deletions
@@ -1,9 +1,7 @@
 import json
-import logging
+import re
 from typing import Any, Dict, List

-logger = logging.getLogger(__name__)
-

 def load_jsonl(file_path: str) -> List[Dict[str, Any]]:
     """
@@ -14,23 +12,19 @@ def load_jsonl(file_path: str) -> List[Dict[str, Any]]:

     Returns:
         A list of dictionaries, where each dictionary is a parsed JSON object from a line.
-        Returns an empty list if the file is not found or if errors occur during parsing,
-        with errors logged.
+        Returns an empty list if the file is not found or if errors occur during parsing.
     """
     data: List[Dict[str, Any]] = []
-    try:
-        with open(file_path, "r", encoding="utf-8") as f:
-            for i, line in enumerate(f):
-                try:
-                    data.append(json.loads(line.strip()))
-                except json.JSONDecodeError as e:
-                    logger.error(f"Error decoding JSON on line {i+1} in {file_path}: {e} - Line: '{line.strip()}'")
-                    # Optionally, re-raise, or return partial data, or handle as per desired strictness
-                    # For now, we'll log and continue, returning successfully parsed lines.
-    except FileNotFoundError:
-        logger.error(f"File not found: {file_path}")
-        return []
-    except Exception as e:
-        logger.error(f"An unexpected error occurred while reading {file_path}: {e}")
-        return []
+    with open(file_path, "r", encoding="utf-8") as f:
+        for line_number, line in enumerate(f):
+            try:
+                data.append(json.loads(line.strip()))
+            except json.JSONDecodeError as e:
+                print(f"Error parsing JSON line for file {file_path} at line {line_number}")
+                # attempt to find "row_id" in the line by finding index of "row_id" and performing regex of `"row_id": (.*),`
+                row_id_index = line.find("row_id")
+                if row_id_index != -1:
+                    row_id = re.search(r'"row_id": (.*),', line[row_id_index:])
+                    raise ValueError(f"{e.msg} at line {line_number}: {line} ({row_id})")
+                raise e
     return data
eval_protocol/dataset_logger/directory_utils.py

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+import os
+from typing import Optional
+
+# Shared constants for directory discovery
+EVAL_PROTOCOL_DIR = ".eval_protocol"
+PYTHON_FILES = ["pyproject.toml", "requirements.txt"]
+DATASETS_DIR = "datasets"
+
+
+def find_eval_protocol_dir() -> str:
+    """
+    Find the .eval_protocol directory by looking up the directory tree.
+
+    Returns:
+        Path to the .eval_protocol directory
+    """
+    # recursively look up for a .eval_protocol directory
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    while current_dir != "/":
+        if os.path.exists(os.path.join(current_dir, EVAL_PROTOCOL_DIR)):
+            log_dir = os.path.join(current_dir, EVAL_PROTOCOL_DIR)
+            break
+        current_dir = os.path.dirname(current_dir)
+    else:
+        # if not found, recursively look up until a pyproject.toml or requirements.txt is found
+        current_dir = os.path.dirname(os.path.abspath(__file__))
+        while current_dir != "/":
+            if any(os.path.exists(os.path.join(current_dir, f)) for f in PYTHON_FILES):
+                log_dir = os.path.join(current_dir, EVAL_PROTOCOL_DIR)
+                break
+            current_dir = os.path.dirname(current_dir)
+        else:
+            # get the PWD that this python process is running in
+            log_dir = os.path.join(os.getcwd(), EVAL_PROTOCOL_DIR)
+
+    # create the .eval_protocol directory if it doesn't exist
+    os.makedirs(log_dir, exist_ok=True)
+
+    return log_dir
+
+
+def find_eval_protocol_datasets_dir() -> str:
+    """
+    Find the .eval_protocol/datasets directory by looking up the directory tree.
+
+    Returns:
+        Path to the .eval_protocol/datasets directory
+    """
+    log_dir = find_eval_protocol_dir()
+
+    # create the datasets subdirectory
+    datasets_dir = os.path.join(log_dir, DATASETS_DIR)
+    os.makedirs(datasets_dir, exist_ok=True)
+
+    return datasets_dir
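The walk-up search in `find_eval_protocol_dir` can be exercised in isolation. A generic sketch of the same pattern (names are illustrative, not from the module); unlike the `!= "/"` check above, it detects the root portably by comparing a directory with its parent, which also terminates on Windows drive roots:

```python
import os
import tempfile


def walk_up_for(start, marker_names):
    """Walk parent directories from `start`, returning the first directory
    that contains any of `marker_names`, or None if the root is reached."""
    current = os.path.abspath(start)
    while True:
        if any(os.path.exists(os.path.join(current, m)) for m in marker_names):
            return current
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            return None
        current = parent


# Usage: a project root marked by pyproject.toml, searched from a nested dir
root = tempfile.mkdtemp()
project = os.path.join(root, "project")
src = os.path.join(project, "src")
os.makedirs(src)
open(os.path.join(project, "pyproject.toml"), "w").close()
print(walk_up_for(src, ["pyproject.toml"]) == project)  # True
```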

eval_protocol/dataset_logger/local_fs_dataset_logger_adapter.py

Lines changed: 42 additions & 58 deletions
@@ -1,51 +1,26 @@
-from datetime import datetime, timezone
 import json
 import os
-import tempfile
-import shutil
+import time
+from datetime import datetime, timezone
+from pathlib import Path
 from typing import TYPE_CHECKING, List, Optional
+
 from eval_protocol.common_utils import load_jsonl
 from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
+from eval_protocol.dataset_logger.directory_utils import find_eval_protocol_datasets_dir

 if TYPE_CHECKING:
     from eval_protocol.models import EvaluationRow


 class LocalFSDatasetLoggerAdapter(DatasetLogger):
     """
-    Logger that stores logs in the local filesystem.
+    Logger that stores logs in the local filesystem with file locking to prevent race conditions.
     """

-    EVAL_PROTOCOL_DIR = ".eval_protocol"
-    PYTHON_FILES = ["pyproject.toml", "requirements.txt"]
-    DATASETS_DIR = "datasets"
-
     def __init__(self):
-        # recursively look up for a .eval_protocol directory
-        current_dir = os.path.dirname(os.path.abspath(__file__))
-        while current_dir != "/":
-            if os.path.exists(os.path.join(current_dir, self.EVAL_PROTOCOL_DIR)):
-                self.log_dir = os.path.join(current_dir, self.EVAL_PROTOCOL_DIR)
-                break
-            current_dir = os.path.dirname(current_dir)
-
-        # if not found, recursively look up until a pyproject.toml or requirements.txt is found
-        current_dir = os.path.dirname(os.path.abspath(__file__))
-        while current_dir != "/":
-            if any(os.path.exists(os.path.join(current_dir, f)) for f in self.PYTHON_FILES):
-                self.log_dir = os.path.join(current_dir, self.EVAL_PROTOCOL_DIR)
-                break
-            current_dir = os.path.dirname(current_dir)
-
-        # get the PWD that this python process is running in
-        self.log_dir = os.path.join(os.getcwd(), self.EVAL_PROTOCOL_DIR)
-
-        # create the .eval_protocol directory if it doesn't exist
-        os.makedirs(self.log_dir, exist_ok=True)
-
-        # create the datasets subdirectory
-        self.datasets_dir = os.path.join(self.log_dir, self.DATASETS_DIR)
-        os.makedirs(self.datasets_dir, exist_ok=True)
+        self.log_dir = os.path.dirname(find_eval_protocol_datasets_dir())
+        self.datasets_dir = find_eval_protocol_datasets_dir()

         # ensure that log file exists
         if not os.path.exists(self.current_jsonl_path):
@@ -68,44 +43,53 @@ def log(self, row: "EvaluationRow") -> None:
         """Log a row, updating existing row with same ID or appending new row."""
         row_id = row.input_metadata.row_id

-        # Check if row with this ID already exists
-        if os.path.exists(self.current_jsonl_path):
-            with open(self.current_jsonl_path, "r") as f:
-                lines = f.readlines()
-
-            # Find the line with matching ID
-            for i, line in enumerate(lines):
-                try:
-                    line_data = json.loads(line.strip())
-                    if line_data["input_metadata"]["row_id"] == row_id:
-                        # Update existing row
-                        lines[i] = row.model_dump_json(exclude_none=True) + os.linesep
-                        with open(self.current_jsonl_path, "w") as f:
-                            f.writelines(lines)
-                        return
-                except json.JSONDecodeError:
-                    continue
-
-        # If no existing row found, append new row
+        # Check if row with this ID already exists in any JSONL file
+        if os.path.exists(self.datasets_dir):
+            for filename in os.listdir(self.datasets_dir):
+                if filename.endswith(".jsonl"):
+                    file_path = os.path.join(self.datasets_dir, filename)
+                    if os.path.exists(file_path):
+                        with open(file_path, "r") as f:
+                            lines = f.readlines()
+
+                        # Find the line with matching ID
+                        for i, line in enumerate(lines):
+                            try:
+                                line_data = json.loads(line.strip())
+                                if line_data["input_metadata"]["row_id"] == row_id:
+                                    # Update existing row
+                                    lines[i] = row.model_dump_json(exclude_none=True) + os.linesep
+                                    with open(file_path, "w") as f:
+                                        f.writelines(lines)
+                                    return
+                            except json.JSONDecodeError:
+                                continue
+
+        # If no existing row found, append new row to current file
         with open(self.current_jsonl_path, "a") as f:
             f.write(row.model_dump_json(exclude_none=True) + os.linesep)

     def read(self, row_id: Optional[str] = None) -> List["EvaluationRow"]:
-        """Read rows from all JSONL files in the datasets directory."""
+        """Read rows from all JSONL files in the datasets directory. Also
+        ensures that there are no duplicate row IDs."""
         from eval_protocol.models import EvaluationRow

         if not os.path.exists(self.datasets_dir):
             return []

         all_rows = []
+        existing_row_ids = set()
         for filename in os.listdir(self.datasets_dir):
             if filename.endswith(".jsonl"):
                 file_path = os.path.join(self.datasets_dir, filename)
-                try:
-                    data = load_jsonl(file_path)
-                    all_rows.extend([EvaluationRow(**r) for r in data])
-                except Exception:
-                    continue  # skip files that can't be read/parsed
+                data = load_jsonl(file_path)
+                for r in data:
+                    row = EvaluationRow(**r)
+                    if row.input_metadata.row_id not in existing_row_ids:
+                        existing_row_ids.add(row.input_metadata.row_id)
+                    else:
+                        raise ValueError(f"Duplicate Row ID {row.input_metadata.row_id} already exists")
+                    all_rows.append(row)

         if row_id:
             # Filter by row_id if specified
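The duplicate-ID guard in `read` boils down to a seen-set check. A standalone sketch of the same logic, using plain dicts instead of `EvaluationRow`:

```python
def check_unique_row_ids(rows):
    """Raise on the first row_id that appears twice, mirroring the
    seen-set check read() performs across all JSONL files."""
    seen = set()
    for row in rows:
        rid = row["input_metadata"]["row_id"]
        if rid in seen:
            raise ValueError(f"Duplicate Row ID {rid} already exists")
        seen.add(rid)
    return rows


ok = [{"input_metadata": {"row_id": "a"}}, {"input_metadata": {"row_id": "b"}}]
print(check_unique_row_ids(ok) is ok)  # True
```

Raising on the first duplicate (rather than silently dropping it) surfaces corrupt or double-written logs immediately, which matches the commit's "user should know" intent.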
get_pep440_version.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+# Cache for PEP 440 version string
+import subprocess
+
+_version_cache = {"version": None, "base_version": None}
+
+
+def get_pep440_version(base_version=None):
+    """
+    Generate a PEP 440 compliant version string based on git information.
+
+    This function is inspired by versioneer but doesn't require the full versioneer
+    setup, making it easier for downstream users to adopt without additional dependencies.
+
+    The result is cached statically to avoid repeated git calls.
+
+    Args:
+        base_version: The base version string (e.g., "1.0.0"). If None, will try to
+            find the most recent version tag in git.
+
+    Returns:
+        A PEP 440 compliant version string that includes:
+        - Development release number (devN) based on commit count since base_version
+        - Local version identifier with git commit hash
+        - Dirty indicator if there are uncommitted changes
+
+    Examples:
+        >>> get_pep440_version("1.0.0")
+        "1.0.0.dev42+g1234567"  # 42 commits since 1.0.0, commit hash 1234567
+        >>> get_pep440_version("1.0.0")  # with uncommitted changes
+        "1.0.0.dev42+g1234567.dirty"  # indicates dirty working directory
+        >>> get_pep440_version("1.0.0")  # no git available
+        "1.0.0+unknown"  # indicates git info not available
+    """
+    # Check if we have a cached version for this base_version
+    if _version_cache["version"] is not None and _version_cache["base_version"] == base_version:
+        return _version_cache["version"]
+    try:
+        # Check if we're in a git repository
+        subprocess.run(
+            ["git", "rev-parse", "--git-dir"],
+            check=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,
+        )
+
+        # If base_version is None, try to find the most recent version tag
+        if base_version is None:
+            try:
+                base_version = subprocess.check_output(
+                    ["git", "describe", "--tags", "--abbrev=0"], universal_newlines=True, stderr=subprocess.DEVNULL
+                ).strip()
+            except subprocess.CalledProcessError:
+                # No tags found, we'll handle this case specially
+                base_version = None
+
+        # Get commit count since base_version
+        if base_version is None:
+            # No base version (no tags), just count all commits
+            count = subprocess.check_output(
+                ["git", "rev-list", "--count", "HEAD"], universal_newlines=True, stderr=subprocess.DEVNULL
+            ).strip()
+            base_version = "0.0.0"  # Use this for the final version string
+        else:
+            try:
+                count = subprocess.check_output(
+                    ["git", "rev-list", "--count", f"{base_version}..HEAD"],
+                    universal_newlines=True,
+                    stderr=subprocess.DEVNULL,
+                ).strip()
+                # If no commits found, try counting from the beginning
+                if count == "0" or not count:
+                    count = subprocess.check_output(
+                        ["git", "rev-list", "--count", "HEAD"], universal_newlines=True, stderr=subprocess.DEVNULL
+                    ).strip()
+            except subprocess.CalledProcessError:
+                # If base_version tag doesn't exist, count all commits
+                count = subprocess.check_output(
+                    ["git", "rev-list", "--count", "HEAD"], universal_newlines=True, stderr=subprocess.DEVNULL
+                ).strip()
+
+        # Get short commit hash
+        commit_hash = subprocess.check_output(
+            ["git", "rev-parse", "--short", "HEAD"], universal_newlines=True, stderr=subprocess.DEVNULL
+        ).strip()
+
+        # Check for uncommitted changes (dirty working directory)
+        try:
+            subprocess.run(
+                ["git", "diff-index", "--quiet", "HEAD", "--"],
+                check=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+            )
+            dirty_suffix = ""
+        except subprocess.CalledProcessError:
+            dirty_suffix = ".dirty"
+
+        # Ensure count is a valid integer
+        try:
+            dev_count = int(count)
+        except (ValueError, TypeError):
+            dev_count = 0
+
+        # Build PEP 440 compliant version string
+        # Format: <base_version>.dev<count>+g<hash>[.dirty]
+        version_parts = [base_version]
+
+        if dev_count > 0:
+            version_parts.append(f".dev{dev_count}")
+
+        version_parts.append(f"+g{commit_hash}")
+
+        if dirty_suffix:
+            version_parts.append(dirty_suffix)
+
+        result = "".join(version_parts)
+
+        # Cache the result
+        _version_cache["version"] = result
+        _version_cache["base_version"] = base_version
+
+        return result
+
+    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
+        # Git is not available or not a git repository
+        result = f"{base_version}+unknown"
+
+        # Cache the result
+        _version_cache["version"] = result
+        _version_cache["base_version"] = base_version
+
+        return result
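The version strings this module emits follow `<base>.dev<count>+g<hash>[.dirty]`. The string assembly can be separated from the git plumbing; a sketch with a hypothetical helper (not part of the module):

```python
def build_pep440_version(base, dev_count, commit_hash, dirty=False):
    """Assemble the <base>.dev<N>+g<hash>[.dirty] form used above."""
    parts = [base]
    if dev_count > 0:
        parts.append(f".dev{dev_count}")
    parts.append(f"+g{commit_hash}")
    if dirty:
        parts.append(".dirty")
    return "".join(parts)


print(build_pep440_version("1.0.0", 42, "1234567"))        # 1.0.0.dev42+g1234567
print(build_pep440_version("1.0.0", 42, "1234567", True))  # 1.0.0.dev42+g1234567.dirty
```

Everything after the `+` is a PEP 440 local version identifier, so `g<hash>` and `dirty` segments are ignored for ordering but preserved for display by standard packaging tools.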
