diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs
index 9cce9d8..166f211 100644
--- a/src/rewrite/normal_form.rs
+++ b/src/rewrite/normal_form.rs
@@ -826,7 +826,7 @@ impl Predicate {
 
         Some(
             self_residuals
-                .difference(&other.residuals)
+                .difference(&other_residuals)
                 .cloned()
                 .collect_vec(),
         )
@@ -1368,4 +1368,413 @@ mod test {
 
         Ok(())
     }
+
+    /// Regression test for residual_subsumption_test normalization bug.
+    ///
+    /// When a residual filter references a column that belongs to a multi-member
+    /// equivalence class, normalization maps it to the class representative.
+    /// The bug was computing the set difference between the normalized self_residuals
+    /// and the raw (un-normalized) other.residuals. This caused the same logical
+    /// filter to appear as two distinct expressions (e.g., `CAST(col_a) > col_d`
+    /// vs `CAST(col_b) > col_d`), producing a spurious compensating filter.
+    ///
+    /// Setup: view has `col_a = col_b AND col_b > col_d`, query has
+    /// `col_a = col_b AND col_a > col_d`. Under the equivalence class {col_a, col_b},
+    /// both residual filters normalize to the same expression. The rewritten plan
+    /// should have NO compensating filter for the `> col_d` predicate.
+    #[tokio::test]
+    async fn test_residual_filter_with_equivalence_class_normalization() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE eq_test (
+                col_a INT,
+                col_b INT,
+                col_c VARCHAR,
+                col_d DOUBLE
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        // View uses col_b in the residual filter
+        let base = "\
+        SELECT col_a, col_b, col_c, col_d \
+        FROM eq_test \
+        WHERE col_a = col_b AND col_b > col_d";
+
+        // Query uses col_a (the equivalence class representative) in the same residual
+        let query = "\
+        SELECT col_a, col_c \
+        FROM eq_test \
+        WHERE col_a = col_b AND col_a > col_d";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv");
+        let rewritten = query_nf
+            .rewrite_from(
+                &base_nf,
+                table_ref.clone(),
+                provider_as_source(ctx.table_provider(table_ref).await?),
+            )?
+            .ok_or(DataFusionError::Internal(
+                "expected rewrite to succeed".to_string(),
+            ))?;
+
+        assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref());
+
+        // The view already enforces `col > col_d`, so the rewritten plan must NOT
+        // re-add it as a spurious compensating filter.
+        let plan_str = format!("{}", rewritten.display_indent());
+        assert!(
+            !plan_str.contains("> mv.col_d") && !plan_str.contains("> col_d"),
+            "Rewritten plan should not contain spurious residual filter for col > col_d.\n\
+             Plan:\n{plan_str}"
+        );
+
+        Ok(())
+    }
+
+    /// Second regression test: when the query has an EXTRA residual filter beyond
+    /// what the view provides, only that extra filter should appear as a
+    /// compensating filter. The shared residual (using a different equivalence
+    /// class member) must not be duplicated.
+    #[tokio::test]
+    async fn test_residual_normalization_extra_filter_only() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE norm_test (
+                id1 INT,
+                id2 INT,
+                value DOUBLE,
+                label VARCHAR
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        // View: id1 = id2 equivalence, residual `id2 > value` and `label LIKE '%x%'`
+        let base = "\
+        SELECT id1, id2, value, label \
+        FROM norm_test \
+        WHERE id1 = id2 AND id2 > value AND label LIKE '%x%'";
+
+        // Query: same equivalence & residuals but using id1 instead of id2,
+        // plus an extra `value > 0` filter not in the view.
+        let query = "\
+        SELECT id1, label \
+        FROM norm_test \
+        WHERE id1 = id2 AND id1 > value AND label LIKE '%x%' AND value > 0";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv2 AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv2");
+        let rewritten = query_nf
+            .rewrite_from(
+                &base_nf,
+                table_ref.clone(),
+                provider_as_source(ctx.table_provider(table_ref).await?),
+            )?
+            .ok_or(DataFusionError::Internal(
+                "expected rewrite to succeed".to_string(),
+            ))?;
+
+        assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref());
+
+        let plan_str = format!("{}", rewritten.display_indent());
+
+        // The shared residual `id > value` must NOT appear as a compensating filter.
+        // Only that residual has `value` on the right-hand side of `>`; the extra
+        // `value > 0` filter has it on the left, so it does not match this pattern.
+        assert!(
+            !plan_str.contains("> mv2.value"),
+            "Rewritten plan should not duplicate the view's residual filter.\n\
+             Plan:\n{plan_str}"
+        );
+
+        Ok(())
+    }
+
+    // Regression tests for alias column filter rewrite behavior.
+    //
+    // Simulates the Fable merchant aggregates production bug where filtering on an
+    // aliased column (`aggregate_name AS name`) with equality (`name = 'blablacar'`)
+    // produced EmptyExec, while range filters (`name >= 'bla'`) worked correctly.
+    //
+    // Test structure mirrors the existing `range filter + equality predicate` test:
+    // - base: broad query against raw table, with alias in projection
+    // - query: same table + alias, but with a filter on the raw column
+    // - mv: materialized from `base`
+    // - rewrite_from maps `query` onto `mv`
+
+    /// Test 1: Equality filter on an aliased column.
+    ///
+    /// base: SELECT aggregate_name AS name, txn_date AS transaction_date FROM merchants
+    /// query: SELECT aggregate_name AS name FROM merchants WHERE aggregate_name = 'blablacar'
+    ///
+    /// The rewriter must recognize that `aggregate_name` in the query maps to the
+    /// `name` output column of the view, and produce a valid compensating filter.
+    #[tokio::test]
+    async fn test_equality_filter_on_aliased_column() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE merchants (
+                aggregate_name VARCHAR,
+                txn_date DATE,
+                txn_currency VARCHAR,
+                total_spend DOUBLE
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        // base: the external view SQL — aliased projection, no filter
+        let base = "SELECT aggregate_name AS name, \
+                    txn_date AS transaction_date, \
+                    txn_currency AS transaction_currency, \
+                    total_spend \
+                    FROM merchants";
+
+        // query: same table, filters on the raw (pre-alias) column name
+        let query = "SELECT aggregate_name AS name, txn_date AS transaction_date \
+                     FROM merchants \
+                     WHERE aggregate_name = 'blablacar'";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv_eq AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv_eq");
+        let rewritten = query_nf.rewrite_from(
+            &base_nf,
+            table_ref.clone(),
+            provider_as_source(ctx.table_provider(table_ref).await?),
+        )?;
+
+        assert!(
+            rewritten.is_some(),
+            "Rewrite should succeed for equality filter on aliased column `aggregate_name = 'blablacar'`"
+        );
+
+        let plan_str = format!("{}", rewritten.unwrap().display_indent());
+        assert!(
+            plan_str.contains("blablacar"),
+            "Rewritten plan should contain the equality filter value.\nPlan:\n{plan_str}"
+        );
+
+        Ok(())
+    }
+
+    /// Test 2: Range filter on an aliased column — the "working" baseline from the bug report.
+    ///
+    /// This should pass both before and after any fix.
+    /// If this fails it means the test structure itself is wrong.
+    #[tokio::test]
+    async fn test_range_filter_on_aliased_column_baseline() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE merchants2 (
+                aggregate_name VARCHAR,
+                txn_date DATE,
+                total_spend DOUBLE
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let base = "SELECT aggregate_name AS name, \
+                    txn_date AS transaction_date, \
+                    total_spend \
+                    FROM merchants2";
+
+        let query = "SELECT aggregate_name AS name \
+                     FROM merchants2 \
+                     WHERE aggregate_name >= 'bla'";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv_range AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv_range");
+        let rewritten = query_nf.rewrite_from(
+            &base_nf,
+            table_ref.clone(),
+            provider_as_source(ctx.table_provider(table_ref).await?),
+        )?;
+
+        assert!(
+            rewritten.is_some(),
+            "Range filter on aliased column should produce valid rewrite (baseline)"
+        );
+
+        Ok(())
+    }
+
+    /// Test 3: Equality filter on multiple aliased columns simultaneously.
+    ///
+    /// Simulates: `WHERE aggregate_name = 'blablacar' AND txn_currency = 'EUR'`
+    /// Both columns are aliased in the base view.
+    #[tokio::test]
+    async fn test_equality_filter_multiple_aliased_columns() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE merchants3 (
+                aggregate_name VARCHAR,
+                txn_currency VARCHAR,
+                txn_date DATE,
+                total_spend DOUBLE
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let base = "SELECT aggregate_name AS name, \
+                    txn_currency AS transaction_currency, \
+                    txn_date AS transaction_date, \
+                    total_spend \
+                    FROM merchants3";
+
+        let query = "SELECT aggregate_name AS name, txn_currency AS transaction_currency \
+                     FROM merchants3 \
+                     WHERE aggregate_name = 'blablacar' AND txn_currency = 'EUR'";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv_multi AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv_multi");
+        let rewritten = query_nf.rewrite_from(
+            &base_nf,
+            table_ref.clone(),
+            provider_as_source(ctx.table_provider(table_ref).await?),
+        )?;
+
+        assert!(
+            rewritten.is_some(),
+            "Rewrite should succeed for equality filters on multiple aliased columns"
+        );
+
+        let plan_str = format!("{}", rewritten.unwrap().display_indent());
+        assert!(
+            plan_str.contains("blablacar") && plan_str.contains("EUR"),
+            "Rewritten plan should contain both filter values.\nPlan:\n{plan_str}"
+        );
+
+        Ok(())
+    }
+
+    /// Test 4: Equality on one aliased column combined with range on another.
+    ///
+    /// Simulates: `WHERE aggregate_name = 'blablacar' AND txn_date >= '2026-01-01'`
+    /// Common real-world query pattern.
+    #[tokio::test]
+    async fn test_mixed_equality_and_range_on_aliased_columns() -> Result<()> {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let ctx = SessionContext::new();
+
+        ctx.sql(
+            "CREATE TABLE merchants4 (
+                aggregate_name VARCHAR,
+                txn_date DATE,
+                total_spend DOUBLE
+            )",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let base = "SELECT aggregate_name AS name, \
+                    txn_date AS transaction_date, \
+                    total_spend \
+                    FROM merchants4";
+
+        let query = "SELECT aggregate_name AS name, txn_date AS transaction_date, total_spend \
+                     FROM merchants4 \
+                     WHERE aggregate_name = 'blablacar' AND txn_date >= '2026-01-01'";
+
+        let base_plan = ctx.sql(base).await?.into_optimized_plan()?;
+        let base_nf = SpjNormalForm::new(&base_plan)?;
+
+        ctx.sql(&format!("CREATE TABLE mv_mixed AS {base}"))
+            .await?
+            .collect()
+            .await?;
+
+        let query_plan = ctx.sql(query).await?.into_optimized_plan()?;
+        let query_nf = SpjNormalForm::new(&query_plan)?;
+
+        let table_ref = TableReference::bare("mv_mixed");
+        let rewritten = query_nf.rewrite_from(
+            &base_nf,
+            table_ref.clone(),
+            provider_as_source(ctx.table_provider(table_ref).await?),
+        )?;
+
+        assert!(
+            rewritten.is_some(),
+            "Rewrite should succeed for mixed equality + range filters on aliased columns"
+        );
+
+        Ok(())
+    }
 }