Skip to content

Commit 1283ce2

Browse files
authored
ElasticSearch logging in RemoteRolloutProcessor (part 2) (#242)
* save
* Enhance ElasticsearchIndexManager and tests to include rollout_id field
  - Updated _has_correct_timestamp_mapping method to check for rollout_id as a keyword field in Elasticsearch mappings.
  - Added rollout_id fixture to tests for setting up environment variable during test execution.
  - Modified test functions to include rollout_id in log messages and assertions, ensuring proper indexing and retrieval from Elasticsearch.
  - Implemented new tests to verify the presence and correctness of rollout_id in logged messages.
* using tracing.fireworks.ai works
* Added Status model import to eval_protocol and included it in the __all__ exports for better accessibility.
* Enhance Elasticsearch logging to include status information
  - Added a method to extract status information from log records in ElasticSearchDirectHttpHandler.
  - Updated the data structure sent to Elasticsearch to include status_code, status_message, and status_details if present.
  - Modified ElasticsearchIndexManager to validate the mapping of new status fields.
  - Implemented tests to verify logging of status information and searching by status code in Elasticsearch.
* Refactor Elasticsearch integration to use dedicated client
  - Replaced direct HTTP requests with an ElasticsearchClient for handling interactions with Elasticsearch in ElasticSearchDirectHttpHandler and ElasticsearchIndexManager.
  - Updated configuration handling to encapsulate Elasticsearch settings within a single config object.
  - Simplified index management and document indexing processes, improving code maintainability and readability.
  - Adjusted tests to utilize the new client for searching and verifying log entries, enhancing test reliability.
* rename to match official capitalization
* Refactor Elasticsearch configuration handling
  - Updated Elasticsearch integration to use ElasticSearchConfig from the types module, improving consistency across the codebase.
  - Removed the local ElasticsearchConfig dataclass in favor of the new model, streamlining configuration management.
  - Adjusted related classes and tests to utilize the updated configuration structure, enhancing maintainability and readability.
* Refactor Elasticsearch configuration naming for consistency
  - Updated all instances of ElasticSearchConfig to ElasticsearchConfig across the codebase to align with official naming conventions.
  - Ensured that related classes and methods reflect this change, enhancing code clarity and maintainability.
1 parent 003a140 commit 1283ce2

File tree

10 files changed

+722
-166
lines changed

10 files changed

+722
-166
lines changed

eval_protocol/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
)
2525
from .data_loader import DynamicDataLoader, InlineDataLoader
2626
from . import mcp, rewards
27-
from .models import EvaluateResult, Message, MetricResult, EvaluationRow, InputMetadata
27+
from .models import EvaluateResult, Message, MetricResult, EvaluationRow, InputMetadata, Status
2828
from .playback_policy import PlaybackPolicyBase
2929
from .resources import create_llm_resource
3030
from .reward_function import RewardFunction
@@ -63,6 +63,7 @@
6363
warnings.filterwarnings("default", category=DeprecationWarning, module="eval_protocol")
6464

