Skip to content

Commit 7d7628d

Browse files
committed
fix checkpoint
1 parent b571e31 commit 7d7628d

29 files changed

Lines changed: 296 additions & 362 deletions

protocol/proto/storage.proto

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,12 @@ message KafkaSourceSubtaskCheckpoint {
5858
repeated KafkaPartitionOffset partitions = 4;
5959
}
6060

61-
// Generic source checkpoint payload envelope (enum-like via oneof).
62-
message SourceCheckpointPayload {
63-
oneof checkpoint {
61+
// Aggregated, epoch-aligned per-source checkpoint info persisted by the coordinator
62+
// (one entry per source subtask). The envelope decouples the catalog/meta-store from
63+
// any specific source implementation so new source types can be added by registering a
64+
// new `CheckpointAggregator` in the runtime without touching storage schema layouts.
65+
message SourceCheckpointInfo {
66+
oneof info {
6467
KafkaSourceSubtaskCheckpoint kafka = 1;
6568
}
6669
}
@@ -81,10 +84,10 @@ message StreamingTableDefinition {
8184
// Updated by JobManager after all operators ACK. Used for crash recovery.
8285
uint64 latest_checkpoint_epoch = 6;
8386

84-
// Kafka source per-subtask offsets at the same committed epoch as `latest_checkpoint_epoch`.
85-
// Populated by the runtime coordinator from source checkpoint ACKs. Optional `.bin` files under
86-
// the job state dir may exist only for local recovery materialization from this field.
87-
repeated KafkaSourceSubtaskCheckpoint kafka_source_checkpoints = 7;
87+
// Source-agnostic per-subtask checkpoint entries aligned with `latest_checkpoint_epoch`.
88+
// Populated by the runtime CheckpointCoordinator via the configured aggregator registry;
89+
// storage no longer knows the internal layout of any particular source type.
90+
repeated SourceCheckpointInfo source_checkpoints = 7;
8891
}
8992

9093
// =============================================================================

src/coordinator/execution/executor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ impl PlanVisitor for Executor {
368368
fs_program,
369369
custom_interval,
370370
None,
371+
vec![],
371372
))
372373
})
373374
.map_err(|e| ExecuteError::Internal(format!("Failed to submit streaming job: {e}")))?;

src/runtime/streaming/api/context.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::time::{Duration, SystemTime};
1616

1717
use anyhow::{Context, Result, anyhow};
1818
use arrow_array::RecordBatch;
19-
use protocol::storage::SourceCheckpointPayload;
19+
use protocol::storage::SourceCheckpointInfo;
2020
use tokio::sync::mpsc;
2121

2222
use crate::runtime::memory::{MemoryBlock, MemoryPool, get_array_memory_size};
@@ -78,7 +78,7 @@ pub struct TaskContext {
7878
/// Last globally-committed safe epoch for crash recovery.
7979
safe_epoch: u64,
8080

81-
/// When set, pipelines report checkpoint completion (and optional Kafka offsets) to the job coordinator.
81+
/// When set, pipelines report checkpoint completion to the job coordinator.
8282
checkpoint_ack_tx: Option<mpsc::Sender<JobMasterEvent>>,
8383
}
8484

@@ -128,17 +128,13 @@ impl TaskContext {
128128
}
129129

