Skip to content

Commit 2cb7612

Browse files
author
Альберт Скальт
committed
values: support physical placeholders
This patch adds a physical placeholders support for `ValuesExec`. 1) Planner keeps a flag that tells whether there are placeholders within a Values Logical Plan. This is known when SQL is turning into the logical plan. 2) This flag is updated each time when expressions are mapped. 3) On the physical planning stage this flag is passed into `ValuesExec` constructor. 4) `ValuesExec` now has 2 work modes: (a) When an input batch is resolved. It do the same as before. (b) When it is known that an input batch contains placeholders. Then placeholders will be resolved in execution time.
1 parent 3b3f850 commit 2cb7612

File tree

8 files changed

+254
-84
lines changed

8 files changed

+254
-84
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,11 @@ impl DefaultPhysicalPlanner {
453453
.scan(session_state, projection.as_ref(), &filters, *fetch)
454454
.await?
455455
}
456-
LogicalPlan::Values(Values { values, schema }) => {
456+
LogicalPlan::Values(Values {
457+
values,
458+
schema,
459+
has_placeholders,
460+
}) => {
457461
let exec_schema = schema.as_ref().to_owned().into();
458462
let exprs = values
459463
.iter()
@@ -465,7 +469,11 @@ impl DefaultPhysicalPlanner {
465469
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
466470
})
467471
.collect::<Result<Vec<_>>>()?;
468-
let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
472+
let value_exec = ValuesExec::try_new(
473+
SchemaRef::new(exec_schema),
474+
exprs,
475+
*has_placeholders,
476+
)?;
469477
Arc::new(value_exec)
470478
}
471479
LogicalPlan::EmptyRelation(EmptyRelation {

datafusion/expr/src/expr.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,26 @@ impl Expr {
13741374
Expr::IsNotNull(Box::new(self))
13751375
}
13761376

1377+
/// Return true if this expression is a placeholder
1378+
pub fn is_placeholder(&self) -> bool {
1379+
matches!(self, Self::Placeholder(_))
1380+
}
1381+
1382+
/// Return true if this expression has at least one placeholder
1383+
pub fn has_placeholders(&self) -> bool {
1384+
let mut has_placeholders = false;
1385+
self.apply(|expr| {
1386+
has_placeholders |= expr.is_placeholder();
1387+
Ok(if has_placeholders {
1388+
TreeNodeRecursion::Stop
1389+
} else {
1390+
TreeNodeRecursion::Continue
1391+
})
1392+
})
1393+
.unwrap();
1394+
has_placeholders
1395+
}
1396+
13771397
/// Create a sort configuration from an existing expression.
13781398
///
13791399
/// ```

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,14 @@ impl LogicalPlanBuilder {
194194
}
195195
}
196196

197+
let mut has_placeholders = false;
197198
let empty_schema = DFSchema::empty();
198199
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
199200
for j in 0..n_cols {
200201
let mut common_type: Option<DataType> = None;
201202
for (i, row) in values.iter().enumerate() {
202203
let value = &row[j];
204+
has_placeholders |= value.has_placeholders();
203205
let data_type = value.get_type(&empty_schema)?;
204206
if data_type == DataType::Null {
205207
continue;
@@ -238,7 +240,11 @@ impl LogicalPlanBuilder {
238240
.collect::<Vec<_>>();
239241
let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;
240242
let schema = DFSchemaRef::new(dfschema);
241-
Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
243+
Ok(Self::new(LogicalPlan::Values(Values {
244+
schema,
245+
values,
246+
has_placeholders,
247+
})))
242248
}
243249

244250
/// Convert a table provider into a builder with a TableScan

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -635,9 +635,17 @@ impl LogicalPlan {
635635
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
636636
LogicalPlan::Dml(_) => Ok(self),
637637
LogicalPlan::Copy(_) => Ok(self),
638-
LogicalPlan::Values(Values { schema, values }) => {
638+
LogicalPlan::Values(Values {
639+
schema,
640+
values,
641+
has_placeholders,
642+
}) => {
639643
// todo it isn't clear why the schema is not recomputed here
640-
Ok(LogicalPlan::Values(Values { schema, values }))
644+
Ok(LogicalPlan::Values(Values {
645+
schema,
646+
values,
647+
has_placeholders,
648+
}))
641649
}
642650
LogicalPlan::Filter(Filter {
643651
predicate,
@@ -842,12 +850,23 @@ impl LogicalPlan {
842850
}
843851
LogicalPlan::Values(Values { schema, .. }) => {
844852
self.assert_no_inputs(inputs)?;
853+
let mut has_placeholders = false;
854+
let values = expr
855+
.chunks_exact(schema.fields().len())
856+
.map(|s| {
857+
s.iter()
858+
.map(|expr| {
859+
has_placeholders |= expr.has_placeholders();
860+
expr.clone()
861+
})
862+
.collect()
863+
})
864+
.collect();
865+
845866
Ok(LogicalPlan::Values(Values {
846867
schema: Arc::clone(schema),
847-
values: expr
848-
.chunks_exact(schema.fields().len())
849-
.map(|s| s.to_vec())
850-
.collect(),
868+
values,
869+
has_placeholders,
851870
}))
852871
}
853872
LogicalPlan::Filter { .. } => {
@@ -2064,6 +2083,8 @@ pub struct Values {
20642083
pub schema: DFSchemaRef,
20652084
/// Values
20662085
pub values: Vec<Vec<Expr>>,
2086+
/// True if at least one of values is a placeholder.
2087+
pub has_placeholders: bool,
20672088
}
20682089

20692090
/// Evaluates an arbitrary list of expressions (essentially a

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -557,12 +557,29 @@ impl LogicalPlan {
557557
schema,
558558
})
559559
}),
560-
LogicalPlan::Values(Values { schema, values }) => values
561-
.into_iter()
562-
.map_until_stop_and_collect(|value| {
563-
value.into_iter().map_until_stop_and_collect(&mut f)
564-
})?
565-
.update_data(|values| LogicalPlan::Values(Values { schema, values })),
560+
LogicalPlan::Values(Values {
561+
schema,
562+
values,
563+
has_placeholders: _,
564+
}) => {
565+
let mut has_placeholders = false;
566+
values
567+
.into_iter()
568+
.map_until_stop_and_collect(|value| {
569+
value.into_iter().map_until_stop_and_collect(|expr| {
570+
let res = f(expr)?;
571+
has_placeholders |= res.data.has_placeholders();
572+
Ok(res)
573+
})
574+
})?
575+
.update_data(|values| {
576+
LogicalPlan::Values(Values {
577+
schema,
578+
values,
579+
has_placeholders,
580+
})
581+
})
582+
}
566583
LogicalPlan::Filter(Filter {
567584
predicate,
568585
input,

0 commit comments

Comments
 (0)