6565
__all__ = [
66+
"Status",
6667
"RemoteRolloutProcessor",
6768
"InputMetadata",
6869
"EvaluationRow",
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
"""
2+
Centralized Elasticsearch client for all Elasticsearch API operations.
3+
4+
This module provides a unified interface for all Elasticsearch operations
5+
used throughout the codebase, including index management, document operations,
6+
and search functionality.
7+
"""
8+
9+
import json
10+
import requests
11+
from typing import Any, Dict, List, Optional, Union
12+
from urllib.parse import urlparse
13+
from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig
14+
15+
16+
class ElasticsearchClient:
    """Centralized client for all Elasticsearch operations.

    All public operations are best-effort: any exception raised while
    talking to Elasticsearch is swallowed and reported through the return
    value (``False`` for boolean operations, ``None`` for operations that
    return a payload).
    """

    def __init__(self, config: "ElasticsearchConfig"):
        """Initialize the Elasticsearch client.

        Args:
            config: Elasticsearch configuration. The client reads ``url``,
                ``index_name``, ``api_key`` and ``verify_ssl`` from it.
        """
        self.config = config
        # Drop trailing slashes so URL joins below never produce "//".
        self.base_url = config.url.rstrip("/")
        self.index_url = f"{self.base_url}/{config.index_name}"
        self._headers = {"Content-Type": "application/json", "Authorization": f"ApiKey {config.api_key}"}

    def _make_request(
        self,
        method: str,
        url: str,
        json_data: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        params: Optional[Dict[str, Any]] = None,
        timeout: int = 30,
        data: Optional[Union[str, bytes]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> "requests.Response":
        """Make an HTTP request to Elasticsearch.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, HEAD)
            url: Full URL for the request
            json_data: JSON data to send in request body
            params: Query parameters
            timeout: Request timeout in seconds
            data: Pre-serialized request body, used for payloads that are
                not a single JSON value (e.g. NDJSON for the bulk API)
            headers: Extra headers merged over the client's default headers
                for this request only

        Returns:
            requests.Response object

        Raises:
            requests.RequestException: If the request fails
        """
        request_headers = dict(self._headers)
        if headers:
            request_headers.update(headers)

        return requests.request(
            method=method,
            url=url,
            headers=request_headers,
            json=json_data,
            data=data,
            params=params,
            verify=self.config.verify_ssl,
            timeout=timeout,
        )

    # Index Management Operations

    def create_index(self, mapping: Dict[str, Any]) -> bool:
        """Create an index with the specified mapping.

        Args:
            mapping: Index mapping configuration

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            response = self._make_request("PUT", self.index_url, json_data=mapping)
            return response.status_code in [200, 201]
        except Exception:
            return False

    def index_exists(self) -> bool:
        """Check if the index exists.

        Returns:
            bool: True if index exists, False otherwise
        """
        try:
            response = self._make_request("HEAD", self.index_url)
            return response.status_code == 200
        except Exception:
            return False

    def delete_index(self) -> bool:
        """Delete the index.

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            response = self._make_request("DELETE", self.index_url)
            return response.status_code in [200, 404]  # 404 means index doesn't exist
        except Exception:
            return False

    def get_mapping(self) -> Optional[Dict[str, Any]]:
        """Get the index mapping.

        Returns:
            Dict containing mapping data, or None if failed
        """
        try:
            response = self._make_request("GET", f"{self.index_url}/_mapping")
            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            return None

    def get_index_stats(self) -> Optional[Dict[str, Any]]:
        """Get index statistics.

        Returns:
            Dict containing index statistics, or None if failed
        """
        try:
            response = self._make_request("GET", f"{self.index_url}/_stats")
            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            return None

    # Document Operations

    def index_document(self, document: Dict[str, Any], doc_id: Optional[str] = None) -> bool:
        """Index a document.

        Args:
            document: Document to index
            doc_id: Optional document ID; when omitted (or empty) the
                request is sent to ``_doc`` without an ID

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            if doc_id:
                url = f"{self.index_url}/_doc/{doc_id}"
            else:
                url = f"{self.index_url}/_doc"

            response = self._make_request("POST", url, json_data=document)
            return response.status_code in [200, 201]
        except Exception:
            return False

    @staticmethod
    def _build_bulk_body(documents: List[Dict[str, Any]]) -> str:
        """Serialize documents into the NDJSON body expected by ``_bulk``.

        Each document is preceded by an ``{"index": {}}`` action line, and
        the payload ends with a newline. ``json.dumps`` escapes newlines
        inside values, so every action/document occupies exactly one line.
        """
        lines = []
        for doc in documents:
            lines.append(json.dumps({"index": {}}))
            lines.append(json.dumps(doc))
        return "\n".join(lines) + "\n"

    def bulk_index_documents(self, documents: List[Dict[str, Any]]) -> bool:
        """Bulk index multiple documents.

        Args:
            documents: List of documents to index

        Returns:
            bool: True if every document was indexed (or there was nothing
            to index), False otherwise
        """
        if not documents:
            # Nothing to send; an empty bulk body would be rejected.
            return True

        try:
            payload = self._build_bulk_body(documents).encode("utf-8")
            response = self._make_request(
                "POST",
                f"{self.index_url}/_bulk",
                data=payload,
                # The bulk endpoint takes newline-delimited JSON, not a JSON array.
                headers={"Content-Type": "application/x-ndjson"},
            )
            if response.status_code != 200:
                return False
            # A 200 response can still report per-item failures via "errors".
            return not response.json().get("errors", False)
        except Exception:
            return False

    # Search Operations

    def search(
        self, query: Dict[str, Any], size: int = 10, from_: int = 0, sort: Optional[List[Dict[str, Any]]] = None
    ) -> Optional[Dict[str, Any]]:
        """Search documents in the index.

        Args:
            query: Elasticsearch query
            size: Number of results to return
            from_: Starting offset
            sort: Sort specification

        Returns:
            Dict containing search results, or None if failed
        """
        try:
            search_body = {"query": query, "size": size, "from": from_}

            if sort:
                search_body["sort"] = sort

            response = self._make_request("POST", f"{self.index_url}/_search", json_data=search_body)

            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            return None

    def search_by_term(self, field: str, value: Any, size: int = 10) -> Optional[Dict[str, Any]]:
        """Search documents by exact term match.

        Args:
            field: Field name to search
            value: Value to match
            size: Number of results to return

        Returns:
            Dict containing search results, or None if failed
        """
        query = {"term": {field: value}}
        return self.search(query, size=size)

    def search_by_match(self, field: str, value: str, size: int = 10) -> Optional[Dict[str, Any]]:
        """Search documents by text match.

        Args:
            field: Field name to search
            value: Text to match
            size: Number of results to return

        Returns:
            Dict containing search results, or None if failed
        """
        query = {"match": {field: value}}
        return self.search(query, size=size)

    def search_by_match_phrase_prefix(self, field: str, value: str, size: int = 10) -> Optional[Dict[str, Any]]:
        """Search documents by phrase prefix match.

        Args:
            field: Field name to search
            value: Phrase prefix to match
            size: Number of results to return

        Returns:
            Dict containing search results, or None if failed
        """
        query = {"match_phrase_prefix": {field: value}}
        return self.search(query, size=size)

    def search_all(self, size: int = 10) -> Optional[Dict[str, Any]]:
        """Search all documents in the index.

        Args:
            size: Number of results to return

        Returns:
            Dict containing search results, or None if failed
        """
        query = {"match_all": {}}
        return self.search(query, size=size)

    # Health and Status Operations

    def health_check(self) -> bool:
        """Check if Elasticsearch is healthy.

        Only the HTTP status of ``_cluster/health`` is inspected; the
        reported cluster status in the response body is not examined.

        Returns:
            bool: True if the endpoint answered with 200, False otherwise
        """
        try:
            response = self._make_request("GET", f"{self.base_url}/_cluster/health")
            return response.status_code == 200
        except Exception:
            return False

    def get_cluster_info(self) -> Optional[Dict[str, Any]]:
        """Get cluster information.

        Returns:
            Dict containing the ``_cluster/health`` response, or None if failed
        """
        try:
            response = self._make_request("GET", f"{self.base_url}/_cluster/health")
            if response.status_code == 200:
                return response.json()
            return None
        except Exception:
            return None

0 commit comments

Comments
 (0)