Skip to content

Commit 02f363d

Browse files
committed
update
1 parent 73aad1c commit 02f363d

2 files changed

Lines changed: 187 additions & 141 deletions

File tree

src/sql/physical/codec.rs

Lines changed: 127 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,20 @@ use crate::sql::analysis::UNNESTED_COL;
3131
use crate::sql::common::constants::{mem_exec_join_side, window_function_udf};
3232
use crate::sql::physical::cdc::{CdcDebeziumPackExec, CdcDebeziumUnrollExec};
3333
use crate::sql::physical::source_exec::{
34-
FsMemExec, RecordBatchVecReader, RwLockRecordBatchReader, UnboundedRecordBatchReader,
34+
BufferedBatchesExec, InjectableSingleBatchExec, MpscReceiverStreamExec, PlanningPlaceholderExec,
3535
};
3636
use crate::sql::physical::udfs::window;
3737

38-
#[derive(Debug)]
39-
pub struct FsPhysicalExtensionCodec {
40-
pub context: DecodingContext,
41-
}
42-
43-
impl Default for FsPhysicalExtensionCodec {
44-
fn default() -> Self {
45-
Self {
46-
context: DecodingContext::None,
47-
}
48-
}
49-
}
38+
// ============================================================================
39+
// StreamingExtensionCodec & StreamingDecodingContext
40+
// ============================================================================
5041

