Skip to content

Support remaining pipe operators #1879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ceb9496
support rename
simonvandel Jun 5, 2025
365f7f7
support union
simonvandel Jun 5, 2025
3dbdadc
support intersect
simonvandel Jun 5, 2025
affe850
support except
simonvandel Jun 5, 2025
e215e7f
intersect requires distinct
simonvandel Jun 5, 2025
96eb232
support call operator
simonvandel Jun 5, 2025
140d723
impl pivot
simonvandel Jun 5, 2025
eaf7c8e
unpivot
simonvandel Jun 5, 2025
4478d6b
fmt
simonvandel Jun 5, 2025
34ba719
self review
simonvandel Jun 5, 2025
f62333e
reduce duplication
simonvandel Jun 6, 2025
9f91429
inline constant
simonvandel Jun 6, 2025
ea50019
don't box
simonvandel Jun 6, 2025
87c529b
clarify
simonvandel Jun 6, 2025
8c817c3
remove useless comment
simonvandel Jun 6, 2025
840fbbd
use maybe_parse
simonvandel Jun 6, 2025
e1bbbd4
cleanup
simonvandel Jun 6, 2025
e78e8d1
simplify
simonvandel Jun 6, 2025
989f6dd
impl join
simonvandel Jun 10, 2025
552def1
fmt
simonvandel Jun 10, 2025
18ea4ba
use display_comma_separated
simonvandel Jun 16, 2025
15e4e32
non-pub
simonvandel Jun 16, 2025
370d119
remove transpose
simonvandel Jun 16, 2025
bfbd008
use parse_set_quantifier
simonvandel Jun 16, 2025
774daf5
rename parse_optional_pipe_alias -> parse_identifier_optional_alias
simonvandel Jun 16, 2025
330a925
fix use of display_comma_separated
simonvandel Jun 16, 2025
324ad14
Reuse `parse_joins` function
simonvandel Jun 29, 2025
e064650
Merge remote-tracking branch 'origin' into support-more-pipe-operators
simonvandel Jun 29, 2025
a45a5a0
clippy
simonvandel Jun 29, 2025
552714a
Don't panic
simonvandel Jun 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,79 @@ pub enum PipeOperator {
/// Syntax: `|> TABLESAMPLE SYSTEM (10 PERCENT)
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#tablesample_pipe_operator>
TableSample { sample: Box<TableSample> },
/// Renames columns in the input table.
///
/// Syntax: `|> RENAME old_name AS new_name, ...`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#rename_pipe_operator>
Rename { mappings: Vec<IdentWithAlias> },
/// Combines the input table with one or more tables using UNION.
///
/// Syntax: `|> UNION [ALL|DISTINCT] (<query>), (<query>), ...`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#union_pipe_operator>
Union {
set_quantifier: SetQuantifier,
queries: Vec<Query>,
},
/// Returns only the rows that are present in both the input table and the specified tables.
///
/// Syntax: `|> INTERSECT [DISTINCT] (<query>), (<query>), ...`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#intersect_pipe_operator>
Intersect {
set_quantifier: SetQuantifier,
queries: Vec<Query>,
},
/// Returns only the rows that are present in the input table but not in the specified tables.
///
/// Syntax: `|> EXCEPT DISTINCT (<query>), (<query>), ...`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#except_pipe_operator>
Except {
set_quantifier: SetQuantifier,
queries: Vec<Query>,
},
/// Calls a table function or procedure that returns a table.
///
/// Syntax: `|> CALL function_name(args) [AS alias]`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#call_pipe_operator>
Call {
function: Function,
alias: Option<Ident>,
},
/// Pivots data from rows to columns.
///
/// Syntax: `|> PIVOT(aggregate_function(column) FOR pivot_column IN (value1, value2, ...)) [AS alias]`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#pivot_pipe_operator>
Pivot {
aggregate_functions: Vec<ExprWithAlias>,
value_column: Vec<Ident>,
value_source: PivotValueSource,
alias: Option<Ident>,
},
/// The `UNPIVOT` pipe operator transforms columns into rows.
///
/// Syntax:
/// ```sql
/// |> UNPIVOT(value_column FOR name_column IN (column1, column2, ...)) [alias]
/// ```
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#unpivot_pipe_operator>
Unpivot {
value_column: Ident,
name_column: Ident,
unpivot_columns: Vec<Ident>,
alias: Option<Ident>,
},
/// Joins the input table with another table.
///
/// Syntax: `|> [JOIN_TYPE] JOIN <table> [alias] ON <condition>` or `|> [JOIN_TYPE] JOIN <table> [alias] USING (<columns>)`
///
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#join_pipe_operator>
Join(Join),
}

