Skip to content

Commit b0dcfca

Browse files
committed
update
1 parent 3d63ad6 commit b0dcfca

11 files changed

Lines changed: 1001 additions & 789 deletions

File tree

src/runtime/streaming/api/context.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,22 @@ use crate::runtime::streaming::protocol::event::TrackedEvent;
1717

1818
use arrow_array::RecordBatch;
1919
use std::sync::Arc;
20+
use std::time::Duration;
21+
22+
/// 与单个子任务绑定的运行时参数(可由 `TaskContext::new` 默认填充,后续可扩展为从 Job 配置注入)。
23+
#[derive(Debug, Clone)]
24+
pub struct TaskContextConfig {
25+
/// Source 在无数据(`SourceEvent::Idle`)时的退避休眠时长。
26+
pub source_idle_timeout: Duration,
27+
}
28+
29+
impl Default for TaskContextConfig {
30+
fn default() -> Self {
31+
Self {
32+
source_idle_timeout: Duration::from_millis(50),
33+
}
34+
}
35+
}
2036

2137
pub struct TaskContext {
2238
pub job_id: String,
@@ -29,6 +45,8 @@ pub struct TaskContext {
2945
memory_pool: Arc<MemoryPool>,
3046

3147
current_watermark: Option<std::time::SystemTime>,
48+
49+
config: TaskContextConfig,
3250
}
3351

3452
impl TaskContext {
@@ -48,9 +66,14 @@ impl TaskContext {
4866
outboxes,
4967
memory_pool,
5068
current_watermark: None,
69+
config: TaskContextConfig::default(),
5170
}
5271
}
5372

73+
pub fn config(&self) -> &TaskContextConfig {
74+
&self.config
75+
}
76+
5477
// ========================================================================
5578
// ========================================================================
5679

src/runtime/streaming/api/source.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ pub trait SourceOperator: Send + 'static {
5151
ctx: &mut TaskContext,
5252
) -> anyhow::Result<()>;
5353

54+
async fn commit_checkpoint(
55+
&mut self,
56+
_epoch: u32,
57+
_ctx: &mut TaskContext,
58+
) -> anyhow::Result<()> {
59+
Ok(())
60+
}
61+
5462
async fn on_close(&mut self, _ctx: &mut TaskContext) -> anyhow::Result<()> {
5563
Ok(())
5664
}

