-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathproxy_script.py
More file actions
137 lines (116 loc) · 4.12 KB
/
proxy_script.py
File metadata and controls
137 lines (116 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import sqlite3
import json
from datetime import datetime
from mitmproxy import http
# Database setup
DB_PATH = "/proxy/logs/logs.db"
def setup_database(db_path=None):
    """Create the api_logs table (and the database file) if it does not exist.

    Args:
        db_path: Path of the SQLite database to initialize. Defaults to the
            module-level DB_PATH when omitted, so the original zero-argument
            call sites keep working unchanged.
    """
    if db_path is None:
        db_path = DB_PATH
    conn = sqlite3.connect(db_path)
    try:
        # Connection.execute is a shortcut that creates a cursor internally.
        conn.execute('''
            CREATE TABLE IF NOT EXISTS api_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                method TEXT,
                url TEXT,
                request_headers TEXT,
                request_body TEXT,
                response_status INTEGER,
                response_headers TEXT,
                response_body TEXT,
                duration_ms REAL
            )
        ''')
        conn.commit()
    finally:
        # Original leaked the connection if the DDL raised; always close.
        conn.close()
def compact_streaming_response(raw_body: str) -> str:
    """Collapse an SSE ("data: ...") stream into one chat-completion JSON blob.

    Concatenates the delta contents of every parseable chunk into a single
    assistant message. If no chunk can be parsed, the body is returned as-is.
    """
    chunk_count = 0
    text_pieces = []
    meta = {}
    for raw_line in raw_body.split('\n'):
        stripped = raw_line.strip()
        if not stripped.startswith('data:'):
            continue
        payload = stripped[5:].strip()
        if payload == '[DONE]':
            continue
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue
        chunk_count += 1
        # The first chunk carrying an 'id' supplies the response metadata.
        if not meta and 'id' in event:
            meta = {
                'id': event.get('id'),
                'model': event.get('model'),
                'created': event.get('created'),
                'object': 'chat.completion'
            }
        for choice in event.get('choices', []):
            piece = choice.get('delta', {}).get('content')
            if piece:
                text_pieces.append(piece)
            reason = choice.get('finish_reason')
            if reason:
                meta['finish_reason'] = reason
    if chunk_count == 0:
        # Nothing parseable: hand the caller the original body untouched.
        return raw_body
    merged = {
        **meta,
        'choices': [{
            'index': 0,
            'message': {
                'role': 'assistant',
                'content': ''.join(text_pieces)
            },
            'finish_reason': meta.get('finish_reason', 'stop')
        }],
        '_streaming': True,
        '_chunk_count': chunk_count
    }
    return json.dumps(merged, indent=2)
setup_database()
class APILogger:
    """mitmproxy addon that records every request/response pair to SQLite.

    In reverse proxy mode all traffic is API traffic, so no host filtering is
    done: request() timestamps the flow, response() computes the duration and
    writes the full row to the api_logs table.
    """

    def __init__(self):
        # Maps in-flight flows to the datetime their request was first seen.
        self.flows = {}

    def request(self, flow: "http.HTTPFlow") -> None:
        # Record arrival time; the duration is computed when the response lands.
        self.flows[flow] = datetime.now()

    def response(self, flow: "http.HTTPFlow") -> None:
        # pop() both fetches and removes the entry, so the flow is cleaned up
        # even if the DB insert below raises (the original `del` at the end
        # leaked the entry on any exception).
        start_time = self.flows.pop(flow, None)
        if start_time is None:
            return
        duration = (datetime.now() - start_time).total_seconds() * 1000

        # Decode bodies defensively; errors='replace' tolerates binary payloads.
        request_body = flow.request.content.decode('utf-8', errors='replace') if flow.request.content else ""
        response_body = flow.response.content.decode('utf-8', errors='replace') if flow.response.content else ""

        # Compact SSE streaming responses into a single JSON document before storing.
        content_type = flow.response.headers.get('content-type', '')
        if 'text/event-stream' in content_type or response_body.startswith('data:'):
            response_body = compact_streaming_response(response_body)

        conn = sqlite3.connect(DB_PATH)
        try:
            conn.execute('''
                INSERT INTO api_logs (
                    timestamp, method, url, request_headers, request_body,
                    response_status, response_headers, response_body, duration_ms
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                start_time.isoformat(),
                flow.request.method,
                flow.request.pretty_url,
                json.dumps(dict(flow.request.headers)),
                request_body,
                flow.response.status_code,
                json.dumps(dict(flow.response.headers)),
                response_body,
                duration
            ))
            conn.commit()
        finally:
            # Original leaked the connection when execute/commit raised.
            conn.close()
addons = [APILogger()]