Skip to content

Commit ecc5694

Browse files
authored
fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
* fix: FULL OUTER JOIN and LIMIT produces wrong results * Fix minor slt testing * fix test
1 parent dc445a1 commit ecc5694

File tree

2 files changed

+147
-13
lines changed

2 files changed

+147
-13
lines changed

datafusion/optimizer/src/push_down_limit.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
255255
match join.join_type {
256256
Left => (Some(limit), None),
257257
Right => (None, Some(limit)),
258-
Full => (Some(limit), Some(limit)),
259258
_ => (None, None),
260259
}
261260
};

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 147 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
42404240
logical_plan
42414241
01)Limit: skip=0, fetch=2
42424242
02)--Full Join: t0.c1 = t1.c1
4243-
03)----Limit: skip=0, fetch=2
4244-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4245-
05)----Limit: skip=0, fetch=2
4246-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4243+
03)----TableScan: t0 projection=[c1, c2]
4244+
04)----TableScan: t1 projection=[c1, c2, c3]
42474245
physical_plan
42484246
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
42494247
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
@@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
42574255
logical_plan
42584256
01)Limit: skip=0, fetch=2
42594257
02)--Full Join: Filter: t0.c2 >= t1.c2
4260-
03)----Limit: skip=0, fetch=2
4261-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4262-
05)----Limit: skip=0, fetch=2
4263-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4258+
03)----TableScan: t0 projection=[c1, c2]
4259+
04)----TableScan: t1 projection=[c1, c2, c3]
42644260
physical_plan
42654261
01)GlobalLimitExec: skip=0, fetch=2
42664262
02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
@@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT
42744270
logical_plan
42754271
01)Limit: skip=0, fetch=2
42764272
02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2
4277-
03)----Limit: skip=0, fetch=2
4278-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4279-
05)----Limit: skip=0, fetch=2
4280-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4273+
03)----TableScan: t0 projection=[c1, c2]
4274+
04)----TableScan: t1 projection=[c1, c2, c3]
42814275
physical_plan
42824276
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
42834277
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
42844278
03)----MemoryExec: partitions=1, partition_sizes=[1]
42854279
04)----MemoryExec: partitions=1, partition_sizes=[1]
42864280

4281+
## Add more test cases for join limit pushdown
4282+
statement ok
4283+
drop table t1
4284+
4285+
## Test limit pushdown through OUTER JOIN including left/right and full outer join cases
4286+
statement ok
4287+
set datafusion.execution.target_partitions = 1;
4288+
4289+
### Limit pushdown through join
4290+
4291+
# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest
4292+
# bugs if limits are improperly pushed down)
4293+
query I
4294+
COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv'
4295+
STORED AS CSV
4296+
----
4297+
5
4298+
4299+
# store t2 in different order so the top N rows are not the same as the top N rows of t1
4300+
query I
4301+
COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv'
4302+
STORED AS CSV
4303+
----
4304+
5
4305+
4306+
statement ok
4307+
create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv';
4308+
4309+
statement ok
4310+
create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv';
4311+
4312+
######
4313+
## LEFT JOIN w/ LIMIT
4314+
######
4315+
query II
4316+
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4317+
----
4318+
2 2
4319+
1 1
4320+
4321+
# the output of this query should be two rows from the previous query
4322+
# there should be no nulls
4323+
query II
4324+
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4325+
----
4326+
2 2
4327+
1 1
4328+
4329+
# can only push down to t1 (preserved side)
4330+
query TT
4331+
explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4332+
----
4333+
logical_plan
4334+
01)Limit: skip=0, fetch=2
4335+
02)--Left Join: t1.a = t2.b
4336+
03)----Limit: skip=0, fetch=2
4337+
04)------TableScan: t1 projection=[a], fetch=2
4338+
05)----TableScan: t2 projection=[b]
4339+
physical_plan
4340+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4341+
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
4342+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true
4343+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
4344+
4345+
######
4346+
## RIGHT JOIN w/ LIMIT
4347+
######
4348+
4349+
query II
4350+
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4351+
----
4352+
5 5
4353+
4 4
4354+
4355+
# the output of this query should be two rows from the previous query
4356+
# there should be no nulls
4357+
query II
4358+
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4359+
----
4360+
5 5
4361+
4 4
4362+
4363+
# can only push down to t2 (preserved side)
4364+
query TT
4365+
explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4366+
----
4367+
logical_plan
4368+
01)Limit: skip=0, fetch=2
4369+
02)--Right Join: t1.a = t2.b
4370+
03)----TableScan: t1 projection=[a]
4371+
04)----Limit: skip=0, fetch=2
4372+
05)------TableScan: t2 projection=[b], fetch=2
4373+
physical_plan
4374+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4375+
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
4376+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
4377+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true
4378+
4379+
######
4380+
## FULL JOIN w/ LIMIT
4381+
######
4382+
query II rowsort
4383+
select * from t1 FULL JOIN t2 ON t1.a = t2.b;
4384+
----
4385+
1 1
4386+
2 2
4387+
3 3
4388+
4 4
4389+
5 5
4390+
4391+
# the output of this query should be two rows from the previous query
4392+
# there should be no nulls
4393+
# Reproducer for https://github.com/apache/datafusion/issues/14335
4394+
query II
4395+
select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
4396+
----
4397+
5 5
4398+
4 4
4399+
4400+
4401+
# can't push limit for full outer join
4402+
query TT
4403+
explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
4404+
----
4405+
logical_plan
4406+
01)Limit: skip=0, fetch=2
4407+
02)--Full Join: t1.a = t2.b
4408+
03)----TableScan: t1 projection=[a]
4409+
04)----TableScan: t2 projection=[b]
4410+
physical_plan
4411+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4412+
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
4413+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
4414+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
4415+
4416+
statement ok
4417+
drop table t1;
4418+
4419+
statement ok
4420+
drop table t2;
4421+
42874422
# Test Utf8View as Join Key
42884423
# Issue: https://github.com/apache/datafusion/issues/12468
42894424
statement ok

0 commit comments

Comments
 (0)