42+
/// Worker-side context used when deserializing a physical plan from the coordinator.
43+
///
44+
/// Planning uses [`PlanningPlaceholderExec`]; at runtime this selects the real source
45+
/// implementation (locked batch, MPSC stream, join sides, etc.).
5146
#[derive(Debug)]
52-
pub enum DecodingContext {
47+
pub enum StreamingDecodingContext {
5348
None,
5449
Planning,
5550
SingleLockedBatch(Arc<std::sync::RwLock<Option<RecordBatch>>>),
@@ -65,77 +60,102 @@ pub enum DecodingContext {
6560
},
6661
}
6762

68-
impl PhysicalExtensionCodec for FsPhysicalExtensionCodec {
63+
/// Codec for custom streaming physical extension nodes (`FsExecNode` protobuf).
64+
#[derive(Debug)]
65+
pub struct StreamingExtensionCodec {
66+
pub context: StreamingDecodingContext,
67+
}
68+
69+
impl Default for StreamingExtensionCodec {
70+
fn default() -> Self {
71+
Self {
72+
context: StreamingDecodingContext::None,
73+
}
74+
}
75+
}
76+
77+
impl PhysicalExtensionCodec for StreamingExtensionCodec {
6978
fn try_decode(
7079
&self,
7180
buf: &[u8],
7281
inputs: &[Arc<dyn ExecutionPlan>],
7382
_registry: &dyn FunctionRegistry,
7483
) -> Result<Arc<dyn ExecutionPlan>> {
75-
let exec: FsExecNode = Message::decode(buf)
76-
.map_err(|err| DataFusionError::Internal(format!("couldn't deserialize: {err}")))?;
84+
let exec: FsExecNode = Message::decode(buf).map_err(|err| {
85+
DataFusionError::Internal(format!("Failed to deserialize FsExecNode protobuf: {err}"))
86+
})?;
7787

78-
let node = exec
79-
.node
80-
.ok_or_else(|| DataFusionError::Internal("exec node is empty".to_string()))?;
88+
let node = exec.node.ok_or_else(|| {
89+
DataFusionError::Internal("Decoded FsExecNode contains no inner node data".to_string())
90+
})?;
8191

8292
match node {
83-
Node::MemExec(mem) => self.decode_mem_exec(mem),
93+
Node::MemExec(mem) => self.decode_placeholder_exec(mem),
8494
Node::UnnestExec(unnest) => decode_unnest_exec(unnest, inputs),
85-
Node::DebeziumDecode(debezium) => decode_debezium_decode(debezium, inputs),
86-
Node::DebeziumEncode(debezium) => decode_debezium_encode(debezium, inputs),
95+
Node::DebeziumDecode(debezium) => decode_debezium_unroll(debezium, inputs),
96+
Node::DebeziumEncode(debezium) => decode_debezium_pack(debezium, inputs),
8797
}
8898
}
8999

90100
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
91101
let mut proto = None;
92102

93-
if let Some(table) = node.as_any().downcast_ref::<FsMemExec>() {
103+
if let Some(table) = node.as_any().downcast_ref::<PlanningPlaceholderExec>() {
104+
let schema_json = serde_json::to_string(&table.schema).map_err(|e| {
105+
DataFusionError::Internal(format!("Failed to serialize schema to JSON: {e}"))
106+
})?;
107+
94108
proto = Some(FsExecNode {
95109
node: Some(Node::MemExec(MemExecNode {
96110
table_name: table.table_name.clone(),
97-
schema: serde_json::to_string(&table.schema).unwrap(),
111+
schema: schema_json,
98112
})),
99113
});
100-
}
114+
} else if let Some(unnest) = node.as_any().downcast_ref::<UnnestExec>() {
115+
let schema_json = serde_json::to_string(&unnest.schema()).map_err(|e| {
116+
DataFusionError::Internal(format!("Failed to serialize unnest schema to JSON: {e}"))
117+
})?;
101118

102-
if let Some(unnest) = node.as_any().downcast_ref::<UnnestExec>() {
103119
proto = Some(FsExecNode {
104120
node: Some(Node::UnnestExec(UnnestExecNode {
105-
schema: serde_json::to_string(&unnest.schema()).unwrap(),
121+
schema: schema_json,
106122
})),
107123
});
108-
}
124+
} else if let Some(decode) = node.as_any().downcast_ref::<CdcDebeziumUnrollExec>() {
125+
let schema_json = serde_json::to_string(decode.schema().as_ref()).map_err(|e| {
126+
DataFusionError::Internal(format!("Failed to serialize CDC unroll schema: {e}"))
127+
})?;
109128

110-
if let Some(decode) = node.as_any().downcast_ref::<CdcDebeziumUnrollExec>() {
111129
proto = Some(FsExecNode {
112130
node: Some(Node::DebeziumDecode(DebeziumDecodeNode {
113-
schema: serde_json::to_string(decode.schema().as_ref()).unwrap(),
131+
schema: schema_json,
114132
primary_keys: decode
115133
.primary_key_indices()
116134
.iter()
117-
.map(|c| *c as u64)
135+
.map(|&c| c as u64)
118136
.collect(),
119137
})),
120138
});
121-
}
139+
} else if let Some(encode) = node.as_any().downcast_ref::<CdcDebeziumPackExec>() {
140+
let schema_json = serde_json::to_string(encode.schema().as_ref()).map_err(|e| {
141+
DataFusionError::Internal(format!("Failed to serialize CDC pack schema: {e}"))
142+
})?;
122143

123-
if let Some(encode) = node.as_any().downcast_ref::<CdcDebeziumPackExec>() {
124144
proto = Some(FsExecNode {
125145
node: Some(Node::DebeziumEncode(DebeziumEncodeNode {
126-
schema: serde_json::to_string(encode.schema().as_ref()).unwrap(),
146+
schema: schema_json,
127147
})),
128148
});
129149
}
130150

131-
if let Some(node) = proto {
132-
node.encode(buf).map_err(|err| {
133-
DataFusionError::Internal(format!("couldn't serialize exec node {err}"))
151+
if let Some(proto_node) = proto {
152+
proto_node.encode(buf).map_err(|err| {
153+
DataFusionError::Internal(format!("Failed to encode protobuf node: {err}"))
134154
})?;
135155
Ok(())
136156
} else {
137157
Err(DataFusionError::Internal(format!(
138-
"cannot serialize {node:?}"
158+
"Cannot serialize unknown physical plan node: {node:?}"
139159
)))
140160
}
141161
}
@@ -144,55 +164,62 @@ impl PhysicalExtensionCodec for FsPhysicalExtensionCodec {
144164
if name == window_function_udf::NAME {
145165
return Ok(window());
146166
}
147-
not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
167+
not_impl_err!("PhysicalExtensionCodec does not support scalar function '{name}'")
148168
}
149169
}
150170

151-
impl FsPhysicalExtensionCodec {
152-
fn decode_mem_exec(&self, mem_exec: MemExecNode) -> Result<Arc<dyn ExecutionPlan>> {
171+
impl StreamingExtensionCodec {
172+
fn decode_placeholder_exec(&self, mem_exec: MemExecNode) -> Result<Arc<dyn ExecutionPlan>> {
153173
let schema: Schema = serde_json::from_str(&mem_exec.schema).map_err(|e| {
154-
DataFusionError::Internal(format!("invalid schema in exec codec: {e:?}"))
174+
DataFusionError::Internal(format!("Invalid schema JSON in exec codec: {e:?}"))
155175
})?;
156176
let schema = Arc::new(schema);
177+
157178
match &self.context {
158-
DecodingContext::SingleLockedBatch(single_batch) => Ok(Arc::new(
159-
RwLockRecordBatchReader::new(schema, single_batch.clone()),
179+
StreamingDecodingContext::SingleLockedBatch(single_batch) => Ok(Arc::new(
180+
InjectableSingleBatchExec::new(schema, single_batch.clone()),
160181
)),
161-
DecodingContext::UnboundedBatchStream(unbounded_stream) => Ok(Arc::new(
162-
UnboundedRecordBatchReader::new(schema, unbounded_stream.clone()),
182+
StreamingDecodingContext::UnboundedBatchStream(unbounded_stream) => Ok(Arc::new(
183+
MpscReceiverStreamExec::new(schema, unbounded_stream.clone()),
163184
)),
164-
DecodingContext::LockedBatchVec(locked_batches) => Ok(Arc::new(
165-
RecordBatchVecReader::new(schema, locked_batches.clone()),
185+
StreamingDecodingContext::LockedBatchVec(locked_batches) => Ok(Arc::new(
186+
BufferedBatchesExec::new(schema, locked_batches.clone()),
166187
)),
167-
DecodingContext::Planning => Ok(Arc::new(FsMemExec::new(mem_exec.table_name, schema))),
168-
DecodingContext::None => Err(DataFusionError::Internal(
169-
"Need an internal context to decode".into(),
188+
StreamingDecodingContext::Planning => Ok(Arc::new(PlanningPlaceholderExec::new(
189+
mem_exec.table_name,
190+
schema,
191+
))),
192+
StreamingDecodingContext::None => Err(DataFusionError::Internal(
193+
"A valid StreamingDecodingContext is required to decode placeholders into execution streams.".into(),
170194
)),
171-
DecodingContext::LockedJoinPair { left, right } => match mem_exec.table_name.as_str() {
172-
mem_exec_join_side::LEFT => {
173-
Ok(Arc::new(RwLockRecordBatchReader::new(schema, left.clone())))
195+
StreamingDecodingContext::LockedJoinPair { left, right } => {
196+
match mem_exec.table_name.as_str() {
197+
mem_exec_join_side::LEFT => Ok(Arc::new(InjectableSingleBatchExec::new(
198+
schema,
199+
left.clone(),
200+
))),
201+
mem_exec_join_side::RIGHT => Ok(Arc::new(InjectableSingleBatchExec::new(
202+
schema,
203+
right.clone(),
204+
))),
205+
_ => Err(DataFusionError::Internal(format!(
206+
"Unknown join side table name: {}",
207+
mem_exec.table_name
208+
))),
174209
}
175-
mem_exec_join_side::RIGHT => Ok(Arc::new(RwLockRecordBatchReader::new(
176-
schema,
177-
right.clone(),
178-
))),
179-
_ => Err(DataFusionError::Internal(format!(
180-
"unknown table name {}",
181-
mem_exec.table_name
182-
))),
183-
},
184-
DecodingContext::LockedJoinStream { left, right } => {
210+
}
211+
StreamingDecodingContext::LockedJoinStream { left, right } => {
185212
match mem_exec.table_name.as_str() {
186-
mem_exec_join_side::LEFT => Ok(Arc::new(UnboundedRecordBatchReader::new(
213+
mem_exec_join_side::LEFT => Ok(Arc::new(MpscReceiverStreamExec::new(
187214
schema,
188215
left.clone(),
189216
))),
190-
mem_exec_join_side::RIGHT => Ok(Arc::new(UnboundedRecordBatchReader::new(
217+
mem_exec_join_side::RIGHT => Ok(Arc::new(MpscReceiverStreamExec::new(
191218
schema,
192219
right.clone(),
193220
))),
194221
_ => Err(DataFusionError::Internal(format!(
195-
"unknown table name {}",
222+
"Unknown join side table name: {}",
196223
mem_exec.table_name
197224
))),
198225
}
@@ -206,19 +233,20 @@ fn decode_unnest_exec(
206233
inputs: &[Arc<dyn ExecutionPlan>],
207234
) -> Result<Arc<dyn ExecutionPlan>> {
208235
let schema: Schema = serde_json::from_str(&unnest.schema)
209-
.map_err(|e| DataFusionError::Internal(format!("invalid schema in exec codec: {e:?}")))?;
236+
.map_err(|e| DataFusionError::Internal(format!("Invalid unnest schema JSON: {e:?}")))?;
210237

211238
let column = schema.index_of(UNNESTED_COL).map_err(|_| {
212239
DataFusionError::Internal(format!(
213-
"unnest node schema does not contain {UNNESTED_COL} col"
240+
"Unnest schema missing required column: {UNNESTED_COL}"
214241
))
215242
})?;
216243

244+
let input = inputs.first().ok_or_else(|| {
245+
DataFusionError::Internal("UnnestExec requires exactly one input plan".to_string())
246+
})?;
247+
217248
Ok(Arc::new(UnnestExec::new(
218-
inputs
219-
.first()
220-
.ok_or_else(|| DataFusionError::Internal("no input for unnest node".to_string()))?
221-
.clone(),
249+
input.clone(),
222250
vec![ListUnnest {
223251
index_in_input_schema: column,
224252
depth: 1,
@@ -229,45 +257,55 @@ fn decode_unnest_exec(
229257
)))
230258
}
231259

232-
fn decode_debezium_decode(
260+
fn decode_debezium_unroll(
233261
debezium: DebeziumDecodeNode,
234262
inputs: &[Arc<dyn ExecutionPlan>],
235263
) -> Result<Arc<dyn ExecutionPlan>> {
236264
let schema = Arc::new(
237265
serde_json::from_str::<Schema>(&debezium.schema).map_err(|e| {
238-
DataFusionError::Internal(format!("invalid schema in exec codec: {e:?}"))
266+
DataFusionError::Internal(format!("Invalid DebeziumDecode schema JSON: {e:?}"))
239267
})?,
240268
);
241-
let input = inputs
242-
.first()
243-
.ok_or_else(|| DataFusionError::Internal("no input for debezium node".to_string()))?
244-
.clone();
269+
270+
let input = inputs.first().ok_or_else(|| {
271+
DataFusionError::Internal(
272+
"CdcDebeziumUnrollExec requires exactly one input plan".to_string(),
273+
)
274+
})?;
275+
245276
let primary_keys = debezium
246277
.primary_keys
247278
.into_iter()
248279
.map(|c| c as usize)
249280
.collect();
281+
250282
Ok(Arc::new(CdcDebeziumUnrollExec::from_decoded_parts(
251-
input,
252-
schema.clone(),
283+
input.clone(),
284+
schema,
253285
primary_keys,
254286
)))
255287
}
256288

257-
fn decode_debezium_encode(
289+
fn decode_debezium_pack(
258290
debezium: DebeziumEncodeNode,
259291
inputs: &[Arc<dyn ExecutionPlan>],
260292
) -> Result<Arc<dyn ExecutionPlan>> {
261293
let schema = Arc::new(
262294
serde_json::from_str::<Schema>(&debezium.schema).map_err(|e| {
263-
DataFusionError::Internal(format!("invalid schema in exec codec: {e:?}"))
295+
DataFusionError::Internal(format!("Invalid DebeziumEncode schema JSON: {e:?}"))
264296
})?,
265297
);
266-
let input = inputs
267-
.first()
268-
.ok_or_else(|| DataFusionError::Internal("no input for debezium node".to_string()))?
269-
.clone();
298+
299+
let input = inputs.first().ok_or_else(|| {
300+
DataFusionError::Internal("CdcDebeziumPackExec requires exactly one input plan".to_string())
301+
})?;
302+
270303
Ok(Arc::new(CdcDebeziumPackExec::from_decoded_parts(
271-
input, schema,
304+
input.clone(),
305+
schema,
272306
)))
273307
}
308+
309+
// Historical names (same types) — keep existing `use crate::sql::physical::FsPhysicalExtensionCodec` working.
310+
pub type FsPhysicalExtensionCodec = StreamingExtensionCodec;
311+
pub type DecodingContext = StreamingDecodingContext;

0 commit comments

Comments
 (0)