Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,11 @@ impl DefaultPhysicalPlanner {
.scan(session_state, projection.as_ref(), &filters, *fetch)
.await?
}
LogicalPlan::Values(Values { values, schema }) => {
LogicalPlan::Values(Values {
values,
schema,
has_placeholders,
}) => {
let exec_schema = schema.as_ref().to_owned().into();
let exprs = values
.iter()
Expand All @@ -465,7 +469,11 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
let value_exec = ValuesExec::try_new(
SchemaRef::new(exec_schema),
exprs,
*has_placeholders,
)?;
Arc::new(value_exec)
}
LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,26 @@ impl Expr {
Expr::IsNotNull(Box::new(self))
}

/// Return true if this expression is a placeholder
pub fn is_placeholder(&self) -> bool {
matches!(self, Self::Placeholder(_))
}

/// Return true if this expression has at least one placeholder
pub fn has_placeholders(&self) -> bool {
let mut has_placeholders = false;
self.apply(|expr| {
has_placeholders |= expr.is_placeholder();
Ok(if has_placeholders {
TreeNodeRecursion::Stop
} else {
TreeNodeRecursion::Continue
})
})
.unwrap();
has_placeholders
}

/// Create a sort configuration from an existing expression.
///
/// ```
Expand Down
8 changes: 7 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,14 @@ impl LogicalPlanBuilder {
}
}

let mut has_placeholders = false;
let empty_schema = DFSchema::empty();
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
for j in 0..n_cols {
let mut common_type: Option<DataType> = None;
for (i, row) in values.iter().enumerate() {
let value = &row[j];
has_placeholders |= value.has_placeholders();
let data_type = value.get_type(&empty_schema)?;
if data_type == DataType::Null {
continue;
Expand Down Expand Up @@ -238,7 +240,11 @@ impl LogicalPlanBuilder {
.collect::<Vec<_>>();
let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;
let schema = DFSchemaRef::new(dfschema);
Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
Ok(Self::new(LogicalPlan::Values(Values {
schema,
values,
has_placeholders,
})))
}

/// Convert a table provider into a builder with a TableScan
Expand Down
33 changes: 27 additions & 6 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,17 @@ impl LogicalPlan {
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
LogicalPlan::Dml(_) => Ok(self),
LogicalPlan::Copy(_) => Ok(self),
LogicalPlan::Values(Values { schema, values }) => {
LogicalPlan::Values(Values {
schema,
values,
has_placeholders,
}) => {
// todo it isn't clear why the schema is not recomputed here
Ok(LogicalPlan::Values(Values { schema, values }))
Ok(LogicalPlan::Values(Values {
schema,
values,
has_placeholders,
}))
}
LogicalPlan::Filter(Filter {
predicate,
Expand Down Expand Up @@ -842,12 +850,23 @@ impl LogicalPlan {
}
LogicalPlan::Values(Values { schema, .. }) => {
self.assert_no_inputs(inputs)?;
let mut has_placeholders = false;
let values = expr
.chunks_exact(schema.fields().len())
.map(|s| {
s.iter()
.map(|expr| {
has_placeholders |= expr.has_placeholders();
expr.clone()
})
.collect()
})
.collect();

Ok(LogicalPlan::Values(Values {
schema: Arc::clone(schema),
values: expr
.chunks_exact(schema.fields().len())
.map(|s| s.to_vec())
.collect(),
values,
has_placeholders,
}))
}
LogicalPlan::Filter { .. } => {
Expand Down Expand Up @@ -2064,6 +2083,8 @@ pub struct Values {
pub schema: DFSchemaRef,
/// Values
pub values: Vec<Vec<Expr>>,
/// True if at least one of values is a placeholder.
pub has_placeholders: bool,
}

/// Evaluates an arbitrary list of expressions (essentially a
Expand Down
29 changes: 23 additions & 6 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,29 @@ impl LogicalPlan {
schema,
})
}),
LogicalPlan::Values(Values { schema, values }) => values
.into_iter()
.map_until_stop_and_collect(|value| {
value.into_iter().map_until_stop_and_collect(&mut f)
})?
.update_data(|values| LogicalPlan::Values(Values { schema, values })),
LogicalPlan::Values(Values {
schema,
values,
has_placeholders: _,
}) => {
let mut has_placeholders = false;
values
.into_iter()
.map_until_stop_and_collect(|value| {
value.into_iter().map_until_stop_and_collect(|expr| {
let res = f(expr)?;
has_placeholders |= res.data.has_placeholders();
Ok(res)
})
})?
.update_data(|values| {
LogicalPlan::Values(Values {
schema,
values,
has_placeholders,
})
})
}
LogicalPlan::Filter(Filter {
predicate,
input,
Expand Down
Loading
Loading