Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/alerts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .rule_engine import (
create_alert_rule,
list_alert_rules,
delete_alert_rule,
get_alert_rule
)
from .alert_scheduler import (
start_monitoring,
stop_monitoring,
check_alerts_once
)
86 changes: 86 additions & 0 deletions core/alerts/alert_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import time
import threading
from datetime import datetime
from typing import List, Dict

from .rule_engine import list_alert_rules, delete_alert_rule
from .notification_handler import send_alert_notification
from ..compute import list_vms, monitor_vm
from ..bigquery import get_bigquery_usage_by_day_user

_monitoring_active = False
_monitoring_thread = None

def start_monitoring(check_interval_seconds: int = 60) -> None:
"""Start background monitoring"""
global _monitoring_active, _monitoring_thread

if _monitoring_active:
return

_monitoring_active = True
_monitoring_thread = threading.Thread(target=_monitoring_loop, args=(check_interval_seconds,), daemon=True)
_monitoring_thread.start()

def stop_monitoring() -> None:
"""Stop background monitoring"""
global _monitoring_active
_monitoring_active = False

def check_alerts_once() -> List[Dict]:
"""Check all rules once"""
triggered_alerts = []
rules = list_alert_rules()

for rule in rules:
if rule.get("enabled", True):
alert = _evaluate_rule(rule)
if alert:
triggered_alerts.append(alert)
send_alert_notification(alert)
delete_alert_rule(rule["name"])

return triggered_alerts

def _monitoring_loop(check_interval_seconds: int) -> None:
"""Background monitoring loop"""
while _monitoring_active:
try:
check_alerts_once()
except Exception as e:
print(f"Monitoring error: {e}")
time.sleep(check_interval_seconds)

def _evaluate_rule(rule: Dict) -> Dict:
"""Check if rule threshold is exceeded"""
current_value = _get_metric_value(rule["resource_type"], rule["metric"])

if current_value and current_value > rule["threshold"]:
return {
"rule_name": rule["name"],
"resource_type": rule["resource_type"],
"metric": rule["metric"],
"current_value": current_value,
"threshold": rule["threshold"],
"triggered_at": datetime.now().isoformat()
}
return None

def _get_metric_value(resource_type: str, metric: str):
"""Get current metric value (simplified demo version)"""
try:
if resource_type == "vm" and metric == "cpu_utilization":
# For the sake of our demo, you can just replace this with a specific cpu_utilization value
vms = list_vms(os.environ['GCP_PROJECT_NUMBER'], os.environ['GCP_ZONE'])
if vms:
cpu_data = monitor_vm(vms[0]["selfLink"])
if cpu_data:
vm_name = list(cpu_data.keys())[0]
cpu_values = cpu_data[vm_name]["value"]
return cpu_values[-1] * 100 if cpu_values else None # Return last value in percentage

except Exception as e:
print(f"Error getting {resource_type}.{metric}: {e}")

return None
45 changes: 45 additions & 0 deletions core/alerts/alert_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import os
from typing import List, Dict

ALERT_RULES_FILE = "alert_rules.json"

def load_rules() -> List[Dict]:
"""Load all alert rules from file"""
if not os.path.exists(ALERT_RULES_FILE):
return []

try:
with open(ALERT_RULES_FILE, 'r') as f:
return json.load(f)
except:
return []

def save_rules(rules: List[Dict]) -> None:
"""Save all rules to file"""
with open(ALERT_RULES_FILE, 'w') as f:
json.dump(rules, f, indent=2)

def save_rule(rule: Dict) -> None:
"""Save a single alert rule"""
rules = load_rules()

for i, existing in enumerate(rules):
if existing["name"] == rule["name"]:
rules[i] = rule
break
else:
rules.append(rule)

save_rules(rules)

