import json
from fastapi import WebSocket
from typing import Dict, Any, Optional
from dataclasses import dataclass
from typing import List
import aiohttp
@dataclass
class Vulnerability:
    """Record describing a single reported vulnerability.

    The attributes are plain data and nothing is validated on construction.

    NOTE(review): send_slack_scan_alert in this file is annotated as taking
    a Vulnerability but reads dict-style keys ('issue_severity', 'code', ...)
    through .get(); those names do not match the attributes below. Confirm
    which shape callers actually pass.
    """

    # Severity label of the finding.
    severity: str
    # File the finding was reported in.
    filename: str
    # Line of `filename` the finding refers to.
    line_number: int
    # Human-readable description of the issue.
    issue_text: str
    # Source excerpt associated with the finding.
    code_snippet: str
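@dataclass
class ConnectionManager:
    """Keep track of the WebSocket connections this object has accepted.

    The class is decorated with @dataclass but also defines its own
    __init__; the decorator does not replace an __init__ written in the
    class body, so construction takes no arguments and every instance
    starts with its own empty list.
    """

    # Class-level placeholder only; __init__ gives each instance a fresh list.
    active_connections: List[WebSocket] = None

    def __init__(self):
        self.active_connections = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection.

        NOTE(review): accept() is unconditional here. Any authentication,
        Origin check or connection cap has to happen before this is called;
        confirm against the route handlers.
        """
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        """Stop tracking a connection; a no-op if it is not tracked.

        Cleanup paths can run more than once for the same socket (for
        example a disconnect handler followed by a finally block), so an
        unknown socket must not raise out of the cleanup code.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed or never registered.
            pass

    async def send_message(self, message: Dict[str, Any], websocket: WebSocket):
        """Send `message` as JSON to one specific connection."""
        await websocket.send_json(message)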
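async def send_slack_fix_alert(webhook_url: str, pr_url: str, vulnerability: str, repo_url: str):
    """Post a "fix available" notification to a Slack incoming webhook.

    Args:
        webhook_url: Slack incoming-webhook URL. The URL itself is the
            credential, so it should not be logged or echoed back.
        pr_url: Link to the pull request that carries the fix.
        vulnerability: Description of the vulnerability that was fixed.
        repo_url: Repository the fix applies to.

    Raises:
        ValueError: if the webhook answers with any status other than 200;
            the response body is included in the message.

    NOTE(review): the POST goes to whatever URL is passed in. If
    webhook_url can come from request input, restrict it to the expected
    Slack host before calling this. Confirm against the caller.
    """
    # Function-scope import so this block does not depend on file-level imports.
    from html import escape

    def _mrkdwn(value) -> str:
        # "&", "<" and ">" are control characters in Slack mrkdwn
        # ("<!channel>", "<url|label>"). Escaping them stops repository or
        # finding text from injecting mentions or disguised links.
        return escape(str(value), quote=False)

    def _link_target(url) -> str:
        # "<", ">" and "|" delimit the <url|label> syntax; percent-encode
        # them so the URL cannot close the link or replace its label.
        return str(url).replace("<", "%3C").replace(">", "%3E").replace("|", "%7C")

    message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "🛡️ Security Vulnerability Fix Alert",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*Repository:*\n{_mrkdwn(repo_url)}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Vulnerability:*\n{_mrkdwn(vulnerability)}",
                    },
                ],
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Pull Request:* <{_link_target(pr_url)}|View Fix PR>",
                },
            },
        ]
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(webhook_url, json=message) as response:
            if response.status != 200:
                raise ValueError(f"Failed to send Slack notification: {await response.text()}")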
async def send_to_slack(webhook_url: str, message: dict) -> None:
    """POST an already-built Slack payload to an incoming webhook.

    Args:
        webhook_url: Destination URL. A Slack webhook URL is itself the
            credential, so keep it out of logs and error output.
        message: JSON-serialisable payload, sent as the request body.

    Raises:
        ValueError: on any response status other than 200; the response
            body is appended to the error message.

    NOTE(review): no host check is applied to webhook_url here. If callers
    take it from request input, validate it before it reaches this
    function. Confirm against the caller.
    """
    async with aiohttp.ClientSession() as http, http.post(webhook_url, json=message) as reply:
        if reply.status == 200:
            return
        # The body is read only on failure, for the error text.
        body = await reply.text()
        raise ValueError(f"Error sending to Slack: {body}")
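async def send_slack_scan_alert(vulnerability: Vulnerability, webhook_url: Optional[str] = None) -> None:
    """Build a "vulnerability found" Slack message and send it.

    Nothing is sent when `webhook_url` is None or empty.

    Args:
        vulnerability: The finding to report. Mapping-style objects are
            read through .get() with the keys 'issue_severity',
            'filename', 'line_number', 'issue_text' and 'code', as before.
            Objects without .get(), such as a Vulnerability instance, are
            read by attribute instead; previously they raised
            AttributeError.
        webhook_url: Slack incoming-webhook URL, or None to skip sending.

    Raises:
        ValueError: propagated from send_to_slack when Slack does not
            answer with status 200.
    """
    if not webhook_url:
        return

    # Function-scope import so this block does not depend on file-level imports.
    from html import escape

    mapping_get = getattr(vulnerability, "get", None)

    def _value(key: str, attr: str) -> str:
        # Prefer the mapping interface the original code used. The
        # attribute names are presumed equivalents of the dict keys
        # (severity ~ issue_severity, code_snippet ~ code); TODO confirm.
        if callable(mapping_get):
            raw = mapping_get(key)
        else:
            raw = getattr(vulnerability, attr, None)
        # Finding text and code excerpts come from scanned content. "&",
        # "<" and ">" are mrkdwn control characters ("<!channel>",
        # "<url|label>"), so escape them to block injected mentions and
        # disguised links in the channel.
        return escape(str(raw), quote=False)

    message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": "⚠️ Security Vulnerability Found",
                    "emoji": True,
                },
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*Severity:*\n{_value('issue_severity', 'severity')}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*File:*\n{_value('filename', 'filename')}",
                    },
                ],
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": f"*Line Number:*\n{_value('line_number', 'line_number')}",
                    },
                    {
                        "type": "mrkdwn",
                        "text": f"*Issue:*\n{_value('issue_text', 'issue_text')}",
                    },
                ],
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    # NOTE(review): a snippet that contains ``` still ends
                    # the code fence early; with the escaping above the
                    # remainder can only affect formatting.
                    "text": f"*Code:*\n```{_value('code', 'code_snippet')}```",
                },
            },
        ]
    }
    await send_to_slack(webhook_url, message)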