11use crate :: models;
2- // use crate::versions::PkgVerCmp ;
2+ use diesel :: Connection ;
33use diesel:: SqliteConnection ;
44use rebuilderd_common:: { PkgGroup , Status } ;
55use rebuilderd_common:: api:: * ;
66use rebuilderd_common:: errors:: * ;
7- // use std::cmp::Ordering;
87use std:: collections:: HashMap ;
98
109const DEFAULT_QUEUE_PRIORITY : i32 = 1 ;
@@ -30,12 +29,18 @@ impl CurrentArtifactNamespace {
3029 } )
3130 }
3231
32+ fn gen_key ( name : & str , version : & str , architecture : & str ) -> String {
33+ format ! ( "{:?}-{:?}-{:?}" , name, version, architecture)
34+ }
35+
36+ #[ inline]
3337 fn gen_key_for_pkgbase ( pkgbase : & models:: PkgBase ) -> String {
34- format ! ( "{:?}-{:?}" , pkgbase. name, pkgbase. version)
38+ Self :: gen_key ( & pkgbase. name , & pkgbase. version , & pkgbase . architecture )
3539 }
3640
41+ #[ inline]
3742 fn gen_key_for_pkggroup ( pkggroup : & PkgGroup ) -> String {
38- format ! ( "{:?}-{:?}" , pkggroup. name, pkggroup. version)
43+ Self :: gen_key ( & pkggroup. name , & pkggroup. version , & pkggroup . architecture )
3944 }
4045
4146 // returns Some(()) if the group was already known and remove
@@ -87,124 +92,140 @@ fn pkggroup_to_newpkgbase(group: &PkgGroup) -> Result<models::NewPkgBase> {
8792}
8893
8994fn sync ( import : & SuiteImport , connection : & SqliteConnection ) -> Result < ( ) > {
90- let distro = & import. distro ;
91- let suite = & import. suite ;
92-
93- info ! ( "received submitted artifact groups {:?}" , import. groups. len( ) ) ;
94- let mut new_namespace = NewArtifactNamespace :: new ( ) ;
95-
96- info ! ( "loading existing artifact groups from database..." ) ;
97- let mut current_namespace = CurrentArtifactNamespace :: load_current_namespace_from_database ( distro, suite, connection) ?;
98- info ! ( "found existing artifact groups: len={}" , current_namespace. pkgbases. len( ) ) ;
99-
100- info ! ( "checking groups already in the database..." ) ;
101- let mut num_already_in_database = 0 ;
102- for group in & import. groups {
103- trace ! ( "received group during import: {:?}" , group) ;
104- if current_namespace. mark_pkggroup_still_present ( group) . is_some ( ) {
105- num_already_in_database += 1 ;
106- } else {
107- new_namespace. add ( group. clone ( ) ) ;
95+ connection. transaction :: < ( ) , _ , _ > ( || {
96+ let distro = & import. distro ;
97+ let suite = & import. suite ;
98+
99+ info ! ( "received submitted artifact groups {:?}" , import. groups. len( ) ) ;
100+ let mut new_namespace = NewArtifactNamespace :: new ( ) ;
101+
102+ info ! ( "loading existing artifact groups from database..." ) ;
103+ let mut current_namespace = CurrentArtifactNamespace :: load_current_namespace_from_database ( distro, suite, connection) ?;
104+ info ! ( "found existing artifact groups: len={}" , current_namespace. pkgbases. len( ) ) ;
105+
106+ info ! ( "checking groups already in the database..." ) ;
107+ let mut num_already_in_database = 0 ;
108+ for group in & import. groups {
109+ trace ! ( "received group during import: {:?}" , group) ;
110+ if current_namespace. mark_pkggroup_still_present ( group) . is_some ( ) {
111+ num_already_in_database += 1 ;
112+ } else {
113+ new_namespace. add ( group. clone ( ) ) ;
114+ }
108115 }
109- }
110- info ! ( "found groups already in database: len={}" , num_already_in_database) ;
111- info ! ( "found groups that need to be added to database: len={}" , new_namespace. groups. len( ) ) ;
112- info ! ( "found groups no longer present: len={}" , current_namespace. pkgbases. len( ) ) ;
113-
114- for ( key, pkgbase_to_remove) in current_namespace. pkgbases {
115- debug ! ( "deleting old group with key={:?}" , key) ;
116- models:: PkgBase :: delete ( pkgbase_to_remove. id , connection)
117- . with_context ( || anyhow ! ( "Failed to delete pkgbase with key={:?}" , key) ) ?;
118- }
119-
120- // inserting new groups
121- let mut progress_group_insert = 0 ;
122- for group_batch in new_namespace. groups . chunks ( 1_000 ) {
123- progress_group_insert += group_batch. len ( ) ;
124- info ! ( "inserting new groups in batch: {}/{}" , progress_group_insert, new_namespace. groups. len( ) ) ;
125- let group_batch = group_batch. iter ( )
126- . map ( pkggroup_to_newpkgbase)
127- . collect :: < Result < Vec < _ > > > ( ) ?;
128- if log:: log_enabled!( log:: Level :: Trace ) {
129- for group in & group_batch {
130- trace ! ( "group in this batch: {:?}" , group) ;
116+ info ! ( "found groups already in database: len={}" , num_already_in_database) ;
117+ info ! ( "found groups that need to be added to database: len={}" , new_namespace. groups. len( ) ) ;
118+ info ! ( "found groups no longer present: len={}" , current_namespace. pkgbases. len( ) ) ;
119+
120+ let pkgbase_delete_queue = current_namespace. pkgbases . values ( ) . map ( |x| x. id ) . collect :: < Vec < _ > > ( ) ;
121+
122+ // deleting groups no longer present
123+ let mut progress_group_delete = 0 ;
124+ for delete_batch in pkgbase_delete_queue. chunks ( 1_000 ) {
125+ progress_group_delete += delete_batch. len ( ) ;
126+ info ! ( "deleting groups in batch: {}/{}" , progress_group_delete, pkgbase_delete_queue. len( ) ) ;
127+
128+ if log:: log_enabled!( log:: Level :: Trace ) {
129+ for pkgbase in delete_batch {
130+ trace ! ( "pkgbase in this batch: {:?}" , pkgbase) ;
131+ }
131132 }
133+
134+ models:: PkgBase :: delete_batch ( delete_batch, connection)
135+ . with_context ( || anyhow ! ( "Failed to delete pkgbases" ) ) ?;
132136 }
133- models:: NewPkgBase :: insert_batch ( & group_batch, connection) ?;
134- }
135137
136- // detecting pkgbase ids for new artifacts
137- let mut progress_pkgbase_detect = 0 ;
138- let mut backlog_insert_pkgs = Vec :: new ( ) ;
139- let mut backlog_insert_queue = Vec :: new ( ) ;
140- for group_batch in new_namespace. groups . chunks ( 1_000 ) {
141- progress_pkgbase_detect += group_batch. len ( ) ;
142- info ! ( "detecting pkgbase ids for new artifacts: {}/{}" , progress_pkgbase_detect, new_namespace. groups. len( ) ) ;
143- for group in group_batch {
144- debug ! ( "searching for pkgbases {:?} {:?} {:?} {:?} {:?}" , group. name, group. version, distro, suite, group. architecture) ;
145- let pkgbases = models:: PkgBase :: get_by ( & group. name ,
146- distro,
147- suite,
148- Some ( & group. version ) ,
149- Some ( & group. architecture ) ,
150- connection) ?;
151-
152- if pkgbases. len ( ) != 1 {
153- bail ! ( "Failed to determine pkgbase in database for grouop (expected=1, found={}): {:?}" , pkgbases. len( ) , group) ;
154- }
155- let pkgbase_id = pkgbases[ 0 ] . id ;
156-
157- for artifact in & group. artifacts {
158- backlog_insert_pkgs. push ( models:: NewPackage {
159- pkgbase_id,
160- name : artifact. name . clone ( ) ,
161- version : artifact. version . clone ( ) ,
162- status : Status :: Unknown . to_string ( ) ,
163- distro : distro. clone ( ) ,
164- suite : suite. clone ( ) ,
165- architecture : group. architecture . clone ( ) ,
166- artifact_url : artifact. url . clone ( ) ,
167- build_id : None ,
168- built_at : None ,
169- has_diffoscope : false ,
170- has_attestation : false ,
171- checksum : None ,
172- } ) ;
138+ // inserting new groups
139+ let mut progress_group_insert = 0 ;
140+ for group_batch in new_namespace. groups . chunks ( 1_000 ) {
141+ progress_group_insert += group_batch. len ( ) ;
142+ info ! ( "inserting new groups in batch: {}/{}" , progress_group_insert, new_namespace. groups. len( ) ) ;
143+ let group_batch = group_batch. iter ( )
144+ . map ( pkggroup_to_newpkgbase)
145+ . collect :: < Result < Vec < _ > > > ( ) ?;
146+ if log:: log_enabled!( log:: Level :: Trace ) {
147+ for group in & group_batch {
148+ trace ! ( "group in this batch: {:?}" , group) ;
149+ }
173150 }
151+ models:: NewPkgBase :: insert_batch ( & group_batch, connection) ?;
152+ }
174153
175- backlog_insert_queue. push ( models:: NewQueued :: new ( pkgbase_id,
176- group. version . clone ( ) ,
177- distro. to_string ( ) ,
178- DEFAULT_QUEUE_PRIORITY ) ) ;
154+ // detecting pkgbase ids for new artifacts
155+ let mut progress_pkgbase_detect = 0 ;
156+ let mut backlog_insert_pkgs = Vec :: new ( ) ;
157+ let mut backlog_insert_queue = Vec :: new ( ) ;
158+ for group_batch in new_namespace. groups . chunks ( 1_000 ) {
159+ progress_pkgbase_detect += group_batch. len ( ) ;
160+ info ! ( "detecting pkgbase ids for new artifacts: {}/{}" , progress_pkgbase_detect, new_namespace. groups. len( ) ) ;
161+ for group in group_batch {
162+ debug ! ( "searching for pkgbases {:?} {:?} {:?} {:?} {:?}" , group. name, group. version, distro, suite, group. architecture) ;
163+ let pkgbases = models:: PkgBase :: get_by ( & group. name ,
164+ distro,
165+ suite,
166+ Some ( & group. version ) ,
167+ Some ( & group. architecture ) ,
168+ connection) ?;
169+
170+ if pkgbases. len ( ) != 1 {
171+ bail ! ( "Failed to determine pkgbase in database for grouop (expected=1, found={}): {:?}" , pkgbases. len( ) , group) ;
172+ }
173+ let pkgbase_id = pkgbases[ 0 ] . id ;
174+
175+ for artifact in & group. artifacts {
176+ backlog_insert_pkgs. push ( models:: NewPackage {
177+ pkgbase_id,
178+ name : artifact. name . clone ( ) ,
179+ version : artifact. version . clone ( ) ,
180+ status : Status :: Unknown . to_string ( ) ,
181+ distro : distro. clone ( ) ,
182+ suite : suite. clone ( ) ,
183+ architecture : group. architecture . clone ( ) ,
184+ artifact_url : artifact. url . clone ( ) ,
185+ build_id : None ,
186+ built_at : None ,
187+ has_diffoscope : false ,
188+ has_attestation : false ,
189+ checksum : None ,
190+ } ) ;
191+ }
192+
193+ backlog_insert_queue. push ( models:: NewQueued :: new ( pkgbase_id,
194+ group. version . clone ( ) ,
195+ distro. to_string ( ) ,
196+ DEFAULT_QUEUE_PRIORITY ) ) ;
197+ }
179198 }
180- }
181199
182- // inserting new packages
183- let mut progress_pkg_inserts = 0 ;
184- for pkg_batch in backlog_insert_pkgs. chunks ( 1_000 ) {
185- progress_pkg_inserts += pkg_batch. len ( ) ;
186- info ! ( "inserting new packages in batch: {}/{}" , progress_pkg_inserts, backlog_insert_pkgs. len( ) ) ;
187- if log:: log_enabled!( log:: Level :: Trace ) {
188- for pkg in pkg_batch {
189- trace ! ( "pkg in this batch: {:?}" , pkg) ;
200+ // inserting new packages
201+ let mut progress_pkg_inserts = 0 ;
202+ for pkg_batch in backlog_insert_pkgs. chunks ( 1_000 ) {
203+ progress_pkg_inserts += pkg_batch. len ( ) ;
204+ info ! ( "inserting new packages in batch: {}/{}" , progress_pkg_inserts, backlog_insert_pkgs. len( ) ) ;
205+ if log:: log_enabled!( log:: Level :: Trace ) {
206+ for pkg in pkg_batch {
207+ trace ! ( "pkg in this batch: {:?}" , pkg) ;
208+ }
190209 }
210+ models:: NewPackage :: insert_batch ( pkg_batch, connection) ?;
191211 }
192- models:: NewPackage :: insert_batch ( pkg_batch, connection) ?;
193- }
194212
195- // inserting to queue
196- // TODO: check if queueing has been disabled in the request, eg. to initially fill the database
197- let mut progress_queue_inserts = 0 ;
198- for queue_batch in backlog_insert_queue. chunks ( 1_000 ) {
199- progress_queue_inserts += queue_batch. len ( ) ;
200- info ! ( "inserting to queue in batch: {}/{}" , progress_queue_inserts, backlog_insert_queue. len( ) ) ;
201- if log:: log_enabled!( log:: Level :: Trace ) {
202- for queued in queue_batch {
203- trace ! ( "queued in this batch: {:?}" , queued) ;
213+ // inserting to queue
214+ // TODO: check if queueing has been disabled in the request, eg. to initially fill the database
215+ let mut progress_queue_inserts = 0 ;
216+ for queue_batch in backlog_insert_queue. chunks ( 1_000 ) {
217+ progress_queue_inserts += queue_batch. len ( ) ;
218+ info ! ( "inserting to queue in batch: {}/{}" , progress_queue_inserts, backlog_insert_queue. len( ) ) ;
219+ if log:: log_enabled!( log:: Level :: Trace ) {
220+ for queued in queue_batch {
221+ trace ! ( "queued in this batch: {:?}" , queued) ;
222+ }
204223 }
224+ models:: Queued :: insert_batch ( queue_batch, connection) ?;
205225 }
206- models:: Queued :: insert_batch ( queue_batch, connection) ?;
207- }
226+
227+ Ok ( ( ) )
228+ } ) ?;
208229
209230 info ! ( "successfully synced import to database" ) ;
210231
0 commit comments