def delete_rule(rule_name: str) -> bool:
"""Delete a rule by name"""
rules = load_rules()
new_rules = []
for r in rules:
if r["name"] != rule_name:
new_rules.append(r)
rules = new_rules
save_rules(rules)
return True
19 changes: 19 additions & 0 deletions core/alerts/notification_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Simple notification handler for alerts"""

from datetime import datetime
from typing import Dict

def send_alert_notification(alert: Dict) -> None:
"""Send alert notification (console only for demo)"""
message = format_alert_message(alert)
print("\n" + "="*40)
print("🚨 ALERT TRIGGERED 🚨")
print(f"Rule: {alert['rule_name']}")
print(f"Current: {alert['current_value']} | Threshold: {alert['threshold']}")
print(f"Time: {datetime.now().strftime('%H:%M:%S')}")
print("="*40)
print("User :> ", end="", flush=True)

def format_alert_message(alert: Dict) -> str:
"""Format alert into readable message"""
return f"Alert: {alert['rule_name']} - {alert['metric']} is {alert['current_value']}, exceeds {alert['threshold']}"
31 changes: 31 additions & 0 deletions core/alerts/rule_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import datetime
from typing import Dict, List
from .alert_storage import save_rule, load_rules, delete_rule

def create_alert_rule(rule_name: str, resource_type: str, metric: str, threshold: float) -> dict:
"""Create a simple alert rule"""
rule = {
"name": rule_name,
"resource_type": resource_type,
"metric": metric,
"threshold": threshold,
"created": datetime.now().isoformat(),
"enabled": True
}
save_rule(rule)
return rule

def list_alert_rules() -> List[dict]:
"""Get all alert rules"""
return load_rules()

def delete_alert_rule(rule_name: str) -> bool:
"""Delete an alert rule by name"""
return delete_rule(rule_name)

def get_alert_rule(rule_name: str) -> dict:
"""Get a specific rule by name"""
for rule in load_rules():
if rule["name"] == rule_name:
return rule
return None
12 changes: 10 additions & 2 deletions core/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
monitor_vm
)
from .service_accounts import list_custom_service_accounts
from .alerts import (
create_alert_rule,
list_alert_rules,
delete_alert_rule
)


def create_bot():
Expand All @@ -34,12 +39,13 @@ def create_bot():
list_datasets, get_bigquery_usage_by_user, get_bytes_loaded_to_dataset,
get_bigquery_usage_by_day_user,
list_cloud_run_jobs, get_job_executions, get_cloud_run_job_execution_logs,
list_custom_service_accounts
list_custom_service_accounts,
create_alert_rule, list_alert_rules, delete_alert_rule,
]

instruction = f"""
You are a helpful, knowledgeable, and tool-aware assistant integrated with an organization's Google Cloud Platform (GCP) environment.
Your primary responsibility is to assist a DevOps executive in monitoring and managing cloud infrastructure — especially virtual machines (VMs) and the BigQuery data warehouse.
Your primary responsibility is to assist a DevOps executive in monitoring and managing cloud infrastructure — especially virtual machines (VMs), BigQuery data warehouse, and **proactive alerting**.

You can access and invoke specific tools connected to the GCP account. When a user asks a question, you will:

Expand Down Expand Up @@ -116,6 +122,8 @@ def create_bot():
For table output, ensure the columns are aligned using spaces for readability. Use fixed-width formatting like in GitHub-flavored Markdown.
For list output, use a numbered list format.
For tool failures, provide a clear error message and suggest possible next steps.

**Important**: When background monitoring is started, it will continuously check alert rules and send notifications to the console when thresholds are exceeded.
"""

client = genai.Client(api_key=os.environ["GENAI_API_KEY"])
Expand Down
9 changes: 9 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
A QnA interface for monitoring GCP environments using Google Gemini API
"""
import sys
import os

from core.bot import create_bot
from core.utils.env_utils import load_environment_variables
from core.alerts.alert_scheduler import start_monitoring, stop_monitoring


def main():
Expand All @@ -18,6 +20,7 @@ def main():
load_environment_variables()

chat = create_bot()
start_monitoring()

print("GCP Monitoring Bot started. Type 'q', 'quit', or 'exit' to stop.")
print("-" * 50)
Expand All @@ -32,6 +35,12 @@ def main():
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1

finally:
stop_monitoring()
# Hard coded file name for now
if os.path.exists("alert_rules.json"):
os.remove("alert_rules.json")

return 0

Expand Down