@@ -3,7 +3,9 @@ use std::time::Instant;
3
3
4
4
use pg_sys:: { FunctionCall0Coll , InvalidOid } ;
5
5
use pgrx:: ffi:: c_char;
6
- use pgrx:: pg_sys:: { index_getprocinfo, pgstat_progress_update_param, AsPgCStr , Oid } ;
6
+ use pgrx:: pg_sys:: {
7
+ index_getprocinfo, pgstat_progress_update_param, AsPgCStr , ConditionVariable , Oid ,
8
+ } ;
7
9
use pgrx:: * ;
8
10
9
11
use crate :: access_method:: distance:: DistanceType ;
@@ -112,10 +114,22 @@ impl<'a> BuildStateParallel<'a> {
112
114
// Only update shared counter for the initializing worker until threshold is reached
113
115
if self . is_initializing_worker && self . local_ntuples <= parallel:: INITIAL_START_NODES_COUNT
114
116
{
115
- self . shared_state
117
+ let new_count = self
118
+ . shared_state
116
119
. build_state
117
120
. ntuples
118
- . fetch_add ( 1 , Ordering :: Relaxed ) ;
121
+ . fetch_add ( 1 , Ordering :: Relaxed )
122
+ + 1 ;
123
+
124
+ // Signal waiting workers when threshold is reached
125
+ if new_count >= parallel:: INITIAL_START_NODES_COUNT {
126
+ unsafe {
127
+ let cv_ptr = self . shared_state as * const ParallelShared as * mut ParallelShared ;
128
+ pg_sys:: ConditionVariableBroadcast (
129
+ & raw mut ( * cv_ptr) . build_state . initialization_cv ,
130
+ ) ;
131
+ }
132
+ }
119
133
}
120
134
}
121
135
@@ -126,6 +140,14 @@ impl<'a> BuildStateParallel<'a> {
126
140
. build_state
127
141
. initializing_worker_done
128
142
. store ( true , Ordering :: Relaxed ) ;
143
+
144
+ // Signal waiting workers that initialization is done
145
+ unsafe {
146
+ let cv_ptr = self . shared_state as * const ParallelShared as * mut ParallelShared ;
147
+ pg_sys:: ConditionVariableBroadcast (
148
+ & raw mut ( * cv_ptr) . build_state . initialization_cv ,
149
+ ) ;
150
+ }
129
151
}
130
152
131
153
self . update_shared_ntuples ( ) ;
@@ -191,6 +213,7 @@ struct ParallelBuildState {
191
213
ntuples : AtomicUsize ,
192
214
start_nodes_initialized : AtomicBool ,
193
215
initializing_worker_done : AtomicBool ,
216
+ initialization_cv : ConditionVariable ,
194
217
}
195
218
196
219
/// Status data for parallel index builds, shared among all parallel workers.
@@ -330,7 +353,7 @@ pub extern "C" fn ambuild(
330
353
let parallel_shared =
331
354
pg_sys:: shm_toc_allocate ( ( * pcxt) . toc , size_of :: < ParallelShared > ( ) )
332
355
. cast :: < ParallelShared > ( ) ;
333
- parallel_shared . write ( ParallelShared {
356
+ let shared_state = ParallelShared {
334
357
params : ParallelSharedParams {
335
358
heaprelid : heap_relation. rd_id ,
336
359
indexrelid : index_relation. rd_id ,
@@ -341,8 +364,15 @@ pub extern "C" fn ambuild(
341
364
ntuples : AtomicUsize :: new ( 0 ) ,
342
365
start_nodes_initialized : AtomicBool :: new ( false ) ,
343
366
initializing_worker_done : AtomicBool :: new ( false ) ,
367
+ initialization_cv : std:: mem:: zeroed ( ) , // Will be initialized below
344
368
} ,
345
- } ) ;
369
+ } ;
370
+ parallel_shared. write ( shared_state) ;
371
+
372
+ // Initialize the condition variable
373
+ pg_sys:: ConditionVariableInit (
374
+ & raw mut ( * parallel_shared) . build_state . initialization_cv ,
375
+ ) ;
346
376
let tablescandesc =
347
377
pg_sys:: shm_toc_allocate ( ( * pcxt) . toc , tablescandesc_size_estimate)
348
378
. cast :: < pg_sys:: ParallelTableScanDescData > ( ) ;
@@ -621,23 +651,27 @@ pub extern "C-unwind" fn _vectorscale_build_main(
621
651
} ;
622
652
623
653
if !should_initialize {
624
- loop {
625
- let ntuples = unsafe {
626
- ( * parallel_shared)
654
+ unsafe {
655
+ loop {
656
+ let ntuples = ( * parallel_shared)
627
657
. build_state
628
658
. ntuples
629
- . load ( Ordering :: Relaxed )
630
- } ;
631
- let init_done = unsafe {
632
- ( * parallel_shared)
659
+ . load ( Ordering :: Relaxed ) ;
660
+ let init_done = ( * parallel_shared)
633
661
. build_state
634
662
. initializing_worker_done
635
- . load ( Ordering :: Relaxed )
636
- } ;
637
- if ntuples >= parallel:: INITIAL_START_NODES_COUNT || init_done {
638
- break ;
663
+ . load ( Ordering :: Relaxed ) ;
664
+
665
+ if ntuples >= parallel:: INITIAL_START_NODES_COUNT || init_done {
666
+ break ;
667
+ }
668
+
669
+ // Wait on condition variable
670
+ pg_sys:: ConditionVariableSleep (
671
+ & raw mut ( * parallel_shared) . build_state . initialization_cv ,
672
+ pg_sys:: PG_WAIT_EXTENSION ,
673
+ ) ;
639
674
}
640
- std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) ;
641
675
}
642
676
}
643
677
debug1 ! ( "Worker should initialize: {}" , should_initialize) ;
0 commit comments