Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 105 additions & 7 deletions app/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import json
import logging
import re
import time
from abc import ABC, abstractmethod

import requests

from .tz import utc_now
Expand All @@ -13,6 +13,24 @@

SEVERITY_ORDER = {"info": 0, "warning": 1, "critical": 2}

_DISCORD_URL_RE = re.compile(
r"https://(?:ptb\.|canary\.)?discord(?:app)?\.com/api(?:/v\d+)?/webhooks/\d+/[\w-]+(?:\?[\w=&]+)?\Z",
re.IGNORECASE,
)

_DISCORD_EMBED_TITLE_MAX = 256
_DISCORD_EMBED_DESC_MAX = 4096
_DISCORD_EMBED_FIELD_NAME_MAX = 256
_DISCORD_EMBED_FIELD_VALUE_MAX = 1024
_DISCORD_EMBED_FIELDS_MAX = 25
_DISCORD_EMBED_TOTAL_MAX = 6000

DISCORD_SEVERITY_COLORS = {
"info": 0x3498DB, # blue
"warning": 0xF39C12, # amber
"critical": 0xE74C3C, # red
}


class NotificationChannel(ABC):
"""Base class for notification channels."""
Expand Down Expand Up @@ -46,6 +64,82 @@ def send(self, payload: dict) -> bool:
return False


class DiscordWebhookChannel(NotificationChannel):
"""Discord-native webhook channel with rich embed formatting."""

def __init__(self, url):
self._url = url
self._log_label = "Discord webhook"

@staticmethod
def _format_embed(payload: dict) -> dict:
"""Convert a DOCSight notification payload into a Discord embed."""
severity = payload.get("severity", "info")
event_type = payload.get("event_type", "unknown")
message = payload.get("message", "")
details = payload.get("details") or {}
timestamp = payload.get("timestamp", utc_now())

title = f"{severity.upper()}: {event_type.replace('_', ' ').title()}"
embed = {
"title": title[:_DISCORD_EMBED_TITLE_MAX],
"description": message[:_DISCORD_EMBED_DESC_MAX],
"color": DISCORD_SEVERITY_COLORS.get(severity, 0x95A5A6),
"timestamp": timestamp,
"footer": {"text": "DOCSight"},
}

footer_text = embed["footer"]["text"]
total_chars = len(embed["title"]) + len(embed["description"]) + len(footer_text)
fields = []
for key, value in details.items():
if isinstance(value, (dict, list)):
value = json.dumps(value, default=str)
name = key.replace("_", " ").title()[:_DISCORD_EMBED_FIELD_NAME_MAX]
val_str = str(value)[:_DISCORD_EMBED_FIELD_VALUE_MAX]
total_chars += len(name) + len(val_str)
if total_chars > _DISCORD_EMBED_TOTAL_MAX:
break
fields.append({
"name": name,
"value": val_str,
"inline": len(str(value)) < 40,
})
if len(fields) >= _DISCORD_EMBED_FIELDS_MAX:
break
if fields:
embed["fields"] = fields

return embed

def send(self, payload: dict) -> bool:
try:
discord_payload = {"embeds": [self._format_embed(payload)]}
r = requests.post(
self._url,
json=discord_payload,
headers={"Content-Type": "application/json"},
timeout=10,
)
r.raise_for_status()
return True
except requests.HTTPError as e:
# Sanitize: raise_for_status() embeds the full URL (with token)
log.warning(
"Discord webhook POST failed (%s): HTTP %s",
self._log_label, e.response.status_code if e.response is not None else "unknown",
)
return False
except Exception as e:
log.warning("Discord webhook POST failed (%s): %s", self._log_label, type(e).__name__)
return False


def is_discord_webhook_url(url: str) -> bool:
"""Check if a URL is a Discord webhook endpoint."""
return bool(_DISCORD_URL_RE.match(url))


class NotificationDispatcher:
"""Routes events through severity filter and cooldown to notification channels."""

Expand All @@ -69,12 +163,16 @@ def __init__(self, config_mgr):
def _setup_channels(self):
url = self._config_mgr.get("notify_webhook_url")
if url:
headers = {}
token = self._config_mgr.get("notify_webhook_token")
if token:
headers["Authorization"] = f"Bearer {token}"
self._channels.append(WebhookChannel(url, headers))
log.info("Notification channel: webhook configured")
if is_discord_webhook_url(url):
self._channels.append(DiscordWebhookChannel(url))
log.info("Notification channel: Discord webhook configured")
else:
headers = {}
token = self._config_mgr.get("notify_webhook_token")
if token:
headers["Authorization"] = f"Bearer {token}"
self._channels.append(WebhookChannel(url, headers))
log.info("Notification channel: webhook configured")

def dispatch(self, events: list):
"""Send qualifying events to all configured channels."""
Expand Down
Loading
Loading