Skip to content

Commit abd80f9

Browse files
authored
Support remaining pipe operators (#1879)
1 parent 3bc9423 commit abd80f9

File tree

3 files changed

+739
-0
lines changed

3 files changed

+739
-0
lines changed

src/ast/query.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2684,6 +2684,79 @@ pub enum PipeOperator {
26842684
/// Syntax: `|> TABLESAMPLE SYSTEM (10 PERCENT)`
26852685
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#tablesample_pipe_operator>
26862686
TableSample { sample: Box<TableSample> },
2687+
/// Renames columns in the input table.
2688+
///
2689+
/// Syntax: `|> RENAME old_name [AS] new_name, ...`
2690+
///
2691+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#rename_pipe_operator>
2692+
Rename { mappings: Vec<IdentWithAlias> },
2693+
/// Combines the input table with one or more tables using UNION.
2694+
///
2695+
/// Syntax: `|> UNION [ALL|DISTINCT] (<query>), (<query>), ...`
2696+
///
2697+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#union_pipe_operator>
2698+
Union {
2699+
set_quantifier: SetQuantifier,
2700+
queries: Vec<Query>,
2701+
},
2702+
/// Returns only the rows that are present in both the input table and the specified tables.
2703+
///
2704+
/// Syntax: `|> INTERSECT DISTINCT (<query>), (<query>), ...`
2705+
///
2706+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#intersect_pipe_operator>
2707+
Intersect {
2708+
set_quantifier: SetQuantifier,
2709+
queries: Vec<Query>,
2710+
},
2711+
/// Returns only the rows that are present in the input table but not in the specified tables.
2712+
///
2713+
/// Syntax: `|> EXCEPT DISTINCT (<query>), (<query>), ...`
2714+
///
2715+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#except_pipe_operator>
2716+
Except {
2717+
set_quantifier: SetQuantifier,
2718+
queries: Vec<Query>,
2719+
},
2720+
/// Calls a table function or procedure that returns a table.
2721+
///
2722+
/// Syntax: `|> CALL function_name(args) [[AS] alias]`
2723+
///
2724+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#call_pipe_operator>
2725+
Call {
2726+
function: Function,
2727+
alias: Option<Ident>,
2728+
},
2729+
/// Pivots data from rows to columns.
2730+
///
2731+
/// Syntax: `|> PIVOT(aggregate_function(column) FOR pivot_column IN (value1, value2, ...)) [[AS] alias]`
2732+
///
2733+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#pivot_pipe_operator>
2734+
Pivot {
2735+
aggregate_functions: Vec<ExprWithAlias>,
2736+
value_column: Vec<Ident>,
2737+
value_source: PivotValueSource,
2738+
alias: Option<Ident>,
2739+
},
2740+
/// The `UNPIVOT` pipe operator transforms columns into rows.
2741+
///
2742+
/// Syntax:
2743+
/// ```sql
2744+
/// |> UNPIVOT(value_column FOR name_column IN (column1, column2, ...)) [[AS] alias]
2745+
/// ```
2746+
///
2747+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#unpivot_pipe_operator>
2748+
Unpivot {
2749+
value_column: Ident,
2750+
name_column: Ident,
2751+
unpivot_columns: Vec<Ident>,
2752+
alias: Option<Ident>,
2753+
},
2754+
/// Joins the input table with another table.
2755+
///
2756+
/// Syntax: `|> [JOIN_TYPE] JOIN <table> [alias] ON <condition>` or `|> [JOIN_TYPE] JOIN <table> [alias] USING (<columns>)`
2757+
///
2758+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#join_pipe_operator>
2759+
Join(Join),
26872760
}
26882761

26892762
impl fmt::Display for PipeOperator {
@@ -2739,7 +2812,87 @@ impl fmt::Display for PipeOperator {
27392812
PipeOperator::TableSample { sample } => {
27402813
write!(f, "{sample}")
27412814
}
2815+
PipeOperator::Rename { mappings } => {
2816+
write!(f, "RENAME {}", display_comma_separated(mappings))
2817+
}
2818+
PipeOperator::Union {
2819+
set_quantifier,
2820+
queries,
2821+
} => Self::fmt_set_operation(f, "UNION", set_quantifier, queries),
2822+
PipeOperator::Intersect {
2823+
set_quantifier,
2824+
queries,
2825+
} => Self::fmt_set_operation(f, "INTERSECT", set_quantifier, queries),
2826+
PipeOperator::Except {
2827+
set_quantifier,
2828+
queries,
2829+
} => Self::fmt_set_operation(f, "EXCEPT", set_quantifier, queries),
2830+
PipeOperator::Call { function, alias } => {
2831+
write!(f, "CALL {function}")?;
2832+
Self::fmt_optional_alias(f, alias)
2833+
}
2834+
PipeOperator::Pivot {
2835+
aggregate_functions,
2836+
value_column,
2837+
value_source,
2838+
alias,
2839+
} => {
2840+
write!(
2841+
f,
2842+
"PIVOT({} FOR {} IN ({}))",
2843+
display_comma_separated(aggregate_functions),
2844+
Expr::CompoundIdentifier(value_column.to_vec()),
2845+
value_source
2846+
)?;
2847+
Self::fmt_optional_alias(f, alias)
2848+
}
2849+
PipeOperator::Unpivot {
2850+
value_column,
2851+
name_column,
2852+
unpivot_columns,
2853+
alias,
2854+
} => {
2855+
write!(
2856+
f,
2857+
"UNPIVOT({} FOR {} IN ({}))",
2858+
value_column,
2859+
name_column,
2860+
display_comma_separated(unpivot_columns)
2861+
)?;
2862+
Self::fmt_optional_alias(f, alias)
2863+
}
2864+
PipeOperator::Join(join) => write!(f, "{join}"),
2865+
}
2866+
}
2867+
}
2868+
2869+
impl PipeOperator {
2870+
/// Helper function to format optional alias for pipe operators
2871+
fn fmt_optional_alias(f: &mut fmt::Formatter<'_>, alias: &Option<Ident>) -> fmt::Result {
2872+
if let Some(alias) = alias {
2873+
write!(f, " AS {alias}")?;
27422874
}
2875+
Ok(())
2876+
}
2877+
2878+
/// Helper function to format set operations (UNION, INTERSECT, EXCEPT) with queries
2879+
fn fmt_set_operation(
2880+
f: &mut fmt::Formatter<'_>,
2881+
operation: &str,
2882+
set_quantifier: &SetQuantifier,
2883+
queries: &[Query],
2884+
) -> fmt::Result {
2885+
write!(f, "{operation}")?;
2886+
match set_quantifier {
2887+
SetQuantifier::None => {}
2888+
_ => {
2889+
write!(f, " {set_quantifier}")?;
2890+
}
2891+
}
2892+
write!(f, " ")?;
2893+
let parenthesized_queries: Vec<String> =
2894+
queries.iter().map(|query| format!("({query})")).collect();
2895+
write!(f, "{}", display_comma_separated(&parenthesized_queries))
27432896
}
27442897
}
27452898

