7
7
from mastodon import Mastodon
8
8
9
9
from fedivuln import config
10
- from fedivuln .utils import heartbeat , report_error
10
+ from fedivuln .monitoring import heartbeat , log
11
+ from fedivuln .utils import get_vendor_product_cve
11
12
12
13
# Set up your Mastodon instance with access credentials
13
14
if config .mastodon_clientcred_push and config .mastodon_usercred_push :
@@ -30,24 +31,46 @@ def create_status_content(event_data: str, topic: str) -> str:
30
31
status = config .templates .get (topic , "" )
31
32
match topic :
32
33
case "vulnerability" :
33
- try : # CVE
34
+ # CVE
35
+ try :
36
+ if (
37
+ event_dict ["cveMetadata" ]["datePublished" ]
38
+ != event_dict ["cveMetadata" ]["dateUpdated" ]
39
+ ):
40
+ return ""
41
+ vendor , product = get_vendor_product_cve (event_dict )[0 ]
34
42
status = status .replace ("<VULNID>" , event_dict ["cveMetadata" ]["cveId" ])
35
43
status = status .replace (
36
44
"<LINK>" ,
37
45
f"https://vulnerability.circl.lu/vuln/{ event_dict ['cveMetadata' ]['cveId' ]} " ,
38
46
)
47
+ status = status .replace ("<VENDOR>" , vendor )
48
+ status = status .replace ("<PRODUCT>" , product )
39
49
return status
40
50
except Exception :
41
51
pass
42
- try : # GHSA, PySec
52
+
53
+ # GHSA, PySec
54
+ try :
55
+ if event_dict ["published" ] != event_dict ["modified" ]:
56
+ return ""
43
57
status = status .replace ("<VULNID>" , event_dict ["id" ])
44
58
status = status .replace (
45
59
"<LINK>" , f"https://vulnerability.circl.lu/vuln/{ event_dict ['id' ]} "
46
60
)
61
+ status = status .replace ("<VENDOR>" , "" )
62
+ status = status .replace ("<PRODUCT>" , "" )
47
63
return status
48
64
except Exception :
49
65
pass
50
- try : # CSAF
66
+
67
+ # CSAF
68
+ try :
69
+ if (
70
+ event_dict ["document" ]["tracking" ]["initial_release_date" ]
71
+ != event_dict ["document" ]["tracking" ]["current_release_date" ]
72
+ ):
73
+ return ""
51
74
try :
52
75
vuln_id = event_dict ["document" ]["tracking" ]["id" ].replace (":" , "_" )
53
76
except Exception :
@@ -56,6 +79,8 @@ def create_status_content(event_data: str, topic: str) -> str:
56
79
status = status .replace (
57
80
"<LINK>" , f"https://vulnerability.circl.lu/vuln/{ vuln_id } "
58
81
)
82
+ status = status .replace ("<VENDOR>" , "" )
83
+ status = status .replace ("<PRODUCT>" , "" )
59
84
return status
60
85
except Exception :
61
86
return ""
@@ -67,7 +92,7 @@ def create_status_content(event_data: str, topic: str) -> str:
67
92
status = status .replace ("<BUNDLETITLE>" , event_dict ["payload" ]["name" ])
68
93
status = status .replace ("<LINK>" , event_dict ["uri" ])
69
94
case _:
70
- pass
95
+ status = ""
71
96
return status
72
97
73
98
@@ -117,13 +142,13 @@ def listen_to_http_event_stream(url, headers=None, params=None, topic="comment")
117
142
118
143
except requests .exceptions .RequestException as req_err :
119
144
print (f"Request error: { req_err } " )
120
- report_error ("error" , f"Request error with HTTP event stream: { req_err } " )
145
+ log ("error" , f"Request error with HTTP event stream: { req_err } " )
121
146
except KeyboardInterrupt :
122
147
print ("\n Stream interrupted by user. Closing connection." )
123
- report_error ("error" , "Stream interrupted by user. Closing connection." )
148
+ log ("error" , "Stream interrupted by user. Closing connection." )
124
149
except Exception as e :
125
150
print (f"Unexpected error: { e } " )
126
- report_error ("error" , f"Unexpected error in listen_to_http_event_stream: { e } " )
151
+ log ("error" , f"Unexpected error in listen_to_http_event_stream: { e } " )
127
152
128
153
129
154
def listen_to_valkey_stream (topic = "comment" ):
0 commit comments