Skip to content

Commit 7547a3f

Browse files
committed
feat(streaming): persist Join operator state with LSM-Tree and 3-phase watermark harvesting
- Extend TaskContext with state_dir, memory_controller, io_manager, and safe_epoch to bridge operators with the state engine
- Refactor JoinWithExpirationOperator: replace in-memory VecDeque with PersistentStateBuffer backed by OperatorStateStore, using composite keys [Side(1B) + Timestamp(8B BE)] and BTreeSet timeline index
- Refactor InstantJoinOperator: replace in-memory BTreeMap<SystemTime, JoinInstance> with LSM-Tree persistence, split process_watermark into 3-phase pipeline (harvest -> compute -> cleanup) to eliminate interleaved mutable/immutable borrow conflicts
- Both operators now support on_start recovery via restore_metadata and snapshot_state via snapshot_epoch for exactly-once semantics

Made-with: Cursor
1 parent ac946f4 commit 7547a3f

10 files changed

Lines changed: 605 additions & 227 deletions

File tree

src/coordinator/execution/executor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,16 @@ impl PlanVisitor for Executor {
337337
custom_interval.unwrap_or(0),
338338
)
339339
.map_err(|e| {
340-
ExecuteError::Internal(format!(
341-
"Streaming job persistence failed: {e}",
342-
))
340+
ExecuteError::Internal(format!("Streaming job persistence failed: {e}",))
343341
})?;
344342

345343
let job_id = tokio::task::block_in_place(|| {
346-
tokio::runtime::Handle::current().block_on(
347-
job_manager.submit_job(job_id, fs_program, custom_interval, None),
348-
)
344+
tokio::runtime::Handle::current().block_on(job_manager.submit_job(
345+
job_id,
346+
fs_program,
347+
custom_interval,
348+
None,
349+
))
349350
})
350351
.map_err(|e| ExecuteError::Internal(format!("Failed to submit streaming job: {e}")))?;
351352

src/runtime/streaming/api/context.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13+
use std::path::PathBuf;
1314
use std::sync::Arc;
1415
use std::time::{Duration, SystemTime};
1516

@@ -19,6 +20,7 @@ use arrow_array::RecordBatch;
1920
use crate::runtime::streaming::memory::MemoryPool;
2021
use crate::runtime::streaming::network::endpoint::PhysicalSender;
2122
use crate::runtime::streaming::protocol::event::{StreamEvent, TrackedEvent};
23+
use crate::runtime::streaming::state::{IoManager, MemoryController};
2224

