File tree Expand file tree Collapse file tree
src/runtime/streaming/state Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -170,6 +170,12 @@ impl OperatorStateStore {
170170 Ok ( ( ) )
171171 }
172172
173+ pub async fn await_spill_complete ( & self ) {
174+ while self . is_spilling . load ( Ordering :: SeqCst ) {
175+ self . spill_notify . notified ( ) . await ;
176+ }
177+ }
178+
173179 fn downgrade_active_table ( & self , epoch : u64 ) {
174180 let mut active_guard = self . active_table . write ( ) ;
175181 if active_guard. is_empty ( ) {
@@ -833,10 +839,7 @@ mod tests {
833839 let key = b"persist" . to_vec ( ) ;
834840 store. put ( key. clone ( ) , make_batch ( & [ 99 ] ) ) . await . unwrap ( ) ;
835841 store. snapshot_epoch ( 1 ) . unwrap ( ) ;
836-
837- // snapshot_epoch triggers a spill; wait for the background worker to
838- // flush the data to disk so get_batches can read it from parquet files.
839- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 200 ) ) . await ;
842+ store. await_spill_complete ( ) . await ;
840843
841844 let result = store. get_batches ( & key) . await . unwrap ( ) ;
842845 assert ! ( !result. is_empty( ) ) ;
You can’t perform that action at this time.
0 commit comments