src/runtime/streaming/execution/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
pub mod runner;
14-
pub mod source;
13+
pub mod operator_chain;
14+
pub mod pipeline;
15+
pub mod source_driver;
1516
pub mod tracker;
17+
18+
pub use operator_chain::{ChainBuilder, OperatorDrive};
19+
pub use pipeline::Pipeline;
20+
pub use source_driver::SourceDriver;
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
use async_trait::async_trait;
14+
15+
use crate::runtime::streaming::api::context::TaskContext;
16+
use crate::runtime::streaming::api::operator::Operator;
17+
use crate::runtime::streaming::error::RunError;
18+
use crate::runtime::streaming::protocol::{
19+
control::{ControlCommand, StopMode},
20+
event::{StreamEvent, StreamOutput, TrackedEvent},
21+
};
22+
use crate::sql::common::CheckpointBarrier;
23+
24+
#[async_trait]
25+
pub trait OperatorDrive: Send {
26+
async fn on_start(&mut self, ctx: &mut TaskContext) -> Result<(), RunError>;
27+
async fn process_event(
28+
&mut self,
29+
input_idx: usize,
30+
event: TrackedEvent,
31+
ctx: &mut TaskContext,
32+
) -> Result<bool, RunError>;
33+
async fn handle_control(
34+
&mut self,
35+
cmd: ControlCommand,
36+
ctx: &mut TaskContext,
37+
) -> Result<bool, RunError>;
38+
async fn on_close(&mut self, ctx: &mut TaskContext) -> Result<(), RunError>;
39+
}
40+
41+
pub struct ChainBuilder;
42+
43+
impl ChainBuilder {
44+
pub fn build(mut operators: Vec<Box<dyn Operator>>) -> Option<Box<dyn OperatorDrive>> {
45+
let tail_operator = operators.pop()?;
46+
47+
let mut current_driver: Box<dyn OperatorDrive> = Box::new(TailDriver::new(tail_operator));
48+
49+
while let Some(op) = operators.pop() {
50+
current_driver = Box::new(IntermediateDriver::new(op, current_driver));
51+
}
52+
53+
Some(current_driver)
54+
}
55+
}
56+
57+
pub struct IntermediateDriver {
58+
operator: Box<dyn Operator>,
59+
next: Box<dyn OperatorDrive>,
60+
}
61+
62+
impl IntermediateDriver {
63+
pub fn new(operator: Box<dyn Operator>, next: Box<dyn OperatorDrive>) -> Self {
64+
Self { operator, next }
65+
}
66+
67+
async fn dispatch_outputs(
68+
&mut self,
69+
outputs: Vec<StreamOutput>,
70+
ctx: &mut TaskContext,
71+
) -> Result<(), RunError> {
72+
for out in outputs {
73+
match out {
74+
StreamOutput::Forward(b) => {
75+
self.next
76+
.process_event(0, TrackedEvent::control(StreamEvent::Data(b)), ctx)
77+
.await?;
78+
}
79+
StreamOutput::Watermark(wm) => {
80+
self.next
81+
.process_event(0, TrackedEvent::control(StreamEvent::Watermark(wm)), ctx)
82+
.await?;
83+
}
84+
StreamOutput::Keyed(_, _) | StreamOutput::Broadcast(_) => {
85+
return Err(RunError::internal(format!(
86+
"Topology Violation: Keyed or Broadcast output emitted in the middle of chain by '{}'",
87+
self.operator.name()
88+
)));
89+
}
90+
}
91+
}
92+
Ok(())
93+
}
94+
95+
async fn forward_signal(
96+
&mut self,
97+
event: StreamEvent,
98+
ctx: &mut TaskContext,
99+
) -> Result<(), RunError> {
100+
self.next
101+
.process_event(0, TrackedEvent::control(event), ctx)
102+
.await
103+
.map(|_| ())
104+
}
105+
}
106+
107+
#[async_trait]
108+
impl OperatorDrive for IntermediateDriver {
109+
async fn on_start(&mut self, ctx: &mut TaskContext) -> Result<(), RunError> {
110+
self.operator.on_start(ctx).await?;
111+
self.next.on_start(ctx).await?;
112+
Ok(())
113+
}
114+
115+
async fn process_event(
116+
&mut self,
117+
input_idx: usize,
118+
tracked: TrackedEvent,
119+
ctx: &mut TaskContext,
120+
) -> Result<bool, RunError> {
121+
match tracked.event {
122+
StreamEvent::Data(batch) => {
123+
let outputs = self.operator.process_data(input_idx, batch, ctx).await?;
124+
self.dispatch_outputs(outputs, ctx).await?;
125+
Ok(false)
126+
}
127+
StreamEvent::Watermark(wm) => {
128+
let outputs = self.operator.process_watermark(wm.clone(), ctx).await?;
129+
self.dispatch_outputs(outputs, ctx).await?;
130+
self.forward_signal(StreamEvent::Watermark(wm), ctx).await?;
131+
Ok(false)
132+
}
133+
StreamEvent::Barrier(barrier) => {
134+
self.operator.snapshot_state(barrier.clone(), ctx).await?;
135+
self.forward_signal(StreamEvent::Barrier(barrier), ctx)
136+
.await?;
137+
Ok(false)
138+
}
139+
StreamEvent::EndOfStream => {
140+
self.forward_signal(StreamEvent::EndOfStream, ctx).await?;
141+
Ok(true)
142+
}
143+
}
144+
}
145+
146+
async fn handle_control(
147+
&mut self,
148+
cmd: ControlCommand,
149+
ctx: &mut TaskContext,
150+
) -> Result<bool, RunError> {
151+
let mut stop = false;
152+
153+
match &cmd {
154+
ControlCommand::TriggerCheckpoint { barrier } => {
155+
let b: CheckpointBarrier = barrier.clone().into();
156+
self.operator.snapshot_state(b, ctx).await?;
157+
}
158+
ControlCommand::Commit { epoch } => {
159+
self.operator.commit_checkpoint(*epoch, ctx).await?;
160+
}
161+
ControlCommand::Stop { mode } if *mode == StopMode::Immediate => {
162+
stop = true;
163+
}
164+
_ => {}
165+
}
166+
167+
if self.next.handle_control(cmd, ctx).await? {
168+
stop = true;
169+
}
170+
171+
Ok(stop)
172+
}
173+
174+
async fn on_close(&mut self, ctx: &mut TaskContext) -> Result<(), RunError> {
175+
let close_outs = self.operator.on_close(ctx).await?;
176+
self.dispatch_outputs(close_outs, ctx).await?;
177+
self.next.on_close(ctx).await?;
178+
Ok(())
179+
}
180+
}
181+
182+
pub struct TailDriver {
183+
operator: Box<dyn Operator>,
184+
}
185+
186+
impl TailDriver {
187+
pub fn new(operator: Box<dyn Operator>) -> Self {
188+
Self { operator }
189+
}
190+
191+
async fn dispatch_outputs(
192+
&mut self,
193+
outputs: Vec<StreamOutput>,
194+
ctx: &mut TaskContext,
195+
) -> Result<(), RunError> {
196+
for out in outputs {
197+
match out {
198+
StreamOutput::Forward(b) => ctx.collect(b).await?,
199+
StreamOutput::Keyed(hash, b) => ctx.collect_keyed(hash, b).await?,
200+
StreamOutput::Broadcast(b) => ctx.collect(b).await?,
201+
StreamOutput::Watermark(wm) => ctx.broadcast(StreamEvent::Watermark(wm)).await?,
202+
}
203+
}
204+
Ok(())
205+
}
206+
207+
async fn forward_signal(
208+
&mut self,
209+
event: StreamEvent,
210+
ctx: &mut TaskContext,
211+
) -> Result<(), RunError> {
212+
match event {
213+
StreamEvent::Watermark(wm) => ctx.broadcast(StreamEvent::Watermark(wm)).await?,
214+
StreamEvent::Barrier(b) => ctx.broadcast(StreamEvent::Barrier(b)).await?,
215+
StreamEvent::EndOfStream => ctx.broadcast(StreamEvent::EndOfStream).await?,
216+
StreamEvent::Data(_) => unreachable!("Data signal should not be forwarded implicitly"),
217+
}
218+
Ok(())
219+
}
220+
}
221+
222+
#[async_trait]
223+
impl OperatorDrive for TailDriver {
224+
async fn on_start(&mut self, ctx: &mut TaskContext) -> Result<(), RunError> {
225+
self.operator.on_start(ctx).await?;
226+
Ok(())
227+
}
228+
229+
async fn process_event(
230+
&mut self,
231+
input_idx: usize,
232+
tracked: TrackedEvent,
233+
ctx: &mut TaskContext,
234+
) -> Result<bool, RunError> {
235+
match tracked.event {
236+
StreamEvent::Data(batch) => {
237+
let outputs = self.operator.process_data(input_idx, batch, ctx).await?;
238+
self.dispatch_outputs(outputs, ctx).await?;
239+
Ok(false)
240+
}
241+
StreamEvent::Watermark(wm) => {
242+
let outputs = self.operator.process_watermark(wm.clone(), ctx).await?;
243+
self.dispatch_outputs(outputs, ctx).await?;
244+
self.forward_signal(StreamEvent::Watermark(wm), ctx).await?;
245+
Ok(false)
246+
}
247+
StreamEvent::Barrier(barrier) => {
248+
self.operator.snapshot_state(barrier.clone(), ctx).await?;
249+
self.forward_signal(StreamEvent::Barrier(barrier), ctx)
250+
.await?;
251+
Ok(false)
252+
}
253+
StreamEvent::EndOfStream => {
254+
self.forward_signal(StreamEvent::EndOfStream, ctx).await?;
255+
Ok(true)
256+
}
257+
}
258+
}
259+
260+
async fn handle_control(
261+
&mut self,
262+
cmd: ControlCommand,
263+
ctx: &mut TaskContext,
264+
) -> Result<bool, RunError> {
265+
let mut stop = false;
266+
267+
match &cmd {
268+
ControlCommand::TriggerCheckpoint { barrier } => {
269+
let b: CheckpointBarrier = barrier.clone().into();
270+
self.operator.snapshot_state(b.clone(), ctx).await?;
271+
ctx.broadcast(StreamEvent::Barrier(b)).await?;
272+
}
273+
ControlCommand::Commit { epoch } => {
274+
self.operator.commit_checkpoint(*epoch, ctx).await?;
275+
}
276+
ControlCommand::Stop { mode } if *mode == StopMode::Immediate => {
277+
stop = true;
278+
}
279+
_ => {}
280+
}
281+
282+
Ok(stop)
283+
}
284+
285+
async fn on_close(&mut self, ctx: &mut TaskContext) -> Result<(), RunError> {
286+
let close_outs = self.operator.on_close(ctx).await?;
287+
self.dispatch_outputs(close_outs, ctx).await?;
288+
Ok(())
289+
}
290+
}

0 commit comments

Comments
 (0)