impl fmt::Display for PipeOperator {
Expand Down Expand Up @@ -2739,7 +2812,87 @@ impl fmt::Display for PipeOperator {
PipeOperator::TableSample { sample } => {
write!(f, "{sample}")
}
PipeOperator::Rename { mappings } => {
write!(f, "RENAME {}", display_comma_separated(mappings))
}
PipeOperator::Union {
set_quantifier,
queries,
} => Self::fmt_set_operation(f, "UNION", set_quantifier, queries),
PipeOperator::Intersect {
set_quantifier,
queries,
} => Self::fmt_set_operation(f, "INTERSECT", set_quantifier, queries),
PipeOperator::Except {
set_quantifier,
queries,
} => Self::fmt_set_operation(f, "EXCEPT", set_quantifier, queries),
PipeOperator::Call { function, alias } => {
write!(f, "CALL {function}")?;
Self::fmt_optional_alias(f, alias)
}
PipeOperator::Pivot {
aggregate_functions,
value_column,
value_source,
alias,
} => {
write!(
f,
"PIVOT({} FOR {} IN ({}))",
display_comma_separated(aggregate_functions),
Expr::CompoundIdentifier(value_column.to_vec()),
value_source
)?;
Self::fmt_optional_alias(f, alias)
}
PipeOperator::Unpivot {
value_column,
name_column,
unpivot_columns,
alias,
} => {
write!(
f,
"UNPIVOT({} FOR {} IN ({}))",
value_column,
name_column,
display_comma_separated(unpivot_columns)
)?;
Self::fmt_optional_alias(f, alias)
}
PipeOperator::Join(join) => write!(f, "{join}"),
}
}
}

impl PipeOperator {
/// Helper function to format optional alias for pipe operators
fn fmt_optional_alias(f: &mut fmt::Formatter<'_>, alias: &Option<Ident>) -> fmt::Result {
if let Some(alias) = alias {
write!(f, " AS {alias}")?;
}
Ok(())
}

/// Helper function to format set operations (UNION, INTERSECT, EXCEPT) with queries
fn fmt_set_operation(
f: &mut fmt::Formatter<'_>,
operation: &str,
set_quantifier: &SetQuantifier,
queries: &[Query],
) -> fmt::Result {
write!(f, "{operation}")?;
match set_quantifier {
SetQuantifier::None => {}
_ => {
write!(f, " {set_quantifier}")?;
}
}
write!(f, " ")?;
let parenthesized_queries: Vec<String> =
queries.iter().map(|query| format!("({query})")).collect();
write!(f, "{}", display_comma_separated(&parenthesized_queries))
}
}

Expand Down
170 changes: 170 additions & 0 deletions src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9988,6 +9988,48 @@ impl<'a> Parser<'a> {
Ok(IdentWithAlias { ident, alias })
}

/// Parse `identifier [AS] identifier` where the AS keyword is optional
fn parse_identifier_with_optional_alias(&mut self) -> Result<IdentWithAlias, ParserError> {
let ident = self.parse_identifier()?;
let _after_as = self.parse_keyword(Keyword::AS);
let alias = self.parse_identifier()?;
Ok(IdentWithAlias { ident, alias })
}

/// Parse comma-separated list of parenthesized queries for pipe operators
fn parse_pipe_operator_queries(&mut self) -> Result<Vec<Query>, ParserError> {
self.parse_comma_separated(|parser| {
parser.expect_token(&Token::LParen)?;
let query = parser.parse_query()?;
parser.expect_token(&Token::RParen)?;
Ok(*query)
})
}

/// Parse set quantifier for pipe operators that require DISTINCT. E.g. INTERSECT and EXCEPT
fn parse_distinct_required_set_quantifier(
&mut self,
operator_name: &str,
) -> Result<SetQuantifier, ParserError> {
let quantifier = self.parse_set_quantifier(&Some(SetOperator::Intersect));
match quantifier {
SetQuantifier::Distinct | SetQuantifier::DistinctByName => Ok(quantifier),
_ => Err(ParserError::ParserError(format!(
"{operator_name} pipe operator requires DISTINCT modifier",
))),
}
}

/// Parse optional identifier alias (with or without AS keyword)
fn parse_identifier_optional_alias(&mut self) -> Result<Option<Ident>, ParserError> {
if self.parse_keyword(Keyword::AS) {
Ok(Some(self.parse_identifier()?))
} else {
// Check if the next token is an identifier (implicit alias)
self.maybe_parse(|parser| parser.parse_identifier())
}
}