src/parser/mod.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9988,6 +9988,48 @@ impl<'a> Parser<'a> {
99889988
Ok(IdentWithAlias { ident, alias })
99899989
}
99909990

9991+
/// Parse `identifier [AS] identifier` where the AS keyword is optional
9992+
fn parse_identifier_with_optional_alias(&mut self) -> Result<IdentWithAlias, ParserError> {
9993+
let ident = self.parse_identifier()?;
9994+
let _after_as = self.parse_keyword(Keyword::AS);
9995+
let alias = self.parse_identifier()?;
9996+
Ok(IdentWithAlias { ident, alias })
9997+
}
9998+
9999+
/// Parse comma-separated list of parenthesized queries for pipe operators
10000+
fn parse_pipe_operator_queries(&mut self) -> Result<Vec<Query>, ParserError> {
10001+
self.parse_comma_separated(|parser| {
10002+
parser.expect_token(&Token::LParen)?;
10003+
let query = parser.parse_query()?;
10004+
parser.expect_token(&Token::RParen)?;
10005+
Ok(*query)
10006+
})
10007+
}
10008+
10009+
/// Parse set quantifier for pipe operators that require DISTINCT. E.g. INTERSECT and EXCEPT
10010+
fn parse_distinct_required_set_quantifier(
10011+
&mut self,
10012+
operator_name: &str,
10013+
) -> Result<SetQuantifier, ParserError> {
10014+
let quantifier = self.parse_set_quantifier(&Some(SetOperator::Intersect));
10015+
match quantifier {
10016+
SetQuantifier::Distinct | SetQuantifier::DistinctByName => Ok(quantifier),
10017+
_ => Err(ParserError::ParserError(format!(
10018+
"{operator_name} pipe operator requires DISTINCT modifier",
10019+
))),
10020+
}
10021+
}
10022+
10023+
/// Parse optional identifier alias (with or without AS keyword)
10024+
fn parse_identifier_optional_alias(&mut self) -> Result<Option<Ident>, ParserError> {
10025+
if self.parse_keyword(Keyword::AS) {
10026+
Ok(Some(self.parse_identifier()?))
10027+
} else {
10028+
// Check if the next token is an identifier (implicit alias)
10029+
self.maybe_parse(|parser| parser.parse_identifier())
10030+
}
10031+
}
10032+
999110033
/// Optionally parses an alias for a select list item
999210034
fn maybe_parse_select_item_alias(&mut self) -> Result<Option<Ident>, ParserError> {
999310035
fn validator(explicit: bool, kw: &Keyword, parser: &mut Parser) -> bool {
@@ -11134,6 +11176,19 @@ impl<'a> Parser<'a> {
1113411176
Keyword::AGGREGATE,
1113511177
Keyword::ORDER,
1113611178
Keyword::TABLESAMPLE,
11179+
Keyword::RENAME,
11180+
Keyword::UNION,
11181+
Keyword::INTERSECT,
11182+
Keyword::EXCEPT,
11183+
Keyword::CALL,
11184+
Keyword::PIVOT,
11185+
Keyword::UNPIVOT,
11186+
Keyword::JOIN,
11187+
Keyword::INNER,
11188+
Keyword::LEFT,
11189+
Keyword::RIGHT,
11190+
Keyword::FULL,
11191+
Keyword::CROSS,
1113711192
])?;
1113811193
match kw {
1113911194
Keyword::SELECT => {
@@ -11200,6 +11255,121 @@ impl<'a> Parser<'a> {
1120011255
let sample = self.parse_table_sample(TableSampleModifier::TableSample)?;
1120111256
pipe_operators.push(PipeOperator::TableSample { sample });
1120211257
}
11258+
Keyword::RENAME => {
11259+
let mappings =
11260+
self.parse_comma_separated(Parser::parse_identifier_with_optional_alias)?;
11261+
pipe_operators.push(PipeOperator::Rename { mappings });
11262+
}
11263+
Keyword::UNION => {
11264+
let set_quantifier = self.parse_set_quantifier(&Some(SetOperator::Union));
11265+
let queries = self.parse_pipe_operator_queries()?;
11266+
pipe_operators.push(PipeOperator::Union {
11267+
set_quantifier,
11268+
queries,
11269+
});
11270+
}
11271+
Keyword::INTERSECT => {
11272+
let set_quantifier =
11273+
self.parse_distinct_required_set_quantifier("INTERSECT")?;
11274+
let queries = self.parse_pipe_operator_queries()?;
11275+
pipe_operators.push(PipeOperator::Intersect {
11276+
set_quantifier,
11277+
queries,
11278+
});
11279+
}
11280+
Keyword::EXCEPT => {
11281+
let set_quantifier = self.parse_distinct_required_set_quantifier("EXCEPT")?;
11282+
let queries = self.parse_pipe_operator_queries()?;
11283+
pipe_operators.push(PipeOperator::Except {
11284+
set_quantifier,
11285+
queries,
11286+
});
11287+
}
11288+
Keyword::CALL => {
11289+
let function_name = self.parse_object_name(false)?;
11290+
let function_expr = self.parse_function(function_name)?;
11291+
if let Expr::Function(function) = function_expr {
11292+
let alias = self.parse_identifier_optional_alias()?;
11293+
pipe_operators.push(PipeOperator::Call { function, alias });
11294+
} else {
11295+
return Err(ParserError::ParserError(
11296+
"Expected function call after CALL".to_string(),
11297+
));
11298+
}
11299+
}
11300+
Keyword::PIVOT => {
11301+
self.expect_token(&Token::LParen)?;
11302+
let aggregate_functions =
11303+
self.parse_comma_separated(Self::parse_aliased_function_call)?;
11304+
self.expect_keyword_is(Keyword::FOR)?;
11305+
let value_column = self.parse_period_separated(|p| p.parse_identifier())?;
11306+
self.expect_keyword_is(Keyword::IN)?;
11307+
11308+
self.expect_token(&Token::LParen)?;
11309+
let value_source = if self.parse_keyword(Keyword::ANY) {
11310+
let order_by = if self.parse_keywords(&[Keyword::ORDER, Keyword::BY]) {
11311+
self.parse_comma_separated(Parser::parse_order_by_expr)?
11312+
} else {
11313+
vec![]
11314+
};
11315+
PivotValueSource::Any(order_by)
11316+
} else if self.peek_sub_query() {
11317+
PivotValueSource::Subquery(self.parse_query()?)
11318+
} else {
11319+
PivotValueSource::List(
11320+
self.parse_comma_separated(Self::parse_expr_with_alias)?,
11321+
)
11322+
};
11323+
self.expect_token(&Token::RParen)?;
11324+
self.expect_token(&Token::RParen)?;
11325+
11326+
let alias = self.parse_identifier_optional_alias()?;
11327+
11328+
pipe_operators.push(PipeOperator::Pivot {
11329+
aggregate_functions,
11330+
value_column,
11331+
value_source,
11332+
alias,
11333+
});
11334+
}
11335+
Keyword::UNPIVOT => {
11336+
self.expect_token(&Token::LParen)?;
11337+
let value_column = self.parse_identifier()?;
11338+
self.expect_keyword(Keyword::FOR)?;
11339+
let name_column = self.parse_identifier()?;
11340+
self.expect_keyword(Keyword::IN)?;
11341+
11342+
self.expect_token(&Token::LParen)?;
11343+
let unpivot_columns = self.parse_comma_separated(Parser::parse_identifier)?;
11344+
self.expect_token(&Token::RParen)?;
11345+
11346+
self.expect_token(&Token::RParen)?;
11347+
11348+
let alias = self.parse_identifier_optional_alias()?;
11349+
11350+
pipe_operators.push(PipeOperator::Unpivot {
11351+
value_column,
11352+
name_column,
11353+
unpivot_columns,
11354+
alias,
11355+
});
11356+
}
11357+
Keyword::JOIN
11358+
| Keyword::INNER
11359+
| Keyword::LEFT
11360+
| Keyword::RIGHT
11361+
| Keyword::FULL
11362+
| Keyword::CROSS => {
11363+
self.prev_token();
11364+
let mut joins = self.parse_joins()?;
11365+
if joins.len() != 1 {
11366+
return Err(ParserError::ParserError(
11367+
"Join pipe operator must have a single join".to_string(),
11368+
));
11369+
}
11370+
let join = joins.swap_remove(0);
11371+
pipe_operators.push(PipeOperator::Join(join))
11372+
}
1120311373
unhandled => {
1120411374
return Err(ParserError::ParserError(format!(
1120511375
"`expect_one_of_keywords` further up allowed unhandled keyword: {unhandled:?}"

0 commit comments

Comments
 (0)