Skip to content

Commit 3dc172c

Browse files
authored
ElasticSearch logging in RemoteRolloutProcessor (part 1) (#240)
* Ran `curl -fsSL https://elastic.co/start-local | sh -s -- --esonly` — from https://github.com/elastic/start-local?tab=readme-ov-file#install-only-elasticsearch; actually don't check in elastic-start-local.

* Add Elasticsearch setup and configuration to RemoteRolloutProcessor
  - Introduced a setup method in RemoteRolloutProcessor to initialize Elasticsearch if not disabled.
  - Added an ElasticSearchConfig model for managing Elasticsearch configuration.
  - Implemented logic to parse environment variables from a .env file and start Elasticsearch if necessary.
  - Updated evaluation_test to call rollout_processor.setup() for proper initialization.
  - Modified RolloutProcessor to include a setup method for potential overrides in subclasses.

* Refactor Elasticsearch setup in RemoteRolloutProcessor
  - Removed the previous logic for parsing environment variables and starting Elasticsearch directly.
  - Introduced a dedicated ElasticsearchSetup module to handle Elasticsearch initialization.
  - Updated the _setup_elastic_search method to use the new setup module for improved clarity and maintainability.

* Refactor Elasticsearch setup methods in the ElasticsearchSetup class
  - Renamed methods for clarity: _setup_local_elasticsearch to _setup_existing_docker_elasticsearch and _setup_remote_elasticsearch to _setup_initialized_docker_elasticsearch.
  - Improved comments to better describe the purpose of each setup method.
  - Enhanced the logic for initializing Elasticsearch with Docker, ensuring clearer handling of existing and new setups.

* Add PID tracking to langfuse_row in RemoteRolloutProcessor
  - Introduced a new attribute `pid` on langfuse_row to facilitate detection of stopped evaluations.
  - Updated comments to clarify the purpose of the new attribute in relation to logging status updates.

* Add Elasticsearch logging and index management functionality
  - Introduced ElasticsearchDirectHttpHandler for asynchronous logging to Elasticsearch.
  - Added ElasticsearchIndexManager for managing index creation and mapping configuration.
  - Updated ElasticsearchSetup to create logging indices with proper mappings.
  - Enhanced the ElasticSearchConfig model to include an index_name attribute.
  - Implemented tests for ElasticsearchDirectHttpHandler to verify log transmission and sorting.
1 parent c09755b commit 3dc172c

File tree

9 files changed

+805
-1
lines changed

9 files changed

+805
-1
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import json
2+
import logging
3+
import asyncio
4+
import threading
5+
from concurrent.futures import ThreadPoolExecutor
6+
from typing import Optional, Tuple, Any, Dict
7+
from datetime import datetime
8+
from urllib.parse import urlparse
9+
import requests
10+
11+
from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig
12+
13+
14+
class ElasticsearchDirectHttpHandler(logging.Handler):
    """Logging handler that ships log records to Elasticsearch over HTTP.

    Each record is serialized to a small JSON document and POSTed to
    ``<base_url>/<index_name>/_doc``.  The HTTP request is executed on a
    single background worker thread, so emitting a record never blocks the
    calling thread on network I/O.
    """

    def __init__(self, elasticsearch_config: "ElasticSearchConfig") -> None:
        """Configure the handler from an ``ElasticSearchConfig``.

        Args:
            elasticsearch_config: carries the cluster ``url``, the target
                ``index_name``, and the ``api_key`` used for authentication.
        """
        super().__init__()
        self.base_url: str = elasticsearch_config.url.rstrip("/")
        self.index_name: str = elasticsearch_config.index_name
        self.api_key: str = elasticsearch_config.api_key
        self.url: str = f"{self.base_url}/{self.index_name}/_doc"
        self.formatter: logging.Formatter = logging.Formatter()
        # The worker pool is created lazily on first emit.  Creation is
        # guarded by a lock: handlers are routinely driven from several
        # threads at once, and an unguarded check-then-create could spawn
        # (and leak) a second executor.
        self._executor: Optional[ThreadPoolExecutor] = None
        self._executor_lock = threading.Lock()

        # Verify TLS certificates only when the endpoint is actually HTTPS.
        parsed_url = urlparse(elasticsearch_config.url)
        self.verify_ssl: bool = parsed_url.scheme == "https"

    def _format_timestamp(self, created: float) -> str:
        """Render ``created`` (epoch seconds) as an ISO-8601 UTC string.

        The previous implementation formatted the *local* time but still
        appended the ``Z`` (UTC) designator, skewing every stored timestamp
        by the host's UTC offset.  Formatting in UTC makes the ``Z`` honest.
        """
        from datetime import timezone  # local import keeps the fix self-contained

        return datetime.fromtimestamp(created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record by scheduling it for async transmission."""
        try:
            data: Dict[str, Any] = {
                "@timestamp": self._format_timestamp(record.created),
                "level": record.levelname,
                "message": record.getMessage(),
                "logger_name": record.name,
                # Add other relevant record attributes if needed
            }

            # Hand the HTTP request off to the background worker.
            self._schedule_async_send(data, record)
        except Exception as e:
            self.handleError(record)
            print(f"Error preparing log for Elasticsearch: {e}")

    def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Schedule an async task to send the log data to Elasticsearch."""
        if self._executor is None:
            # Double-checked locking: cheap fast path once created, and at
            # most one executor is ever constructed under contention.
            with self._executor_lock:
                if self._executor is None:
                    self._executor = ThreadPoolExecutor(
                        max_workers=1, thread_name_prefix="elasticsearch-logger"
                    )

        # Submit the HTTP request to the thread pool.
        future = self._executor.submit(self._send_to_elasticsearch, data, record)

        # Surface transport errors without ever crashing the logging caller.
        future.add_done_callback(lambda f: self._handle_async_result(f, record))

    def _send_to_elasticsearch(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
        """Send data to Elasticsearch (runs in the worker thread pool).

        Any HTTP/transport failure propagates to the done-callback attached
        in ``_schedule_async_send``, which reports it.
        """
        response: requests.Response = requests.post(
            self.url,
            headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"},
            data=json.dumps(data),
            verify=self.verify_ssl,  # If using HTTPS, verify SSL certificate
        )
        response.raise_for_status()  # Raise an exception for HTTP errors

    def _handle_async_result(self, future, record: logging.LogRecord) -> None:
        """Handle the result of the async send operation."""
        try:
            future.result()  # This will raise any exception that occurred
        except Exception as e:
            self.handleError(record)
            # Report to stdout instead of through logging, which would risk
            # a recursive logging loop.
            response = getattr(e, "response", None)
            if response is not None:
                print(f"Error sending log to Elasticsearch: {e}")
                print(f"Response content: {response.text}")
            else:
                print(f"Error sending log to Elasticsearch: {e}")

    def close(self) -> None:
        """Clean up resources when the handler is closed."""
        # Drain any pending sends *before* tearing the handler down so
        # queued records are not dropped (the original closed first).
        if self._executor:
            self._executor.shutdown(wait=True)
        super().close()
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import requests
2+
from typing import Dict, Any, Optional
3+
from urllib.parse import urlparse
4+
5+
6+
class ElasticsearchIndexManager:
    """Manages Elasticsearch index creation and mapping configuration."""

    def __init__(self, base_url: str, index_name: str, api_key: str) -> None:
        """Initialize the Elasticsearch index manager.

        Args:
            base_url: Elasticsearch base URL (e.g., "https://localhost:9200")
            index_name: Name of the index to manage
            api_key: API key for authentication
        """
        self.base_url: str = base_url.rstrip("/")
        self.index_name: str = index_name
        self.api_key: str = api_key
        self.index_url: str = f"{self.base_url}/{self.index_name}"
        # Memoizes a successful mapping creation so repeat calls are no-ops.
        self._mapping_created: bool = False

        # Verify TLS certificates only for https:// endpoints.
        self.verify_ssl = urlparse(base_url).scheme == "https"

    def _auth_headers(self) -> Dict[str, str]:
        """Build the Authorization header carried by every request."""
        return {"Authorization": f"ApiKey {self.api_key}"}

    def create_logging_index_mapping(self) -> bool:
        """Create the index with the proper mapping for logging data.

        If the index already exists with the wrong mapping it is deleted
        and recreated.  Best-effort: failures print a warning.

        Returns:
            bool: True if the mapping was created successfully, False otherwise.
        """
        if self._mapping_created:
            return True

        try:
            # Nothing to do if the index already carries the right mapping.
            if self._index_exists_with_correct_mapping():
                self._mapping_created = True
                return True

            # Index present but with a wrong mapping: recreate it from scratch.
            if self.index_exists():
                print(f"Warning: Index {self.index_name} exists with incorrect mapping. Deleting and recreating...")
                if not self.delete_index():
                    print(f"Warning: Failed to delete existing index {self.index_name}")
                    return False

            resp = requests.put(
                self.index_url,
                headers={"Content-Type": "application/json", **self._auth_headers()},
                json=self._get_logging_mapping(),
                verify=self.verify_ssl,
            )
            if resp.status_code not in (200, 201):
                print(f"Warning: Failed to create index mapping: {resp.status_code} - {resp.text}")
                return False
            self._mapping_created = True
            return True

        except Exception as e:
            print(f"Warning: Failed to create index mapping: {e}")
            return False

    def _index_exists_with_correct_mapping(self) -> bool:
        """Check if the index exists and has the correct @timestamp mapping.

        Returns:
            bool: True if the index exists with the correct mapping, False otherwise.
        """
        try:
            head_resp = requests.head(
                self.index_url, headers=self._auth_headers(), verify=self.verify_ssl
            )
            if head_resp.status_code != 200:
                return False

            mapping_resp = requests.get(
                f"{self.index_url}/_mapping",
                headers=self._auth_headers(),
                verify=self.verify_ssl,
            )
            return mapping_resp.status_code == 200 and self._has_correct_timestamp_mapping(
                mapping_resp.json()
            )
        except Exception:
            return False

    def _has_correct_timestamp_mapping(self, mapping_data: Dict[str, Any]) -> bool:
        """Check if the mapping has @timestamp as a date field.

        Args:
            mapping_data: Elasticsearch mapping response data

        Returns:
            bool: True if @timestamp is correctly mapped as a date field
        """
        try:
            properties = mapping_data[self.index_name]["mappings"]["properties"]
            return properties["@timestamp"].get("type") == "date"
        except (KeyError, TypeError):
            return False

    def _get_logging_mapping(self) -> Dict[str, Any]:
        """Get the standard mapping for logging data.

        Returns:
            Dict containing the index mapping configuration
        """
        field_types: Dict[str, Any] = {
            "@timestamp": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
            "level": {"type": "keyword"},
            "message": {"type": "text"},
            "logger_name": {"type": "keyword"},
        }
        return {"mappings": {"properties": field_types}}

    def delete_index(self) -> bool:
        """Delete the managed index.

        Returns:
            bool: True if the index was deleted successfully, False otherwise.
        """
        try:
            resp = requests.delete(
                self.index_url, headers=self._auth_headers(), verify=self.verify_ssl
            )
            # 404 means the index doesn't exist, which counts as success.
            if resp.status_code in (200, 404):
                self._mapping_created = False
                return True
            print(f"Warning: Failed to delete index: {resp.status_code} - {resp.text}")
            return False
        except Exception as e:
            print(f"Warning: Failed to delete index: {e}")
            return False

    def index_exists(self) -> bool:
        """Check if the index exists.

        Returns:
            bool: True if the index exists, False otherwise.
        """
        try:
            resp = requests.head(
                self.index_url, headers=self._auth_headers(), verify=self.verify_ssl
            )
            return resp.status_code == 200
        except Exception:
            return False

    def get_index_stats(self) -> Optional[Dict[str, Any]]:
        """Get statistics about the index.

        Returns:
            Dict containing index statistics, or None if the request failed
        """
        try:
            resp = requests.get(
                f"{self.index_url}/_stats",
                headers=self._auth_headers(),
                verify=self.verify_ssl,
            )
            if resp.status_code == 200:
                return resp.json()
            return None
        except Exception:
            return None

0 commit comments

Comments
 (0)