diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 5d67960315fc..cbd21702574a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -453,7 +453,11 @@ impl DefaultPhysicalPlanner { .scan(session_state, projection.as_ref(), &filters, *fetch) .await? } - LogicalPlan::Values(Values { values, schema }) => { + LogicalPlan::Values(Values { + values, + schema, + has_placeholders, + }) => { let exec_schema = schema.as_ref().to_owned().into(); let exprs = values .iter() @@ -465,7 +469,11 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; + let value_exec = ValuesExec::try_new( + SchemaRef::new(exec_schema), + exprs, + *has_placeholders, + )?; Arc::new(value_exec) } LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 52d407be9e17..55ed1b2b1160 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1374,6 +1374,26 @@ impl Expr { Expr::IsNotNull(Box::new(self)) } + /// Return true if this expression is a placeholder + pub fn is_placeholder(&self) -> bool { + matches!(self, Self::Placeholder(_)) + } + + /// Return true if this expression has at least one placeholder + pub fn has_placeholders(&self) -> bool { + let mut has_placeholders = false; + self.apply(|expr| { + has_placeholders |= expr.is_placeholder(); + Ok(if has_placeholders { + TreeNodeRecursion::Stop + } else { + TreeNodeRecursion::Continue + }) + }) + .unwrap(); + has_placeholders + } + /// Create a sort configuration from an existing expression. /// /// ``` diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index d7de550ff575..0cbe00cc6269 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -194,12 +194,14 @@ impl LogicalPlanBuilder { } } + let mut has_placeholders = false; let empty_schema = DFSchema::empty(); let mut field_types: Vec = Vec::with_capacity(n_cols); for j in 0..n_cols { let mut common_type: Option = None; for (i, row) in values.iter().enumerate() { let value = &row[j]; + has_placeholders |= value.has_placeholders(); let data_type = value.get_type(&empty_schema)?; if data_type == DataType::Null { continue; @@ -238,7 +240,11 @@ impl LogicalPlanBuilder { .collect::>(); let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?; let schema = DFSchemaRef::new(dfschema); - Ok(Self::new(LogicalPlan::Values(Values { schema, values }))) + Ok(Self::new(LogicalPlan::Values(Values { + schema, + values, + has_placeholders, + }))) } /// Convert a table provider into a builder with a TableScan diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 90dd4159be5f..50030ed670a7 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -635,9 +635,17 @@ impl LogicalPlan { }) => Projection::try_new(expr, input).map(LogicalPlan::Projection), LogicalPlan::Dml(_) => Ok(self), LogicalPlan::Copy(_) => Ok(self), - LogicalPlan::Values(Values { schema, values }) => { + LogicalPlan::Values(Values { + schema, + values, + has_placeholders, + }) => { // todo it isn't clear why the schema is not recomputed here - Ok(LogicalPlan::Values(Values { schema, values })) + Ok(LogicalPlan::Values(Values { + schema, + values, + has_placeholders, + })) } LogicalPlan::Filter(Filter { predicate, @@ -842,12 +850,23 @@ impl LogicalPlan { } LogicalPlan::Values(Values { schema, .. }) => { self.assert_no_inputs(inputs)?; + let mut has_placeholders = false; + let values = expr + .chunks_exact(schema.fields().len()) + .map(|s| { + s.iter() + .map(|expr| { + has_placeholders |= expr.has_placeholders(); + expr.clone() + }) + .collect() + }) + .collect(); + Ok(LogicalPlan::Values(Values { schema: Arc::clone(schema), - values: expr - .chunks_exact(schema.fields().len()) - .map(|s| s.to_vec()) - .collect(), + values, + has_placeholders, })) } LogicalPlan::Filter { .. } => { @@ -2064,6 +2083,8 @@ pub struct Values { pub schema: DFSchemaRef, /// Values pub values: Vec>, + /// True if at least one of values is a placeholder. + pub has_placeholders: bool, } /// Evaluates an arbitrary list of expressions (essentially a diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ce3480a656a2..b1915b736055 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -557,12 +557,29 @@ impl LogicalPlan { schema, }) }), - LogicalPlan::Values(Values { schema, values }) => values - .into_iter() - .map_until_stop_and_collect(|value| { - value.into_iter().map_until_stop_and_collect(&mut f) - })? - .update_data(|values| LogicalPlan::Values(Values { schema, values })), + LogicalPlan::Values(Values { + schema, + values, + has_placeholders: _, + }) => { + let mut has_placeholders = false; + values + .into_iter() + .map_until_stop_and_collect(|value| { + value.into_iter().map_until_stop_and_collect(|expr| { + let res = f(expr)?; + has_placeholders |= res.data.has_placeholders(); + Ok(res) + }) + })? + .update_data(|values| { + LogicalPlan::Values(Values { + schema, + values, + has_placeholders, + }) + }) + } LogicalPlan::Filter(Filter { predicate, input, diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index e01aea1fdd6b..c000574a5262 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -31,8 +31,9 @@ use crate::{ use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; -use datafusion_common::{internal_err, plan_err, Result, ScalarValue}; +use datafusion_common::{internal_err, plan_err, ParamValues, Result, ScalarValue}; use datafusion_execution::TaskContext; +use datafusion_physical_expr::expressions::resolve_placeholders; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) @@ -41,60 +42,53 @@ pub struct ValuesExec { /// The schema schema: SchemaRef, /// The data - data: Vec, + data: ValuesData, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, } +/// Represents maybe placeholdered values recordbatch. +#[derive(Debug, Clone)] +enum ValuesData { + /// Resolved batch. + Resolved(Vec), + /// Batch with placeholders. + Placeholdered(Vec>>), +} + impl ValuesExec { - /// create a new values exec from data as expr + fn new(schema: SchemaRef, data: ValuesData) -> Self { + let cache = Self::compute_properties(Arc::clone(&schema)); + Self { + schema, + data, + cache, + } + } + + /// Create a new values exec from data as expr. + /// + /// `has_placeholders` indicates if there are possible placeholders within rows. + /// If there are placeholders they will be resolved in execution time. + /// Flag is passed to avoid extra values bypass is there are placeholders and + /// the caller knows it from somewhere. + /// pub fn try_new( schema: SchemaRef, data: Vec>>, + has_placeholders: bool, ) -> Result { if data.is_empty() { return plan_err!("Values list cannot be empty"); } - let n_row = data.len(); - let n_col = schema.fields().len(); - // we have this single row batch as a placeholder to satisfy evaluation argument - // and generate a single output row - let batch = RecordBatch::try_new_with_options( - Arc::new(Schema::empty()), - vec![], - &RecordBatchOptions::new().with_row_count(Some(1)), - )?; - - let arr = (0..n_col) - .map(|j| { - (0..n_row) - .map(|i| { - let r = data[i][j].evaluate(&batch); - - match r { - Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), - Ok(ColumnarValue::Array(a)) if a.len() == 1 => { - ScalarValue::try_from_array(&a, 0) - } - Ok(ColumnarValue::Array(a)) => { - plan_err!( - "Cannot have array values {a:?} in a values list" - ) - } - Err(err) => Err(err), - } - }) - .collect::>>() - .and_then(ScalarValue::iter_to_array) - }) - .collect::>>()?; - let batch = RecordBatch::try_new_with_options( - Arc::clone(&schema), - arr, - &RecordBatchOptions::new().with_row_count(Some(n_row)), - )?; - let data: Vec = vec![batch]; - Self::try_new_from_batches(schema, data) + let data = if has_placeholders { + ValuesData::Placeholdered(data) + } else { + let batch = data_to_recordbatch(data, &None, &schema)?; + ValuesData::Resolved(vec![batch]) + }; + + Ok(Self::new(schema, data)) } /// Create a new plan using the provided schema and batches. @@ -118,17 +112,7 @@ impl ValuesExec { } } - let cache = Self::compute_properties(Arc::clone(&schema)); - Ok(ValuesExec { - schema, - data: batches, - cache, - }) - } - - /// provides the data - pub fn data(&self) -> Vec { - self.data.clone() + Ok(Self::new(schema, ValuesData::Resolved(batches))) } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -179,14 +163,16 @@ impl ExecutionPlan for ValuesExec { self: Arc, _: Vec>, ) -> Result> { - ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone()) - .map(|e| Arc::new(e) as _) + Ok(Arc::new(Self::new( + Arc::clone(&self.schema), + self.data.clone(), + ))) } fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> Result { // ValuesExec has a single output partition if 0 != partition { @@ -195,35 +181,105 @@ impl ExecutionPlan for ValuesExec { ); } + let data = match &self.data { + ValuesData::Resolved(batches) => batches.clone(), + ValuesData::Placeholdered(raw_batch) => vec![data_to_recordbatch( + raw_batch.clone(), + context.param_values(), + &self.schema, + )?], + }; + Ok(Box::pin(MemoryStream::try_new( - self.data(), + data, Arc::clone(&self.schema), None, )?)) } fn statistics(&self) -> Result { - let batch = self.data(); - Ok(common::compute_record_batch_statistics( - &[batch], - &self.schema, - None, - )) + Ok(match &self.data { + ValuesData::Resolved(batches) => common::compute_record_batch_statistics( + &[batches.clone()], + &self.schema, + None, + ), + ValuesData::Placeholdered(_) => Statistics::new_unknown(&self.schema), + }) } } +/// Evaluate values expressions and convert them into recordbatch. +/// +fn data_to_recordbatch( + data: Vec>>, + params: &Option, + schema: &SchemaRef, +) -> Result { + let n_col = schema.fields().len(); + let n_row = data.len(); + + // We have this single row batch as a placeholder to satisfy evaluation argument, + // and generate a single output row. + let batch = RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + )?; + + let arr = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let value = if params.is_some() { + resolve_placeholders(&data[i][j], params)? + .0 + .evaluate(&batch) + } else { + // No param values, so even the expression contains placeholders, + // there will be a resolving error. + data[i][j].evaluate(&batch) + }; + match value { + Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), + Ok(ColumnarValue::Array(a)) if a.len() == 1 => { + ScalarValue::try_from_array(&a, 0) + } + Ok(ColumnarValue::Array(a)) => { + plan_err!("Cannot have array values {a:?} in a values list") + } + Err(err) => Err(err), + } + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + arr, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + + Ok(batch) +} + #[cfg(test)] mod tests { use super::*; use crate::expressions::lit; use crate::test::{self, make_partition}; + use arrow::util::pretty::pretty_format_batches; use arrow_schema::{DataType, Field}; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{binary, placeholder}; #[tokio::test] async fn values_empty_case() -> Result<()> { let schema = test::aggr_test_schema(); - let empty = ValuesExec::try_new(schema, vec![]); + let empty = ValuesExec::try_new(schema, vec![], false); assert!(empty.is_err()); Ok(()) } @@ -264,9 +320,47 @@ mod tests { DataType::UInt32, false, )])); - let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); + let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]], false) + .unwrap(); // Test that a null value is rejected - let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) - .unwrap_err(); + let _ = ValuesExec::try_new( + schema, + vec![vec![lit(ScalarValue::UInt32(None))]], + false, + ) + .unwrap_err(); + } + + #[test] + fn data_to_recordbatch_resolves_placeholders() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + + let sum = binary( + lit(27u32), + Operator::Plus, + placeholder("$1", DataType::UInt32), + &schema, + )?; + + let rb = data_to_recordbatch( + vec![vec![sum]], + &Some(ParamValues::List(vec![ScalarValue::UInt32(Some(15))])), + &schema, + )?; + + let actual = format!("{}", pretty_format_batches(&[rb])?).to_lowercase(); + let expected = r#" ++------+ +| col0 | ++------+ +| 42 | ++------+ + "#; + assert_eq!(actual, expected.trim()); + Ok(()) } } diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index c9d27237a49b..32ebf6263f9e 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -730,9 +730,12 @@ pub async fn from_substrait_rel( }) .collect::>()?; + // All values are literals. + let has_placeholders = false; Ok(LogicalPlan::Values(Values { schema: DFSchemaRef::new(schema), values, + has_placeholders, })) } Some(ReadType::LocalFiles(lf)) => { @@ -961,7 +964,7 @@ fn compatible_nullabilities( } /// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise -/// conflict with the columns from the other. +/// conflict with the columns from the other. /// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For /// Substrait the names don't matter since it only refers to columns by indices, however DataFusion /// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names). diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index cc353ab36d97..26416af08d34 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -773,6 +773,7 @@ async fn roundtrip_values_no_columns() -> Result<()> { let plan = LogicalPlan::Values(Values { values: vec![vec![], vec![]], // two rows, no columns schema: DFSchemaRef::new(DFSchema::empty()), + has_placeholders: false, }); roundtrip_logical_plan_with_ctx(plan, ctx).await?; Ok(())