Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions packages/db-ivm/tests/operators/distinct.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ import { D2 } from "../../src/d2.js"
import { MultiSet } from "../../src/multiset.js"
import { distinct } from "../../src/operators/distinct.js"
import { output } from "../../src/operators/output.js"
import { filter } from "../../src/operators/filter.js"
import { map } from "../../src/operators/map.js"
import { groupBy, sum } from "../../src/operators/groupBy.js"
import { MessageTracker, assertResults } from "../test-utils.js"

describe(`Operators`, () => {
describe(`Efficient distinct operation`, () => {
testDistinct()
})

describe(`Distinct with other operators`, () => {
testDistinctWithOtherOperators()
})
})

function testDistinct() {
Expand Down Expand Up @@ -231,3 +238,207 @@ function testDistinct() {
)
})
}

function testDistinctWithOtherOperators() {
test(`distinct with filter - should apply distinct after filtering`, () => {
const graph = new D2()
const input = graph.newInput<{
id: number
category: string
value: number
}>()
const messages: Array<
MultiSet<{ id: number; category: string; value: number }>
> = []

input.pipe(
filter((item) => item.value > 10),
distinct((item) => item.category),
output((message) => {
messages.push(message)
})
)

graph.finalize()

input.sendData(
new MultiSet([
[{ id: 1, category: `A`, value: 5 }, 1], // Should be filtered out
[{ id: 2, category: `A`, value: 15 }, 1], // Should pass through
[{ id: 3, category: `B`, value: 20 }, 1], // Should pass through
[{ id: 4, category: `A`, value: 25 }, 1], // Should be filtered by distinct (category A already seen)
[{ id: 5, category: `C`, value: 8 }, 1], // Should be filtered out by value
])
)
graph.run()

const data = messages.map((m) => m.getInner())
// Since distinct keeps the last seen item for each category, we expect id: 4 for category A
expect(data).toEqual([
[
[{ id: 4, category: `A`, value: 25 }, 1],
[{ id: 3, category: `B`, value: 20 }, 1],
],
])
})

test(`distinct with map - should apply distinct after mapping`, () => {
const graph = new D2()
const input = graph.newInput<{ id: number; name: string }>()
const messages: Array<MultiSet<string>> = []

input.pipe(
map((item) => item.name.toLowerCase()),
distinct(),
output((message) => {
messages.push(message)
})
)

graph.finalize()

input.sendData(
new MultiSet([
[{ id: 1, name: `Alice` }, 1],
[{ id: 2, name: `ALICE` }, 1], // Should be distinct after mapping to lowercase
[{ id: 3, name: `Bob` }, 1],
[{ id: 4, name: `alice` }, 1], // Should be filtered by distinct
])
)
graph.run()

const data = messages.map((m) => m.getInner())
expect(data).toEqual([
[
[`alice`, 1],
[`bob`, 1],
],
])
})

test(`distinct with groupBy - should work with aggregated data`, () => {
const graph = new D2()
const input = graph.newInput<{ category: string; amount: number }>()
const messages: Array<
MultiSet<[string, { category: string; total: number }]>
> = []

input.pipe(
groupBy((data) => ({ category: data.category }), {
total: sum((data) => data.amount),
}),
distinct(([_, value]) => Math.floor(value.total / 100)), // Distinct by total rounded to hundreds
output((message) => {
messages.push(message)
})
)

graph.finalize()

input.sendData(
new MultiSet([
[{ category: `A`, amount: 100 }, 1],
[{ category: `A`, amount: 50 }, 1], // Total for A = 150
[{ category: `B`, amount: 180 }, 1], // Total for B = 180, same hundred as A
[{ category: `C`, amount: 250 }, 1], // Total for C = 250, different hundred
])
)
graph.run()

const data = messages.map((m) => m.getInner())

// Should have 2 distinct items: one for 100s range and one for 200s range
expect(data[0]).toHaveLength(2)

const totals = data[0].map(([key, _multiplicity]) => {
// Key is [jsonString, aggregatedObject], we want the aggregatedObject
const [_jsonKey, aggregatedValue] = key
return Math.floor(aggregatedValue.total / 100)
})
expect(totals).toContain(1) // 100s range
expect(totals).toContain(2) // 200s range
})

test(`distinct with orderBy - simpler test case`, () => {
const graph = new D2()
const input = graph.newInput<{
id: number
category: string
priority: number
}>()
const messages: Array<
MultiSet<{ id: number; category: string; priority: number }>
> = []

input.pipe(
distinct((item) => item.category),
output((message) => {
messages.push(message)
})
)

graph.finalize()

input.sendData(
new MultiSet([
[{ id: 1, category: `A`, priority: 1 }, 1],
[{ id: 2, category: `B`, priority: 3 }, 1],
[{ id: 3, category: `A`, priority: 2 }, 1], // Should be filtered by distinct
[{ id: 4, category: `C`, priority: 2 }, 1],
])
)
graph.run()

const data = messages.map((m) => m.getInner())

// Should have 3 distinct categories
expect(data[0]).toHaveLength(3)

const categories = data[0].map(([item]) => item.category).sort()
expect(categories).toEqual([`A`, `B`, `C`])
})

test(`complex pipeline: filter -> map -> distinct`, () => {
const graph = new D2()
const input = graph.newInput<{ id: number; name: string; score: number }>()
const messages: Array<
MultiSet<{ id: number; name: string; score: number; grade: string }>
> = []

input.pipe(
filter((item) => item.score >= 60), // Only passing scores
map((item) => ({
...item,
grade: item.score >= 90 ? `A` : item.score >= 80 ? `B` : `C`,
})),
distinct((item) => `${item.name}-${item.grade}`), // Distinct by name-grade combination
output((message) => {
messages.push(message)
})
)

graph.finalize()

input.sendData(
new MultiSet([
[{ id: 1, name: `Alice`, score: 95 }, 1], // A grade
[{ id: 2, name: `Alice`, score: 95 }, 1], // Should be distinct filtered (same name-grade)
[{ id: 3, name: `Bob`, score: 85 }, 1], // B grade
[{ id: 4, name: `Charlie`, score: 75 }, 1], // C grade
[{ id: 5, name: `David`, score: 50 }, 1], // Should be filtered out by score
[{ id: 6, name: `Eve`, score: 65 }, 1], // C grade
])
)
graph.run()

const data = messages.map((m) => m.getInner())

// Should have 4 distinct items: Alice-A, Bob-B, Charlie-C, Eve-C
expect(data[0]).toHaveLength(4)

const nameGradeCombos = data[0]
.map(([item]) => `${item.name}-${item.grade}`)
.sort()
expect(nameGradeCombos).toEqual([`Alice-A`, `Bob-B`, `Charlie-C`, `Eve-C`])
})
}
Loading
Loading