/// Optionally parses an alias for a select list item
fn maybe_parse_select_item_alias(&mut self) -> Result<Option<Ident>, ParserError> {
fn validator(explicit: bool, kw: &Keyword, parser: &mut Parser) -> bool {
Expand Down Expand Up @@ -11134,6 +11176,19 @@ impl<'a> Parser<'a> {
Keyword::AGGREGATE,
Keyword::ORDER,
Keyword::TABLESAMPLE,
Keyword::RENAME,
Keyword::UNION,
Keyword::INTERSECT,
Keyword::EXCEPT,
Keyword::CALL,
Keyword::PIVOT,
Keyword::UNPIVOT,
Keyword::JOIN,
Keyword::INNER,
Keyword::LEFT,
Keyword::RIGHT,
Keyword::FULL,
Keyword::CROSS,
])?;
match kw {
Keyword::SELECT => {
Expand Down Expand Up @@ -11200,6 +11255,121 @@ impl<'a> Parser<'a> {
let sample = self.parse_table_sample(TableSampleModifier::TableSample)?;
pipe_operators.push(PipeOperator::TableSample { sample });
}
Keyword::RENAME => {
let mappings =
self.parse_comma_separated(Parser::parse_identifier_with_optional_alias)?;
pipe_operators.push(PipeOperator::Rename { mappings });
}
Keyword::UNION => {
let set_quantifier = self.parse_set_quantifier(&Some(SetOperator::Union));
let queries = self.parse_pipe_operator_queries()?;
pipe_operators.push(PipeOperator::Union {
set_quantifier,
queries,
});
}
Keyword::INTERSECT => {
let set_quantifier =
self.parse_distinct_required_set_quantifier("INTERSECT")?;
let queries = self.parse_pipe_operator_queries()?;
pipe_operators.push(PipeOperator::Intersect {
set_quantifier,
queries,
});
}
Keyword::EXCEPT => {
let set_quantifier = self.parse_distinct_required_set_quantifier("EXCEPT")?;
let queries = self.parse_pipe_operator_queries()?;
pipe_operators.push(PipeOperator::Except {
set_quantifier,
queries,
});
}
Keyword::CALL => {
let function_name = self.parse_object_name(false)?;
let function_expr = self.parse_function(function_name)?;
if let Expr::Function(function) = function_expr {
let alias = self.parse_identifier_optional_alias()?;
pipe_operators.push(PipeOperator::Call { function, alias });
} else {
return Err(ParserError::ParserError(
"Expected function call after CALL".to_string(),
));
}
}
Keyword::PIVOT => {
self.expect_token(&Token::LParen)?;
let aggregate_functions =
self.parse_comma_separated(Self::parse_aliased_function_call)?;
self.expect_keyword_is(Keyword::FOR)?;
let value_column = self.parse_period_separated(|p| p.parse_identifier())?;
self.expect_keyword_is(Keyword::IN)?;

self.expect_token(&Token::LParen)?;
let value_source = if self.parse_keyword(Keyword::ANY) {
let order_by = if self.parse_keywords(&[Keyword::ORDER, Keyword::BY]) {
self.parse_comma_separated(Parser::parse_order_by_expr)?
} else {
vec![]
};
PivotValueSource::Any(order_by)
} else if self.peek_sub_query() {
PivotValueSource::Subquery(self.parse_query()?)
} else {
PivotValueSource::List(
self.parse_comma_separated(Self::parse_expr_with_alias)?,
)
};
self.expect_token(&Token::RParen)?;
self.expect_token(&Token::RParen)?;

let alias = self.parse_identifier_optional_alias()?;

pipe_operators.push(PipeOperator::Pivot {
aggregate_functions,
value_column,
value_source,
alias,
});
}
Keyword::UNPIVOT => {
self.expect_token(&Token::LParen)?;
let value_column = self.parse_identifier()?;
self.expect_keyword(Keyword::FOR)?;
let name_column = self.parse_identifier()?;
self.expect_keyword(Keyword::IN)?;

self.expect_token(&Token::LParen)?;
let unpivot_columns = self.parse_comma_separated(Parser::parse_identifier)?;
self.expect_token(&Token::RParen)?;

self.expect_token(&Token::RParen)?;

let alias = self.parse_identifier_optional_alias()?;

pipe_operators.push(PipeOperator::Unpivot {
value_column,
name_column,
unpivot_columns,
alias,
});
}
Keyword::JOIN
| Keyword::INNER
| Keyword::LEFT
| Keyword::RIGHT
| Keyword::FULL
| Keyword::CROSS => {
self.prev_token();
let mut joins = self.parse_joins()?;
if joins.len() != 1 {
return Err(ParserError::ParserError(
"Join pipe operator must have a single join".to_string(),
));
}
let join = joins.swap_remove(0);
pipe_operators.push(PipeOperator::Join(join))
}
unhandled => {
return Err(ParserError::ParserError(format!(
"`expect_one_of_keywords` further up allowed unhandled keyword: {unhandled:?}"
Expand Down
Loading
Loading