-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
134 lines (110 loc) · 6.8 KB
/
Copy pathworker.py
File metadata and controls
134 lines (110 loc) · 6.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import logging
from services.kubernetes import get_cluster_status, restart_pod
from services.ai import analyze_cluster, explain_healing
from services.slack import send_slack_alert, send_heal_notification
from services.history import add_to_history
from models.schemas import HealingAction
from datetime import datetime
from services.metrics import ALERTS_SENT, AI_ANALYSES, K8S_CONNECTION
from telemetry import get_tracer
from opentelemetry import trace
# --- Worker configuration ---------------------------------------------------
# Seconds to sleep between two cluster polls.
POLL_INTERVAL = 60
# When True, failing pods are restarted after being alerted on. Off by default;
# intended to be toggled at runtime via the API.
AUTO_HEAL_ENABLED = False
# Names of pods that have already been alerted on, so a failing pod is
# reported once rather than on every poll.
ALREADY_ALERTED = set()

_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("OpsAgentWorker")
async def monitoring_loop():
    """Poll the cluster forever, alerting on (and optionally auto-healing) failing pods.

    Each iteration opens one ``opsagent.worker.poll`` span, runs
    ``_poll_once`` inside it, then sleeps ``POLL_INTERVAL`` seconds. No
    ``Exception`` escapes the loop: an error from a poll is logged with its
    traceback and recorded on the poll span, and polling simply continues.

    The synchronous Kubernetes/AI helpers are run via ``asyncio.to_thread`` so
    that, when this coroutine shares an event loop with other tasks (see
    ``start_worker``), it does not block them while waiting on I/O.
    """
    logger.info("🚀 OpsAgent Monitoring Worker started.")
    tracer = get_tracer()
    while True:
        # One span per polling iteration.
        with tracer.start_as_current_span("opsagent.worker.poll") as span:
            try:
                await _poll_once(tracer, span)
            except Exception as e:
                # Last-resort guard: a bad poll must never kill the worker.
                logger.exception(f"Worker Error: {e}")
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
        await asyncio.sleep(POLL_INTERVAL)


async def _poll_once(tracer, span):
    """Run one poll: fetch cluster status, record it on *span*, handle new failures.

    Re-raises whatever ``get_cluster_status`` raises after setting the
    ``K8S_CONNECTION`` gauge to 0. Errors while handling an individual pod are
    logged and recorded on *span* instead, so the remaining pods still get handled.
    """
    try:
        # get_cluster_status is synchronous; keep it off the event-loop thread.
        status = await asyncio.to_thread(get_cluster_status)
    except Exception:
        # Only a failed cluster query says the Kubernetes API is unreachable;
        # AI / Slack / restart failures further down must not flip this gauge.
        K8S_CONNECTION.set(0)
        raise
    K8S_CONNECTION.set(1)

    # Cluster-wide health details on the poll span.
    span.set_attribute("k8s.cluster.total_pods", status.total_pods)
    span.set_attribute("k8s.cluster.running_pods", status.running)
    span.set_attribute("k8s.cluster.pending_pods", status.pending)
    span.set_attribute("k8s.cluster.failed_pods", status.failed)

    # A pod needs attention when its status is "Failed" or it has restarted
    # more than 5 times.
    failed_pods = [p for p in status.pods if p.status == "Failed" or p.restarts > 5]
    span.set_attribute("k8s.failed_pods.count", len(failed_pods))

    # Reset alerts for pods that are no longer failing -- gone from the cluster,
    # or present again (same name) in a healthy state -- so a later failure is
    # reported anew. Mutate in place: ALREADY_ALERTED is shared module state.
    # NOTE(review): keyed by pod name only, so same-named pods in different
    # namespaces share one entry.
    ALREADY_ALERTED.intersection_update({p.name for p in failed_pods})

    for pod in failed_pods:
        if pod.name in ALREADY_ALERTED:
            continue
        try:
            await _handle_failed_pod(tracer, pod, status)
        except Exception as e:
            # Isolate the error to this pod. If the Slack alert had not been
            # sent yet, the pod is not in ALREADY_ALERTED and is retried next poll.
            logger.exception(f"Failed to handle {pod.name}: {e}")
            span.record_exception(e)
            span.set_status(trace.StatusCode.ERROR, str(e))


