@@ -48,6 +48,7 @@ impl PostgresRepository {
48
48
let manager = ConnectionManager :: < PgConnection > :: new ( database_url) ;
49
49
let pool = r2d2:: Pool :: new ( manager) ?;
50
50
let migrations = FileBasedMigrations :: from_path ( migrations_directory) ?;
51
+
51
52
Ok ( Self { pool, migrations } )
52
53
}
53
54
}
@@ -60,20 +61,27 @@ impl PostgresRepository {
60
61
61
62
pub fn any_pending_migrations ( & self ) -> Result < bool > {
62
63
let mut conn = self . pool . get ( ) ?;
64
+
63
65
conn. has_pending_migration ( self . migrations . clone ( ) )
64
66
. map_err ( |e| anyhow ! ( e) )
65
67
. context ( "failed checking pending migrations" )
66
68
}
67
69
68
70
pub fn run_pending_migrations ( & self ) -> Result < ( ) > {
69
71
let mut conn = self . pool . get ( ) ?;
72
+
70
73
conn. run_pending_migrations ( self . migrations . clone ( ) )
71
74
. map_err ( |e| anyhow ! ( e) )
72
75
. context ( "failed running pending migrations" ) ?;
76
+
73
77
Ok ( ( ) )
74
78
}
75
79
76
- /// Insert objects (dedup by `cve` via ON CONFLICT DO NOTHING) in batches.
80
+ /// Insert a list of objects into the database if they don't already exist.
81
+ ///
82
+ /// Insertion is done in batches of size `KEPLER__BATCH_SIZE` to avoid exceeding PostgreSQL's limit of 65535 bind parameters per statement.
83
+ ///
84
+ /// Returns a [`HashMap<String, i32>`] of CVE IDs to their assigned object IDs.
77
85
pub fn insert_objects (
78
86
& self ,
79
87
objects_to_insert : Vec < models:: NewObject > ,
@@ -86,12 +94,14 @@ impl PostgresRepository {
86
94
for chunk in objects_to_insert. chunks ( * KEPLER_BATCH_SIZE ) {
87
95
let inserted_ids: HashMap < String , i32 > =
88
96
self . batch_insert_objects ( chunk) ?. into_iter ( ) . collect ( ) ;
97
+
89
98
inserted_object_ids. extend ( inserted_ids) ;
90
99
}
100
+
91
101
Ok ( inserted_object_ids)
92
102
}
93
103
94
- /// Inserts [`schema::objects`] in batches of size `KEPLER__BATCH_SIZE`.
104
+ /// Inserts [`schema::objects`] into the database in batches of size `KEPLER__BATCH_SIZE`.
95
105
pub fn batch_insert_objects (
96
106
& self ,
97
107
values_list : & [ models:: NewObject ] ,
@@ -114,7 +124,7 @@ impl PostgresRepository {
114
124
log:: warn!( "Zero object records are inserted!" ) ;
115
125
}
116
126
117
- // Query back to get IDs (idempotent whether inserted-or-existing) .
127
+ // Query back the inserted records to get their assigned IDs .
118
128
let inserted_objects = objects
119
129
. filter ( cve. eq_any ( & object_cves) )
120
130
. select ( ( cve, id) )
@@ -125,9 +135,12 @@ impl PostgresRepository {
125
135
} )
126
136
}
127
137
128
- /// Batch insert CVEs (dedup by `(cve,vendor,product)`).
138
+ /// Batch insert CVEs if they don't already exist in the database.
139
+ ///
140
+ /// Returns the number of inserted records.
129
141
pub fn batch_insert_cves ( & self , values_list : Vec < models:: NewCVE > ) -> Result < usize > {
130
142
use schema:: cves:: dsl:: * ;
143
+
131
144
let mut conn = self . pool . get ( ) ?;
132
145
conn. transaction ( |conn| {
133
146
let inserted_count = insert_into ( cves)
@@ -136,12 +149,14 @@ impl PostgresRepository {
136
149
. do_nothing ( )
137
150
. execute ( conn)
138
151
. context ( "error creating cves in batch" ) ?;
152
+
139
153
Ok ( inserted_count)
140
154
} )
141
155
}
142
156
143
157
pub fn delete_cve ( & self , the_vendor : & str , the_product : & str , the_cve : & str ) -> Result < usize > {
144
158
use schema:: cves:: dsl:: * ;
159
+
145
160
let mut conn = self . pool . get ( ) ?;
146
161
diesel:: delete (
147
162
cves. filter (
@@ -157,6 +172,7 @@ impl PostgresRepository {
157
172
158
173
pub fn get_products ( & self ) -> Result < Vec < models:: Product > > {
159
174
use schema:: cves:: dsl:: * ;
175
+
160
176
let mut conn = self . pool . get ( ) ?;
161
177
162
178
let prods: Vec < ( String , String ) > = cves
@@ -208,21 +224,16 @@ impl PostgresRepository {
208
224
209
225
let mut conn = self . pool . get ( ) ?;
210
226
211
- // Fetch candidate rows
227
+ // fetch potential candidates for this query
212
228
let start = Instant :: now ( ) ;
213
- log:: debug!(
214
- "query candidates by vendor={:?}, product='{}'" ,
215
- query. vendor,
216
- query. product
217
- ) ;
218
229
let candidates = fetch_candidates ( & mut conn, query. vendor . as_ref ( ) , & query. product ) ?;
219
230
log:: info!(
220
231
"found {} candidates in {} ms" ,
221
232
candidates. len( ) ,
222
233
start. elapsed( ) . as_millis( )
223
234
) ;
224
235
225
- // Deserialize stored JSON payloads into source-specific structs
236
+ // deserialize all objects belonging to the potential CVEs
226
237
let start = Instant :: now ( ) ;
227
238
let sources = candidates
228
239
. into_iter ( )
@@ -249,9 +260,11 @@ impl PostgresRepository {
249
260
vendor : cve_row. vendor ,
250
261
product : cve_row. product ,
251
262
} ;
263
+
252
264
let matched_cve: MatchedCVE = match source {
253
265
Source :: Nist ( nist_cve) => ( product, nist_cve) . into ( ) ,
254
266
} ;
267
+
255
268
Some ( matched_cve)
256
269
} else {
257
270
None
@@ -269,7 +282,7 @@ impl PostgresRepository {
269
282
}
270
283
}
271
284
272
- /// Build unique objects from CVEs; value is the serialized full NVD 2.0 CVE for later re-hydration.
285
+ /// Create unique objects from the CVE list
273
286
pub fn create_unique_objects (
274
287
cve_list : & [ nist:: cve:: CVE ] ,
275
288
) -> Result < HashMap < String , models:: NewObject > > {
0 commit comments