130130
/// Notify the job checkpoint coordinator that this pipeline has finished the barrier for `epoch`.
131-
pub async fn send_checkpoint_ack(
132-
&self,
133-
epoch: u64,
134-
source_payloads: Vec<SourceCheckpointPayload>,
135-
) {
131+
pub async fn send_checkpoint_ack(&self, epoch: u64, source_infos: Vec<SourceCheckpointInfo>) {
136132
if let Some(tx) = &self.checkpoint_ack_tx {
137133
let _ = tx
138134
.send(JobMasterEvent::CheckpointAck {
139135
pipeline_id: self.pipeline_id,
140136
epoch,
141-
source_payloads,
137+
source_infos,
142138
})
143139
.await;
144140
}

src/runtime/streaming/api/operator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub trait Operator: Send + 'static {
6666
/// `commit_transaction` on the producer stashed during [`Self::snapshot_state`].
6767
async fn commit_checkpoint(
6868
&mut self,
69-
epoch: u32,
69+
epoch: u64,
7070
_ctx: &mut TaskContext,
7171
) -> anyhow::Result<()> {
7272
let _ = epoch;
@@ -76,7 +76,7 @@ pub trait Operator: Send + 'static {
7676
/// Global checkpoint **rollback** when phase 2 must not commit (e.g. catalog persist failed).
7777
///
7878
/// Default is no-op. Transactional Kafka sink overrides with `abort_transaction` on the stashed producer.
79-
async fn abort_checkpoint(&mut self, epoch: u32, _ctx: &mut TaskContext) -> anyhow::Result<()> {
79+
async fn abort_checkpoint(&mut self, epoch: u64, _ctx: &mut TaskContext) -> anyhow::Result<()> {
8080
let _ = epoch;
8181
Ok(())
8282
}

src/runtime/streaming/api/source.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::sql::common::{CheckpointBarrier, Watermark};
1515
use arrow_array::RecordBatch;
1616
use async_trait::async_trait;
1717
use protocol::storage::{
18-
KafkaSourceSubtaskCheckpoint, SourceCheckpointPayload, source_checkpoint_payload,
18+
KafkaSourceSubtaskCheckpoint, SourceCheckpointInfo, source_checkpoint_info,
1919
};
2020

2121
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
@@ -34,17 +34,19 @@ pub enum SourceEvent {
3434
EndOfStream,
3535
}
3636

37-
/// Optional metadata returned when a source completes a checkpoint barrier snapshot.
37+
/// Checkpoint metadata produced by a source subtask during a barrier snapshot.
38+
/// Sources fill this directly with [`SourceCheckpointInfo`] — the coordinator collects
39+
/// and persists these entries without any further translation step.
3840
#[derive(Debug, Default, Clone)]
3941
pub struct SourceCheckpointReport {
40-
pub payloads: Vec<SourceCheckpointPayload>,
42+
pub infos: Vec<SourceCheckpointInfo>,
4143
}
4244

4345
impl SourceCheckpointReport {
4446
pub fn from_kafka_checkpoint(kafka: KafkaSourceSubtaskCheckpoint) -> Self {
4547
Self {
46-
payloads: vec![SourceCheckpointPayload {
47-
checkpoint: Some(source_checkpoint_payload::Checkpoint::Kafka(kafka)),
48+
infos: vec![SourceCheckpointInfo {
49+
info: Some(source_checkpoint_info::Info::Kafka(kafka)),
4850
}],
4951
}
5052
}
@@ -54,6 +56,11 @@ impl SourceCheckpointReport {
5456
pub trait SourceOperator: Send + 'static {
5557
fn name(&self) -> &str;
5658

59+
/// Inject persisted checkpoint records before the source is started.
60+
/// Called by the engine after the operator is constructed and before [`Self::on_start`].
61+
/// Default implementation is a no-op; sources with stateful recovery override this.
62+
fn set_recovery_checkpoint(&mut self, _infos: Vec<SourceCheckpointInfo>) {}
63+
5764
async fn on_start(&mut self, _ctx: &mut TaskContext) -> anyhow::Result<()> {
5865
Ok(())
5966
}
@@ -74,15 +81,15 @@ pub trait SourceOperator: Send + 'static {
7481
/// Kafka source keeps the default: offsets are reported at the barrier in [`Self::snapshot_state`].
7582
async fn commit_checkpoint(
7683
&mut self,
77-
epoch: u32,
84+
epoch: u64,
7885
_ctx: &mut TaskContext,
7986
) -> anyhow::Result<()> {
8087
let _ = epoch;
8188
Ok(())
8289
}
8390

8491
/// Same rollback hook as [`super::operator::Operator::abort_checkpoint`].
85-
async fn abort_checkpoint(&mut self, epoch: u32, _ctx: &mut TaskContext) -> anyhow::Result<()> {
92+
async fn abort_checkpoint(&mut self, epoch: u64, _ctx: &mut TaskContext) -> anyhow::Result<()> {
8693
let _ = epoch;
8794
Ok(())
8895
}

src/runtime/streaming/execution/pipeline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl Pipeline {
110110
}
111111
}
112112
AlignmentStatus::Complete => {
113-
let epoch = barrier.epoch as u64;
113+
let epoch = barrier.epoch;
114114
self.chain_head
115115
.process_event(
116116
idx,

src/runtime/streaming/execution/source_driver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ impl SourceDriver {
161161
let b: CheckpointBarrier = barrier.clone().into();
162162
let report = self.operator.snapshot_state(b, &mut self.ctx).await?;
163163
self.dispatch_event(StreamEvent::Barrier(b)).await?;
164-
pending_source_checkpoint = Some((b.epoch as u64, report));
164+
pending_source_checkpoint = Some((b.epoch, report));
165165
}
166166
ControlCommand::Commit { epoch } => {
167167
self.operator
@@ -186,7 +186,7 @@ impl SourceDriver {
186186
}
187187

188188
if let Some((epoch, report)) = pending_source_checkpoint {
189-
self.ctx.send_checkpoint_ack(epoch, report.payloads).await;
189+
self.ctx.send_checkpoint_ack(epoch, report.infos).await;
190190
}
191191

192192
Ok(stop)

src/runtime/streaming/execution/tracker/barrier_aligner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub enum AlignmentStatus {
2323
#[derive(Debug)]
2424
pub struct BarrierAligner {
2525
input_count: usize,
26-
current_epoch: Option<u32>,
26+
current_epoch: Option<u64>,
2727
reached_inputs: HashSet<usize>,
2828
}
2929

0 commit comments

Comments
 (0)