Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-trees-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fix bug where too much data would be loaded when the lazy collection of a join contains an offset and/or limit clause.
174 changes: 95 additions & 79 deletions packages/db/src/query/compiler/joins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,90 +187,106 @@ function processJoin(
}

if (activeCollection) {
// This join can be optimized by having the active collection
// dynamically load keys into the lazy collection
// based on the value of the joinKey and by looking up
// matching rows in the index of the lazy collection

// Mark the lazy collection as lazy
// this Set is passed by the liveQueryCollection to the compiler
// such that the liveQueryCollection can check it after compilation
// to know which collections are lazy collections
lazyCollections.add(lazyCollection.id)

const activePipeline =
activeCollection === `main` ? mainPipeline : joinedPipeline

let index: BaseIndex<string | number> | undefined

const lazyCollectionJoinExpr =
activeCollection === `main`
? (joinedExpr as PropRef)
: (mainExpr as PropRef)

const followRefResult = followRef(
rawQuery,
lazyCollectionJoinExpr,
lazyCollection
)!
const followRefCollection = followRefResult.collection

const fieldName = followRefResult.path[0]
if (fieldName) {
ensureIndexForField(fieldName, followRefResult.path, followRefCollection)
}

let deoptimized = false

const activePipelineWithLoading: IStreamBuilder<
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
> = activePipeline.pipe(
tap(([joinKey, _]) => {
if (deoptimized) {
return
}

// Find the index for the path we join on
// we need to find the index inside the map operator
// because the indexes are only available after the initial sync
// so we can't fetch it during compilation
index ??= findIndexForField(
followRefCollection.indexes,
followRefResult.path
// If the lazy collection comes from a subquery that has a limit and/or an offset clause
// then we need to deoptimize the join because we don't know which rows are in the result set
// since we simply lookup matching keys in the index but the index contains all rows
// (not just the ones that pass the limit and offset clauses)
const lazyFrom =
activeCollection === `main` ? joinClause.from : rawQuery.from
const limitedSubquery =
lazyFrom.type === `queryRef` &&
(lazyFrom.query.limit || lazyFrom.query.offset)

if (!limitedSubquery) {
// This join can be optimized by having the active collection
// dynamically load keys into the lazy collection
// based on the value of the joinKey and by looking up
// matching rows in the index of the lazy collection

// Mark the lazy collection as lazy
// this Set is passed by the liveQueryCollection to the compiler
// such that the liveQueryCollection can check it after compilation
// to know which collections are lazy collections
lazyCollections.add(lazyCollection.id)

const activePipeline =
activeCollection === `main` ? mainPipeline : joinedPipeline

let index: BaseIndex<string | number> | undefined

const lazyCollectionJoinExpr =
activeCollection === `main`
? (joinedExpr as PropRef)
: (mainExpr as PropRef)

const followRefResult = followRef(
rawQuery,
lazyCollectionJoinExpr,
lazyCollection
)!
const followRefCollection = followRefResult.collection

const fieldName = followRefResult.path[0]
if (fieldName) {
ensureIndexForField(
fieldName,
followRefResult.path,
followRefCollection
)
}

// The `callbacks` object is passed by the liveQueryCollection to the compiler.
// It contains a function to lazy load keys for each lazy collection
// as well as a function to switch back to a regular collection
// (useful when there's no index available for lazily loading the collection)
const collectionCallbacks = callbacks[lazyCollection.id]
if (!collectionCallbacks) {
throw new Error(
`Internal error: callbacks for collection are missing in join pipeline. Make sure the live query collection sets them before running the pipeline.`
let deoptimized = false

const activePipelineWithLoading: IStreamBuilder<
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
> = activePipeline.pipe(
tap(([joinKey, _]) => {
if (deoptimized) {
return
}

// Find the index for the path we join on
// we need to find the index inside the map operator
// because the indexes are only available after the initial sync
// so we can't fetch it during compilation
index ??= findIndexForField(
followRefCollection.indexes,
followRefResult.path
)
}

const { loadKeys, loadInitialState } = collectionCallbacks

if (index && index.supports(`eq`)) {
// Use the index to fetch the PKs of the rows in the lazy collection
// that match this row from the active collection based on the value of the joinKey
const matchingKeys = index.lookup(`eq`, joinKey)
// Inform the lazy collection that those keys need to be loaded
loadKeys(matchingKeys)
} else {
// We can't optimize the join because there is no index for the join key
// on the lazy collection, so we load the initial state
deoptimized = true
loadInitialState()
}
})
)
// The `callbacks` object is passed by the liveQueryCollection to the compiler.
// It contains a function to lazy load keys for each lazy collection
// as well as a function to switch back to a regular collection
// (useful when there's no index available for lazily loading the collection)
const collectionCallbacks = callbacks[lazyCollection.id]
if (!collectionCallbacks) {
throw new Error(
`Internal error: callbacks for collection are missing in join pipeline. Make sure the live query collection sets them before running the pipeline.`
)
}

const { loadKeys, loadInitialState } = collectionCallbacks

if (index && index.supports(`eq`)) {
// Use the index to fetch the PKs of the rows in the lazy collection
// that match this row from the active collection based on the value of the joinKey
const matchingKeys = index.lookup(`eq`, joinKey)
// Inform the lazy collection that those keys need to be loaded
loadKeys(matchingKeys)
} else {
// We can't optimize the join because there is no index for the join key
// on the lazy collection, so we load the initial state
deoptimized = true
loadInitialState()
}
})
)

if (activeCollection === `main`) {
mainPipeline = activePipelineWithLoading
} else {
joinedPipeline = activePipelineWithLoading
if (activeCollection === `main`) {
mainPipeline = activePipelineWithLoading
} else {
joinedPipeline = activePipelineWithLoading
}
}
}

Expand Down
78 changes: 78 additions & 0 deletions packages/db/tests/query/join-subquery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,84 @@ function createJoinSubqueryTests(autoIndex: `off` | `eager`): void {
})
})