async def _handle_failed_pod(tracer, pod, status):
    """Analyze, alert on and (if AUTO_HEAL_ENABLED) auto-heal one newly failing pod."""
    logger.warning(f"🚨 Issue detected: {pod.name} in {pod.namespace}")
    # Child span covering everything done for this specific pod failure.
    with tracer.start_as_current_span("opsagent.worker.handle_failure") as fail_span:
        fail_span.set_attribute("k8s.pod.name", pod.name)
        fail_span.set_attribute("k8s.pod.namespace", pod.namespace)
        fail_span.set_attribute("k8s.pod.status", pod.status)
        fail_span.set_attribute("k8s.pod.restarts", pod.restarts)

        # AI analysis of the whole cluster status, used as alert context.
        with tracer.start_as_current_span("opsagent.ai.analyze_cluster") as ai_span:
            analysis = await asyncio.to_thread(analyze_cluster, status)
            ai_span.set_attribute("ai.analysis", analysis.analysis)
        AI_ANALYSES.inc()

        with tracer.start_as_current_span("opsagent.slack.send_alert"):
            await send_slack_alert(
                pod.name,
                pod.namespace,
                pod.status,
                pod.restarts,
                analysis.analysis,
            )
        # Counted only after send_slack_alert returned without raising, so a
        # send that raises (and is retried next poll) is not double-counted.
        ALERTS_SENT.labels(status=pod.status, namespace=pod.namespace).inc()
        ALREADY_ALERTED.add(pod.name)

        if AUTO_HEAL_ENABLED:
            await _auto_heal(tracer, pod)


async def _auto_heal(tracer, pod):
    """Restart *pod*; on success record a HealingAction and send a Slack notification."""
    logger.info(f"⚡ Auto-Healing {pod.name}...")
    reason = f"Status: {pod.status}, Restarts: {pod.restarts}"
    with tracer.start_as_current_span("opsagent.worker.auto_heal") as heal_span:
        heal_span.set_attribute("k8s.pod.name", pod.name)
        heal_span.set_attribute("k8s.pod.namespace", pod.namespace)

        with tracer.start_as_current_span("opsagent.ai.explain_healing") as explain_span:
            explanation = await asyncio.to_thread(explain_healing, pod.name, reason)
            explain_span.set_attribute("ai.explanation", explanation)

        with tracer.start_as_current_span("opsagent.k8s.restart_pod") as restart_span:
            success = await asyncio.to_thread(restart_pod, pod.name, pod.namespace)
            restart_span.set_attribute("k8s.action.success", success)

        if not success:
            logger.error(f"❌ Failed to heal {pod.name}")
            heal_span.set_status(trace.StatusCode.ERROR, "Failed to restart pod")
            return

        add_to_history(
            HealingAction(
                pod_name=pod.name,
                namespace=pod.namespace,
                action="restart",
                reason=explanation,
                timestamp=datetime.now(),
                success=True,
            )
        )
        with tracer.start_as_current_span("opsagent.slack.send_heal_notification"):
            await send_heal_notification(pod.name, "Restarted Pod", explanation)
        logger.info(f"✅ Successfully healed {pod.name}")
# Strong references to worker tasks created by start_worker(); the event loop
# itself only keeps weak references to tasks.
_WORKER_TASKS = set()


def start_worker():
    """Start the monitoring worker in the way that fits the calling context.

    When called while an event loop is running in this thread (e.g. from an
    async startup hook), the worker is scheduled on that loop as a background
    task and the task is returned. Otherwise this call blocks, running the
    worker on a fresh loop via ``asyncio.run``, and returns ``None`` once the
    worker finishes.
    """
    # get_running_loop() answers "is a loop running here?" exactly;
    # get_event_loop() is deprecated for this use and raises in non-main
    # threads that have no loop set.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        in_running_loop = False
    else:
        in_running_loop = True

    if not in_running_loop:
        asyncio.run(monitoring_loop())
        return None

    task = asyncio.create_task(monitoring_loop())
    # Keep a strong reference so the task cannot be garbage-collected mid-run;
    # drop it automatically once the task is done.
    _WORKER_TASKS.add(task)
    task.add_done_callback(_WORKER_TASKS.discard)
    return task
if __name__ == "__main__":
    # Run directly as a script: no event loop exists yet, so asyncio.run creates
    # one and drives the (endless) monitoring loop until the process is stopped.
    asyncio.run(main=monitoring_loop())