2325
#[derive(Debug, Clone)]
2426
pub struct TaskContextConfig {
@@ -61,6 +63,18 @@ pub struct TaskContext {
6163

6264
/// Subtask-level tunables.
6365
config: TaskContextConfig,
66+
67+
/// Root directory for operator state persistence (LSM-Tree data/tombstone files).
68+
pub state_dir: PathBuf,
69+
70+
/// Shared memory controller for state engine back-pressure.
71+
pub memory_controller: Arc<MemoryController>,
72+
73+
/// I/O thread pool handle for background spill/compaction.
74+
pub io_manager: IoManager,
75+
76+
/// Last globally-committed safe epoch for crash recovery.
77+
safe_epoch: u64,
6478
}
6579

6680
impl TaskContext {
@@ -71,6 +85,10 @@ impl TaskContext {
7185
parallelism: u32,
7286
downstream_senders: Vec<PhysicalSender>,
7387
memory_pool: Arc<MemoryPool>,
88+
memory_controller: Arc<MemoryController>,
89+
io_manager: IoManager,
90+
state_dir: PathBuf,
91+
safe_epoch: u64,
7492
) -> Self {
7593
let task_name = format!(
7694
"Task-[{}]-Pipe[{}]-Sub[{}/{}]",
@@ -87,9 +105,18 @@ impl TaskContext {
87105
memory_pool,
88106
current_watermark: None,
89107
config: TaskContextConfig::default(),
108+
state_dir,
109+
memory_controller,
110+
io_manager,
111+
safe_epoch,
90112
}
91113
}
92114

115+
#[inline]
116+
pub fn latest_safe_epoch(&self) -> u64 {
117+
self.safe_epoch
118+
}
119+
93120
#[inline]
94121
pub fn config(&self) -> &TaskContextConfig {
95122
&self.config

src/runtime/streaming/job/job_manager.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use anyhow::{Context, Result, anyhow, bail, ensure};
1919
use tokio::sync::mpsc;
2020
use tokio::task::JoinHandle as TokioJoinHandle;
2121
use tokio_stream::wrappers::ReceiverStream;
22-
use tracing::{error, info, warn, debug};
22+
use tracing::{debug, error, info, warn};
2323

2424
use protocol::function_stream_graph::{ChainedOperator, FsProgram};
2525

@@ -34,7 +34,7 @@ use crate::runtime::streaming::job::models::{
3434
};
3535
use crate::runtime::streaming::memory::MemoryPool;
3636
use crate::runtime::streaming::network::endpoint::{BoxedEventStream, PhysicalSender};
37-
use crate::runtime::streaming::protocol::control::{ControlCommand, StopMode, JobMasterEvent};
37+
use crate::runtime::streaming::protocol::control::{ControlCommand, JobMasterEvent, StopMode};
3838
use crate::runtime::streaming::protocol::event::CheckpointBarrier;
3939
use crate::runtime::streaming::state::{IoManager, IoPool, MemoryController, NoopMetricsCollector};
4040
use crate::storage::stream_catalog::CatalogManager;
@@ -132,7 +132,8 @@ impl JobManager {
132132
state_config.max_background_spills,
133133
state_config.max_background_compactions,
134134
metrics,
135-
).context("Failed to initialize state engine I/O pool")?;
135+
)
136+
.context("Failed to initialize state engine I/O pool")?;
136137

137138
Ok(Self {
138139
active_jobs: Arc::new(RwLock::new(HashMap::new())),
@@ -153,7 +154,12 @@ impl JobManager {
153154
state_config: StateConfig,
154155
) -> Result<()> {
155156
GLOBAL_JOB_MANAGER
156-
.set(Arc::new(Self::new(factory, memory_bytes, state_base_dir, state_config)?))
157+
.set(Arc::new(Self::new(
158+
factory,
159+
memory_bytes,
160+
state_base_dir,
161+
state_config,
162+
)?))
157163
.map_err(|_| anyhow!("JobManager singleton already initialized"))
158164
}
159165

@@ -217,8 +223,8 @@ impl JobManager {
217223
pipelines.insert(pipeline_id, pipeline);
218224
}
219225

220-
let interval_ms = custom_checkpoint_interval_ms
221-
.unwrap_or(self.state_config.checkpoint_interval_ms);
226+
let interval_ms =
227+
custom_checkpoint_interval_ms.unwrap_or(self.state_config.checkpoint_interval_ms);
222228

223229
self.spawn_checkpoint_coordinator(
224230
job_id.clone(),
@@ -425,9 +431,9 @@ impl JobManager {
425431
pipeline_id: u32,
426432
operators: &[ChainedOperator],
427433
edge_manager: &mut EdgeManager,
428-
_job_state_dir: &Path,
434+
job_state_dir: &Path,
429435
_job_master_tx: mpsc::Sender<JobMasterEvent>,
430-
_recovery_epoch: u64,
436+
recovery_epoch: u64,
431437
) -> Result<(PhysicalPipeline, bool)> {
432438
let (raw_inboxes, raw_outboxes) =
433439
edge_manager.take_endpoints(pipeline_id).with_context(|| {
@@ -479,6 +485,10 @@ impl JobManager {
479485
parallelism,
480486
physical_outboxes,
481487
Arc::clone(&self.memory_pool),
488+
Arc::clone(&self.memory_controller),
489+
self.io_manager_client.clone(),
490+
job_state_dir.to_path_buf(),
491+
recovery_epoch,
482492
);
483493

484494
let runner = if let Some(source) = chain.source {

0 commit comments

Comments
 (0)