// LEFT JOIN where the joined source is a subquery carrying `orderBy` + `limit`.
// The subquery keeps only the first active user (by name, ascending); the outer
// query keeps only the issue with the highest id. The expected row contains an
// `issue` entry and no `users` entry, i.e. the limited subquery result is not
// expected to contain a match for that issue's `userId`.
test(`should use subquery in LEFT JOIN clause - left join with ordered subquery with limit`, () => {
  const joinSubquery = createLiveQueryCollection({
    query: (q) => {
      return q
        .from({ issue: issuesCollection })
        .join(
          {
            // Joined side: a subquery restricted by `limit(1)`, so only rows
            // that survive the limit may take part in the join.
            users: q
              .from({ user: usersCollection })
              .where(({ user }) => eq(user.status, `active`))
              .orderBy(({ user }) => user.name, `asc`)
              .limit(1),
          },
          ({ issue, users }) => eq(issue.userId, users.id),
          `left`
        )
        .orderBy(({ issue }) => issue.id, `desc`)
        .limit(1)
    },
    startSync: true,
  })

  const results = joinSubquery.toArray
  expect(results).toEqual([
    {
      issue: {
        id: 5,
        title: `Feature 2`,
        status: `in_progress`,
        projectId: 2,
        userId: 2,
        duration: 15,
        createdAt: `2024-01-05`,
      },
    },
  ])
})

// RIGHT JOIN where the `from` source is a subquery carrying `orderBy` + `limit`
// and the joined source is the plain issues collection.
// The subquery keeps only the first active user (by name, ascending); the outer
// query keeps only the issue with the highest id. The expected row contains an
// `issue` entry and no `users` entry, i.e. the limited subquery result is not
// expected to contain a match for that issue's `userId`.
test(`should use subquery in RIGHT JOIN clause - right join with ordered subquery with limit`, () => {
  const joinSubquery = createLiveQueryCollection({
    query: (q) => {
      return q
        .from({
          // Main side: a subquery restricted by `limit(1)`, so only rows
          // that survive the limit may take part in the join.
          users: q
            .from({ user: usersCollection })
            .where(({ user }) => eq(user.status, `active`))
            .orderBy(({ user }) => user.name, `asc`)
            .limit(1),
        })
        .join(
          { issue: issuesCollection },
          ({ issue, users }) => eq(issue.userId, users.id),
          `right`
        )
        .orderBy(({ issue }) => issue.id, `desc`)
        .limit(1)
    },
    startSync: true,
  })

  const results = joinSubquery.toArray
  expect(results).toEqual([
    {
      issue: {
        id: 5,
        title: `Feature 2`,
        status: `in_progress`,
        projectId: 2,
        userId: 2,
        duration: 15,
        createdAt: `2024-01-05`,
      },
    },
  ])
})

test(`should handle subqueries with SELECT clauses in both FROM and JOIN`, () => {
const joinQuery = createLiveQueryCollection({
startSync: true,
Expand Down
Loading