Skip to content

feat: rewrite subquery into dependent join logical plan #16016

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 67 commits into
base: main
Choose a base branch
from

Conversation

duongcongtoai
Copy link
Contributor

@duongcongtoai duongcongtoai commented May 10, 2025

Which issue does this PR close?

This PR is a part of a long story for a general purpose subquery decorrelation, after some discussion in #5492, this PR proposes adding the followings:

  • an optimizor that convert all the subqueries into dependent join logical plan (this is only a temporary plan) <---- This is what this PR trying to achieve
  • an optimizor to decorrelate dependent join logical plan, POC is in this working branch

To avoid breaking existing tests and smoother collaboration, the changes should happen in the following sequence

1. Merge item 1 without integrate the new optimizor to the main flow (behavior is tested in-code instead of sqllogictests)
2. Start implement more rewriting rules for different query plan (aggregate, projection, filter ...) using the a new optimizer
We keep the working of this new optimizor in a working branch, and if the implementation can fully support existing subqueries,
we make the following change to the main branch

From

            Arc::new(DecorrelatePredicateSubquery::new()),
            Arc::new(ScalarSubqueryToJoin::new()),
            Arc::new(DecorrelateLateralJoin::new()),

Into

            Arc::new(DependentJoinRewriter::new()),
            Arc::new(DependentJoinDecorrelator::new()),

Or we can even combine the 2 optimizors into one, into

            Arc::new(DependentJoinDecorrelator::new()),

The following works are needed after merging this PR

  • Implement DelimGet logical plan and physical plan
  • Implement DelimGetRemove optimizor similar to DuckDB
  • Implement JoinType::Single similar to duckdb. This operator is needed to support this type of query
select * from outer where outer.a = (select inner.b from inner where inner.c=outer.c)

Rationale for this change

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels May 10, 2025
@duongcongtoai duongcongtoai changed the title refactor: framework for subquery unnesting [WIP] refactor: framework for subquery decorrelation May 10, 2025
@alamb
Copy link
Contributor

alamb commented May 12, 2025

FYI @irenjj

@xudong963 xudong963 self-requested a review May 14, 2025 14:49
Copy link
Contributor

@irenjj irenjj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @duongcongtoai , Here are some of my review comments. There are a few things I haven't fully figured out yet and will need some more time.

@@ -485,6 +485,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {

object
}
LogicalPlan::DependentJoin(..) => todo!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add an empty json!() here.

Comment on lines 406 to 407
// TODO: apply expr on the subquery
LogicalPlan::DependentJoin(..) => Ok(TreeNodeRecursion::Continue),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what you mean, but i also updated the implementation here

