Skip to content

Commit 0f9f464

Browse files
committed
update
1 parent e2f610b commit 0f9f464

22 files changed

Lines changed: 642 additions & 83 deletions

File tree

protocol/proto/storage.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ message CatalogSourceTable {
4343
// Streaming table storage (CREATE STREAMING TABLE persistence)
4444
// =============================================================================
4545

46+
// Partition offset for one Kafka partition at a completed checkpoint.
47+
message KafkaPartitionOffset {
48+
int32 partition = 1;
49+
int64 offset = 2;
50+
}
51+
52+
// Kafka source subtask checkpoint: one file / one TaskContext (pipeline + subtask).
53+
message KafkaSourceSubtaskCheckpoint {
54+
uint32 pipeline_id = 1;
55+
uint32 subtask_index = 2;
56+
// Epoch of the barrier when this snapshot was taken (aligns with latest_checkpoint_epoch on commit).
57+
uint64 checkpoint_epoch = 3;
58+
repeated KafkaPartitionOffset partitions = 4;
59+
}
60+
4661
// Persisted record for one streaming table (CREATE STREAMING TABLE).
4762
// On restart, the engine re-submits each record to JobManager to resume the pipeline.
4863
message StreamingTableDefinition {
@@ -58,6 +73,11 @@ message StreamingTableDefinition {
5873
// Last globally-committed checkpoint epoch.
5974
// Updated by JobManager after all operators ACK. Used for crash recovery.
6075
uint64 latest_checkpoint_epoch = 6;
76+
77+
// Kafka source per-subtask offsets at the same committed epoch as `latest_checkpoint_epoch`.
78+
// Populated by the runtime coordinator from source checkpoint ACKs. Optional `.bin` files under
79+
// the job state dir may exist only for local recovery materialization from this field.
80+
repeated KafkaSourceSubtaskCheckpoint kafka_source_checkpoints = 7;
6181
}
6282

6383
// =============================================================================

src/runtime/streaming/api/context.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ use std::time::{Duration, SystemTime};
1616

1717
use anyhow::{Context, Result, anyhow};
1818
use arrow_array::RecordBatch;
19+
use protocol::storage::KafkaSourceSubtaskCheckpoint;
20+
use tokio::sync::mpsc;
1921

2022
use crate::runtime::memory::{MemoryBlock, MemoryPool, get_array_memory_size};
2123
use crate::runtime::streaming::network::endpoint::PhysicalSender;
24+
use crate::runtime::streaming::protocol::control::JobMasterEvent;
2225
use crate::runtime::streaming::protocol::event::{StreamEvent, TrackedEvent};
2326
use crate::runtime::streaming::state::IoManager;
2427

@@ -74,6 +77,9 @@ pub struct TaskContext {
7477

7578
/// Last globally-committed safe epoch for crash recovery.
7679
safe_epoch: u64,
80+
81+
/// When set, pipelines report checkpoint completion (and optional Kafka offsets) to the job coordinator.
82+
checkpoint_ack_tx: Option<mpsc::Sender<JobMasterEvent>>,
7783
}
7884

7985
impl TaskContext {
@@ -90,6 +96,7 @@ impl TaskContext {
9096
pipeline_state_memory_block: Option<Arc<MemoryBlock>>,
9197
operator_state_memory_bytes: u64,
9298
safe_epoch: u64,
99+
checkpoint_ack_tx: Option<mpsc::Sender<JobMasterEvent>>,
93100
) -> Self {
94101
let task_name = format!(
95102
"Task-[{}]-Pipe[{}]-Sub[{}/{}]",
@@ -111,6 +118,7 @@ impl TaskContext {
111118
pipeline_state_memory_block,
112119
operator_state_memory_bytes,
113120
safe_epoch,
121+
checkpoint_ack_tx,
114122
}
115123
}
116124

@@ -119,6 +127,23 @@ impl TaskContext {
119127
self.safe_epoch
120128
}
121129

130+
/// Notify the job checkpoint coordinator that this pipeline has finished the barrier for `epoch`.
131+
pub async fn send_checkpoint_ack(
132+
&self,
133+
epoch: u64,
134+
kafka_subtask: Option<KafkaSourceSubtaskCheckpoint>,
135+
) {
136+
if let Some(tx) = &self.checkpoint_ack_tx {
137+
let _ = tx
138+
.send(JobMasterEvent::CheckpointAck {
139+
pipeline_id: self.pipeline_id,
140+
epoch,
141+
kafka_subtask,
142+
})
143+
.await;
144+
}
145+
}
146+
122147
#[inline]
123148
pub fn config(&self) -> &TaskContextConfig {
124149
&self.config

src/runtime/streaming/api/operator.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,24 @@ pub trait Operator: Send + 'static {
5454
ctx: &mut TaskContext,
5555
) -> anyhow::Result<()>;
5656

57+
/// Global checkpoint **phase 2** (after metadata is durable): finalize external side effects.
58+
///
59+
/// Default is no-op. Examples of overrides: transactional Kafka sink calls
60+
/// `commit_transaction` on the producer stashed during [`Self::snapshot_state`].
5761
async fn commit_checkpoint(
5862
&mut self,
59-
_epoch: u32,
63+
epoch: u32,
6064
_ctx: &mut TaskContext,
6165
) -> anyhow::Result<()> {
66+
let _ = epoch;
67+
Ok(())
68+
}
69+
70+
/// Global checkpoint **rollback** when phase 2 must not commit (e.g. catalog persist failed).
71+
///
72+
/// Default is no-op. Transactional Kafka sink overrides with `abort_transaction` on the stashed producer.
73+
async fn abort_checkpoint(&mut self, epoch: u32, _ctx: &mut TaskContext) -> anyhow::Result<()> {
74+
let _ = epoch;
6275
Ok(())
6376
}
6477

src/runtime/streaming/api/source.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::runtime::streaming::api::context::TaskContext;
1414
use crate::sql::common::{CheckpointBarrier, Watermark};
1515
use arrow_array::RecordBatch;
1616
use async_trait::async_trait;
17+
use protocol::storage::KafkaSourceSubtaskCheckpoint;
1718

1819
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1920
pub enum SourceOffset {
@@ -31,6 +32,12 @@ pub enum SourceEvent {
3132
EndOfStream,
3233
}
3334

35+
/// Optional metadata returned when a source completes a checkpoint barrier snapshot.
36+
#[derive(Debug, Default, Clone)]
37+
pub struct SourceCheckpointReport {
38+
pub kafka_subtask: Option<KafkaSourceSubtaskCheckpoint>,
39+
}
40+
3441
#[async_trait]
3542
pub trait SourceOperator: Send + 'static {
3643
fn name(&self) -> &str;
@@ -49,13 +56,22 @@ pub trait SourceOperator: Send + 'static {
4956
&mut self,
5057
barrier: CheckpointBarrier,
5158
ctx: &mut TaskContext,
52-
) -> anyhow::Result<()>;
59+
) -> anyhow::Result<SourceCheckpointReport>;
5360

61+
/// Same checkpoint **phase 2** hook as [`super::operator::Operator::commit_checkpoint`].
62+
/// Kafka source keeps the default: offsets are reported at the barrier in [`Self::snapshot_state`].
5463
async fn commit_checkpoint(
5564
&mut self,
56-
_epoch: u32,
65+
epoch: u32,
5766
_ctx: &mut TaskContext,
5867
) -> anyhow::Result<()> {
68+
let _ = epoch;
69+
Ok(())
70+
}
71+
72+
/// Same rollback hook as [`super::operator::Operator::abort_checkpoint`].
73+
async fn abort_checkpoint(&mut self, epoch: u32, _ctx: &mut TaskContext) -> anyhow::Result<()> {
74+
let _ = epoch;
5975
Ok(())
6076
}
6177

src/runtime/streaming/execution/operator_chain.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ impl OperatorDrive for IntermediateDriver {
158158
ControlCommand::Commit { epoch } => {
159159
self.operator.commit_checkpoint(*epoch, ctx).await?;
160160
}
161+
ControlCommand::AbortCheckpoint { epoch } => {
162+
self.operator.abort_checkpoint(*epoch, ctx).await?;
163+
}
161164
ControlCommand::Stop { mode } if *mode == StopMode::Immediate => {
162165
stop = true;
163166
}
@@ -273,6 +276,9 @@ impl OperatorDrive for TailDriver {
273276
ControlCommand::Commit { epoch } => {
274277
self.operator.commit_checkpoint(*epoch, ctx).await?;
275278
}
279+
ControlCommand::AbortCheckpoint { epoch } => {
280+
self.operator.abort_checkpoint(*epoch, ctx).await?;
281+
}
276282
ControlCommand::Stop { mode } if *mode == StopMode::Immediate => {
277283
stop = true;
278284
}

src/runtime/streaming/execution/pipeline.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl Pipeline {
110110
}
111111
}
112112
AlignmentStatus::Complete => {
113+
let epoch = barrier.epoch as u64;
113114
self.chain_head
114115
.process_event(
115116
idx,
@@ -123,6 +124,7 @@ impl Pipeline {
123124
active_streams.insert(i, stream);
124125
}
125126
}
127+
self.ctx.send_checkpoint_ack(epoch, None).await;
126128
}
127129
}
128130
}

src/runtime/streaming/execution/source_driver.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::time::{Instant, sleep};
1515
use tracing::{Instrument, info, info_span, warn};
1616

1717
use crate::runtime::streaming::api::context::TaskContext;
18-
use crate::runtime::streaming::api::source::{SourceEvent, SourceOperator};
18+
use crate::runtime::streaming::api::source::{SourceCheckpointReport, SourceEvent, SourceOperator};
1919
use crate::runtime::streaming::error::RunError;
2020
use crate::runtime::streaming::execution::OperatorDrive;
2121
use crate::runtime::streaming::protocol::{
@@ -154,18 +154,25 @@ impl SourceDriver {
154154

155155
async fn handle_control(&mut self, cmd: ControlCommand) -> Result<bool, RunError> {
156156
let mut stop = false;
157+
let mut pending_source_checkpoint: Option<(u64, SourceCheckpointReport)> = None;
157158

158159
match &cmd {
159160
ControlCommand::TriggerCheckpoint { barrier } => {
160161
let b: CheckpointBarrier = barrier.clone().into();
161-
self.operator.snapshot_state(b, &mut self.ctx).await?;
162+
let report = self.operator.snapshot_state(b, &mut self.ctx).await?;
162163
self.dispatch_event(StreamEvent::Barrier(b)).await?;
164+
pending_source_checkpoint = Some((b.epoch as u64, report));
163165
}
164166
ControlCommand::Commit { epoch } => {
165167
self.operator
166168
.commit_checkpoint(*epoch, &mut self.ctx)
167169
.await?;
168170
}
171+
ControlCommand::AbortCheckpoint { epoch } => {
172+
self.operator
173+
.abort_checkpoint(*epoch, &mut self.ctx)
174+
.await?;
175+
}
169176
ControlCommand::Stop { .. } => {
170177
stop = true;
171178
}
@@ -178,6 +185,12 @@ impl SourceDriver {
178185
stop = true;
179186
}
180187

188+
if let Some((epoch, report)) = pending_source_checkpoint {
189+
self.ctx
190+
.send_checkpoint_ack(epoch, report.kafka_subtask)
191+
.await;
192+
}
193+
181194
Ok(stop)
182195
}
183196

src/runtime/streaming/factory/connector/kafka.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,13 @@ impl KafkaConnectorDispatcher {
200200
let client_configs = merge_client_configs(&cfg.auth, &cfg.client_configs);
201201

202202
let consistency = match cfg.commit_mode() {
203-
KafkaSinkCommitMode::KafkaSinkExactlyOnce => ConsistencyMode::ExactlyOnce,
203+
KafkaSinkCommitMode::KafkaSinkExactlyOnce => {
204+
info!(
205+
topic = %cfg.topic,
206+
"Kafka sink exactly-once: transactional producer + checkpoint 2PC. Downstream Kafka consumers of this topic should set isolation.level=read_committed."
207+
);
208+
ConsistencyMode::ExactlyOnce
209+
}
204210
KafkaSinkCommitMode::KafkaSinkAtLeastOnce => ConsistencyMode::AtLeastOnce,
205211
};
206212

0 commit comments

Comments
 (0)