Comment on lines 317 to 330
LogicalPlan::Projection(proj) => {
for expr in &proj.expr {
if contains_subquery(expr) {
is_dependent_join_node = true;
break;
}
expr.apply(|expr| {
if let Expr::OuterReferenceColumn(data_type, col) = expr {
self.mark_outer_column_access(new_id, data_type, col);
}
Ok(TreeNodeRecursion::Continue)
})?;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that break may prevent mark_outer_column_access.🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, i didn't test this logic, so didn't catch this issue 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, problem addressed, i also handle f_up for the case of projecion, (also added a test case)


f.predicate
.apply(|expr| {
if let Expr::OuterReferenceColumn(data_type, col) = expr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's a dependent join, the depth should be incremented by 1, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the depth is not incremented until this match block finishes, and function predicate.apply will not traverse down to the expr under a Subquery expr, for example:
Given a predicate: where col_level1=1 and col_level2=(select count(*) from inner where inner_col=col_level1)
This function only visits:

  • col_level1=1, col_level1, 1
  • col_level2=ScalarSubqueryExpr, col_level2, ScalarSubqueryExpr

It will not visit the expr inner_col=col_level1. This happen when f_down function visits the children of this node (at that time depth has already been incremented)

Comment on lines 331 to 338
LogicalPlan::Subquery(subquery) => {
let parent = self.stack.last().unwrap();
let parent_node = self.nodes.get_mut(parent).unwrap();
// the inserting sequence matter here
// when a parent has multiple children subquery at the same time
// we rely on the order in which subquery children are visited
// to later on find back the corresponding subquery (if some part of them
// were rewritten in the lower node)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think more considerations are needed for the subquery case, such as OuterRefColumn, depth, etc. How does DuckDB handle this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't do much for LogicalPlan::Subquery here, because it contains little information of the OuterRefColumn or depth (notice how i create an empty entry for this node for the parent dependent_join_node

                parent_node
                    .columns_accesses_by_subquery_id
                    .insert(new_id, vec![]);

The same entry will be updated with OuterRefColumn and depth should any such expr is found during the downard traversal (f_down). I think the diagram mentioned here describe how the traversal works

                  ↓1   
                  ↑12   
             ┌────────────┐
             │  FILTER    │<--- DependentJoin rewrite
             │   (1)      │     happens here
             └─────┬────┬─┘     Here we already have enough information
             |     |    |       of which node is accessing which column
             |     |    |       provided by "Table Scan t1" node        
             │     |    |       (for example node (6) below )           
             │     |    |       
             │     |    |       
             │     |    |
       ↓2────┘     ↓6   └────↓10
       ↑5          ↑11         ↑11
   ┌───▼───┐    ┌──▼───┐   ┌───▼───────┐
   │SUBQ1  │    │SUBQ2 │   │TABLE SCAN │
   └──┬────┘    └──┬───┘   │    t1     │
      |            |       └───────────┘
      |            |
      |            |
      |            ↓7
      |            ↑10
      |         ┌──▼───────┐
      |         │Filter    │----> mark_outer_column_access(outer_ref)
      |         │outer_ref |
      |         │ (6)      |
      |         └──┬───────┘  
      |            |
     ↓3           ↓8
     ↑4           ↑9
  ┌──▼────┐    ┌──▼────┐
  │SCAN t2│    │SCAN t2│
  └───────┘    └───────┘

Copy link
Contributor Author

@duongcongtoai duongcongtoai May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can relate this section to a codeblock in duckdb: https://github.com/duckdb/duckdb/blob/95d71ea4ec0fb3a151b18943fb532f7b5131dc95/src/planner/binder/query_node/plan_subquery.cpp#L294

unique_ptr<Expression> Binder::PlanSubquery(BoundSubqueryExpression &expr, unique_ptr<LogicalOperator> &root) {
	...
	auto plan = std::move(subquery_root);
	unique_ptr<Expression> result_expression;
	if (!expr.IsCorrelated()) {
		result_expression = PlanUncorrelatedSubquery(*this, expr, root, std::move(plan));
	} else {
		result_expression = PlanCorrelatedSubquery(*this, expr, root, std::move(plan));
	}
	// finally, we recursively plan the nested subqueries (if there are any)
	if (sub_binder->has_unplanned_dependent_joins) {
		RecursiveDependentJoinPlanner plan(*this);
		plan.VisitOperator(*root);
	}

Let's use the SUBQ2 on the above diagram as demonstration:
for duckdb, plan=std::move(subquery_root) is equivalent to node (6), and root is equivalent to node (1) (I think the usage of root is quite misleading, it should be named "subquery_parent" instead, because it is used to reference the direct parent node of subquery instead of the "root" of the whole query plan.

what PlanCorrelatedSubquery does is convert node (6) into DelimJoin given all the information of correlated_columns.

static unique_ptr<Expression> PlanCorrelatedSubquery(Binder &binder, BoundSubqueryExpression &expr,
                                                     unique_ptr<LogicalOperator> &root,
                                                     unique_ptr<LogicalOperator> plan) {
	auto &correlated_columns = expr.binder->correlated_columns;
	// FIXME: there should be a way of disabling decorrelation for ANY queries as well, but not for now...
	bool perform_delim =
	    expr.subquery_type == SubqueryType::ANY ? true : PerformDuplicateElimination(binder, correlated_columns);

however, in our context, because we does not have such information provided by the planner, we have to perform an initial traversal to get them.

After we are done with the traversal, we have a tree which looks exactly like what duckdb has, and the recursion can imitate the same logic

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label May 28, 2025
@github-actions github-actions bot removed the sqllogictest SQL Logic Tests (.slt) label May 28, 2025
@irenjj irenjj mentioned this pull request Jun 2, 2025
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jun 7, 2025
// column
let alias = match e {
Expr::InSubquery(_) | Expr::Exists(_) | Expr::ScalarSubquery(_) => {
subquery_alias_by_offset.get(offset_ref).unwrap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unwrap() is not allowed

Comment on lines +151 to +162
fn rewrite_projection(
&mut self,
original_proj: &Projection,
dependent_join_node: &Node,
current_subquery_depth: usize,
mut current_plan: LogicalPlanBuilder,
subquery_alias_by_offset: HashMap<usize, String>,
) -> Result<LogicalPlanBuilder> {
// everytime we meet a subquery during traversal, we increment this by 1
// we can use this offset to lookup the original subquery info
// in subquery_alias_by_offset
// the reason why we cannot create a hashmap keyed by Subquery object HashMap<Subquery,String>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the shared parts among rewrite_* functions can be extracted

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants