Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,21 @@ jobs:
run: |
sudo systemctl start postgresql
psql -c 'CREATE EXTENSION IF NOT EXISTS vchord CASCADE;'
sqllogictest --db $USER --user $USER './tests/**/*.slt'
sqllogictest --db $USER --user $USER './tests/general/*.slt'

- name: Sqllogictest(PostgreSQL 17 features)
if: matrix.version == '17'
run: |
sudo systemctl start postgresql
psql -c 'CREATE EXTENSION IF NOT EXISTS vchord CASCADE;'
sqllogictest --db $USER --user $USER './tests/pg17/*.slt'

- name: Sqllogictest(PostgreSQL 16 features)
if: matrix.version == '16'
run: |
sudo systemctl start postgresql
psql -c 'CREATE EXTENSION IF NOT EXISTS vchord CASCADE;'
sqllogictest --db $USER --user $USER './tests/pg16/*.slt'

- name: Package
env:
Expand Down
7 changes: 3 additions & 4 deletions crates/algorithm/src/fast_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ impl<T: Ord> From<Vec<T>> for FastHeap<T> {
impl<T: Ord> Sequence for FastHeap<T> {
type Item = T;
type Inner = std::vec::IntoIter<T>;
fn peek(&mut self) -> Option<&T> {
<FastHeap<T>>::peek(self)
}
fn next(&mut self) -> Option<T> {
self.pop()
}
fn next_if(&mut self, predicate: impl FnOnce(&T) -> bool) -> Option<T> {
let first = self.peek()?;
if predicate(first) { self.pop() } else { None }
}
fn into_inner(self) -> Self::Inner {
match self {
FastHeap::Sorted(sort_heap) => sort_heap.inner.into_iter(),
Expand Down
36 changes: 27 additions & 9 deletions crates/algorithm/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,49 @@ type Item<'b> = (
AlwaysEqual<&'b mut (u32, u16, &'b mut [u32])>,
);

pub fn insert<'r, 'b: 'r, R: RelationRead + RelationWrite, O: Operator>(
/// Appends `vector` to the index's vector storage ahead of the index insertion.
///
/// Returns a tuple `(list, head)` which will be used in `insert_index` later:
/// - `list`: the list of elements to be inserted into the index.
/// - `head`: the head of the list, used as a starting point for insertion.
///
/// When the meta tuple reports `rerank_in_heap`, nothing is appended and
/// `(Vec::new(), 0)` is returned.
///
/// # Panics
///
/// Panics with "data corruption" if tuple 1 of block 0 cannot be read, and with
/// "unmatched dimensions" if the dimensions recorded in the meta tuple differ
/// from those of `vector`.
pub fn insert_vector<R: RelationRead + RelationWrite, O: Operator>(
    index: &R,
    payload: NonZero<u64>,
    vector: &O::Vector,
) -> (Vec<u32>, u16) {
    // Copy everything needed out of the meta tuple, then release the guard on
    // block 0 before any append happens.
    let guard = index.read(0);
    let meta = MetaTuple::deserialize_ref(guard.get(1).expect("data corruption"));
    let expected_dims = meta.dims();
    let skip_append = meta.rerank_in_heap();
    assert_eq!(
        expected_dims,
        vector.as_borrowed().dims(),
        "unmatched dimensions"
    );
    let first = meta.vectors_first();
    drop(guard);

    if skip_append {
        return (Vec::new(), 0);
    }
    vectors::append::<O>(index, first, vector.as_borrowed(), payload)
}

pub fn insert_index<'r, 'b: 'r, R: RelationRead + RelationWrite, O: Operator>(
index: &'r R,
payload: NonZero<u64>,
vector: O::Vector,
bump: &'b impl Bump,
mut prefetch_h1_vectors: impl PrefetcherHeapFamily<'r, R>,
list: Vec<u32>,
head: u16,
) {
let meta_guard = index.read(0);
let meta_bytes = meta_guard.get(1).expect("data corruption");
let meta_tuple = MetaTuple::deserialize_ref(meta_bytes);
let dims = meta_tuple.dims();
let is_residual = meta_tuple.is_residual();
let rerank_in_heap = meta_tuple.rerank_in_heap();
let height_of_root = meta_tuple.height_of_root();
assert_eq!(dims, vector.as_borrowed().dims(), "unmatched dimensions");
let root_prefetch = meta_tuple.root_prefetch().to_vec();
let root_head = meta_tuple.root_head();
let root_first = meta_tuple.root_first();
let vectors_first = meta_tuple.vectors_first();
drop(meta_guard);

let default_block_lut = if !is_residual {
Expand All @@ -57,12 +81,6 @@ pub fn insert<'r, 'b: 'r, R: RelationRead + RelationWrite, O: Operator>(
None
};

let (list, head) = if !rerank_in_heap {
vectors::append::<O>(index, vectors_first, vector.as_borrowed(), payload)
} else {
(Vec::new(), 0)
};

type State<O> = (u32, Option<<O as Operator>::Vector>);
let mut state: State<O> = {
if is_residual {
Expand Down
66 changes: 57 additions & 9 deletions crates/algorithm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use bulkdelete::bulkdelete;
pub use cache::cache;
pub use cost::cost;
pub use fast_heap::FastHeap;
pub use insert::insert;
pub use insert::{insert_index, insert_vector};
pub use maintain::maintain;
pub use prefetcher::*;
pub use prewarm::prewarm;
Expand Down Expand Up @@ -219,24 +219,64 @@ impl<'b, T, A, B> Fetch for (T, AlwaysEqual<&'b mut (A, B, &'b mut [u32])>) {
}
}

/// A [`Sequence`] adapter that only exposes the items of `iter` accepted by `filter`.
///
/// Items rejected by the predicate are consumed from the wrapped sequence and
/// discarded. The predicate may run more than once for the same item, because
/// both `peek` and `next` evaluate it on the current head.
pub struct Filter<S, P> {
    /// The wrapped sequence.
    pub iter: S,
    /// Predicate; `true` keeps an item, `false` drops it.
    pub filter: P,
}

impl<S: Sequence, P: FnMut(&S::Item) -> bool> Sequence for Filter<S, P> {
    type Item = S::Item;
    type Inner = S::Inner;

    /// Drops leading rejected items, then returns a reference to the first
    /// accepted one, or `None` once the wrapped sequence is exhausted.
    fn peek(&mut self) -> Option<&Self::Item> {
        while !(self.filter)(self.iter.peek()?) {
            self.iter.next();
        }
        // The head was just accepted; peek again to hand out a fresh borrow
        // (presumably needed to satisfy the borrow checker).
        self.iter.peek()
    }

    /// Removes and returns the first accepted item, discarding every rejected
    /// item in front of it.
    fn next(&mut self) -> Option<S::Item> {
        loop {
            let accepted = (self.filter)(self.iter.peek()?);
            let head = self.iter.next();
            if accepted {
                return head;
            }
        }
    }

    /// Returns the wrapped sequence's inner iterator; the predicate is not
    /// applied to it.
    fn into_inner(self) -> Self::Inner {
        self.iter.into_inner()
    }
}

pub trait Sequence {
type Item;
type Inner: Iterator<Item = Self::Item>;
fn peek(&mut self) -> Option<&Self::Item>;
fn next(&mut self) -> Option<Self::Item>;
fn next_if(&mut self, predicate: impl FnOnce(&Self::Item) -> bool) -> Option<Self::Item>;
fn next_if(&mut self, predicate: impl FnOnce(&Self::Item) -> bool) -> Option<Self::Item> {
let peek = self.peek()?;
if predicate(peek) { self.next() } else { None }
}
fn into_inner(self) -> Self::Inner;
}

impl<T: Ord> Sequence for BinaryHeap<T> {
type Item = T;
type Inner = std::vec::IntoIter<T>;
fn peek(&mut self) -> Option<&T> {
<BinaryHeap<T>>::peek(self)
}
fn next(&mut self) -> Option<T> {
self.pop()
}
fn next_if(&mut self, predicate: impl FnOnce(&T) -> bool) -> Option<T> {
let peek = self.peek()?;
if predicate(peek) { self.pop() } else { None }
}
fn into_inner(self) -> Self::Inner {
self.into_vec().into_iter()
}
Expand All @@ -245,13 +285,21 @@ impl<T: Ord> Sequence for BinaryHeap<T> {
impl<I: Iterator> Sequence for Peekable<I> {
type Item = I::Item;
type Inner = Peekable<I>;
fn peek(&mut self) -> Option<&I::Item> {
Comment thread
cutecutecat marked this conversation as resolved.
Peekable::peek(self)
}
fn next(&mut self) -> Option<I::Item> {
Iterator::next(self)
}
fn next_if(&mut self, predicate: impl FnOnce(&I::Item) -> bool) -> Option<I::Item> {
Peekable::next_if(self, predicate)
}
fn into_inner(self) -> Self::Inner {
self
}
}

/// Wraps `heap` in a [`Filter`] so that only items accepted by `filter` are
/// exposed through the returned [`Sequence`].
///
/// `T` carries no `Ord` bound: `Filter`'s `Sequence` impl only requires the
/// wrapped value to be a `Sequence` and the predicate to be
/// `FnMut(&T) -> bool`, so the adapter works for any item type.
pub fn seq_filter<T, F>(heap: impl Sequence<Item = T>, filter: F) -> impl Sequence<Item = T>
where
    F: FnMut(&T) -> bool,
{
    Filter { iter: heap, filter }
}
20 changes: 16 additions & 4 deletions src/index/algorithm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,42 +322,54 @@ pub fn insert(
match (vector, opfamily.distance_kind()) {
(OwnedVector::Vecf32(vector), DistanceKind::L2) => {
assert!(opfamily.vector_kind() == VectorKind::Vecf32);
Comment thread
cutecutecat marked this conversation as resolved.
algorithm::insert::<_, Op<VectOwned<f32>, L2>>(
let (list, head) = insert_vector::<_, Op<VectOwned<f32>, L2>>(index, payload, &vector);
insert_index::<_, Op<VectOwned<f32>, L2>>(
index,
payload,
RandomProject::project(vector.as_borrowed()),
&bump,
make_h1_plain_prefetcher,
list,
head,
)
}
(OwnedVector::Vecf32(vector), DistanceKind::Dot) => {
assert!(opfamily.vector_kind() == VectorKind::Vecf32);
algorithm::insert::<_, Op<VectOwned<f32>, Dot>>(
let (list, head) = insert_vector::<_, Op<VectOwned<f32>, Dot>>(index, payload, &vector);
insert_index::<_, Op<VectOwned<f32>, Dot>>(
index,
payload,
RandomProject::project(vector.as_borrowed()),
&bump,
make_h1_plain_prefetcher,
list,
head,
)
}
(OwnedVector::Vecf16(vector), DistanceKind::L2) => {
assert!(opfamily.vector_kind() == VectorKind::Vecf16);
algorithm::insert::<_, Op<VectOwned<f16>, L2>>(
let (list, head) = insert_vector::<_, Op<VectOwned<f16>, L2>>(index, payload, &vector);
insert_index::<_, Op<VectOwned<f16>, L2>>(
index,
payload,
RandomProject::project(vector.as_borrowed()),
&bump,
make_h1_plain_prefetcher,
list,
head,
)
}
(OwnedVector::Vecf16(vector), DistanceKind::Dot) => {
assert!(opfamily.vector_kind() == VectorKind::Vecf16);
algorithm::insert::<_, Op<VectOwned<f16>, Dot>>(
let (list, head) = insert_vector::<_, Op<VectOwned<f16>, Dot>>(index, payload, &vector);
insert_index::<_, Op<VectOwned<f16>, Dot>>(
index,
payload,
RandomProject::project(vector.as_borrowed()),
&bump,
make_h1_plain_prefetcher,
list,
head,
)
}
}
Expand Down
60 changes: 37 additions & 23 deletions src/index/am/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
pub mod am_build;

use super::algorithm::BumpAlloc;
use super::gucs::prererank_filtering;
use crate::index::gucs;
use crate::index::lazy_cell::LazyCell;
use crate::index::opclass::{Opfamily, opfamily};
Expand Down Expand Up @@ -581,28 +580,6 @@ impl SearchFetcher for HeapFetcher {
if !fetch_row_version(self.heap_relation, &mut ctid, self.snapshot, self.slot) {
return None;
}
if !self.hack.is_null() && prererank_filtering() {
if let Some(qual) = NonNull::new((*self.hack).ss.ps.qual) {
use pgrx::datum::FromDatum;
use pgrx::memcxt::PgMemoryContexts;
assert!(qual.as_ref().flags & pgrx::pg_sys::EEO_FLAG_IS_QUAL as u8 != 0);
let evalfunc = qual.as_ref().evalfunc.expect("no evalfunc for qual");
if !(*self.hack).ss.ps.ps_ExprContext.is_null() {
let econtext = (*self.hack).ss.ps.ps_ExprContext;
(*econtext).ecxt_scantuple = self.slot;
pgrx::pg_sys::MemoryContextReset((*econtext).ecxt_per_tuple_memory);
let result = PgMemoryContexts::For((*econtext).ecxt_per_tuple_memory)
.switch_to(|_| {
let mut is_null = true;
let datum = evalfunc(qual.as_ptr(), econtext, &mut is_null);
bool::from_datum(datum, is_null)
});
if result != Some(true) {
return None;
}
}
}
}
(*self.econtext).ecxt_scantuple = self.slot;
pgrx::pg_sys::MemoryContextReset((*self.econtext).ecxt_per_tuple_memory);
pgrx::pg_sys::FormIndexDatum(
Expand All @@ -615,6 +592,43 @@ impl SearchFetcher for HeapFetcher {
Some((&self.values, &self.is_nulls))
}
}

/// Decides whether the heap tuple identified by `key` passes the qual attached
/// to `self.hack`.
///
/// Returns `true` without fetching anything when `self.hack` is null, and also
/// when there is no qual or no expression context to evaluate it in. Returns
/// `false` when the row version cannot be fetched, or when the qual evaluates
/// to anything other than a non-null `true`.
///
/// # Panics
///
/// Panics if the table access method has no `tuple_fetch_row_version`, if the
/// qual is not flagged `EEO_FLAG_IS_QUAL`, or if the qual has no `evalfunc`.
fn filter(&mut self, key: [u16; 3]) -> bool {
    if self.hack.is_null() {
        return true;
    }
    // NOTE(review): soundness relies on `heap_relation`, `snapshot`, `slot` and
    // `hack` pointing at live PostgreSQL objects for the duration of this call;
    // that invariant is not visible from here — confirm at the construction site.
    unsafe {
        use pgrx::datum::FromDatum;
        use pgrx::memcxt::PgMemoryContexts;

        let mut ctid = key_to_ctid(key);
        let fetch_row_version = (*(*self.heap_relation).rd_tableam)
            .tuple_fetch_row_version
            .expect("unsupported heap access method");
        let fetched = fetch_row_version(self.heap_relation, &mut ctid, self.snapshot, self.slot);
        if !fetched {
            return false;
        }

        // Without a qual there is nothing to evaluate: accept the tuple.
        let Some(qual) = NonNull::new((*self.hack).ss.ps.qual) else {
            return true;
        };
        assert!(qual.as_ref().flags & pgrx::pg_sys::EEO_FLAG_IS_QUAL as u8 != 0);
        let evalfunc = qual.as_ref().evalfunc.expect("no evalfunc for qual");

        // Without an expression context the qual cannot be evaluated: accept the tuple.
        let econtext = (*self.hack).ss.ps.ps_ExprContext;
        if econtext.is_null() {
            return true;
        }

        // Point the expression context at the fetched slot and evaluate the
        // qual inside the freshly reset per-tuple memory context.
        (*econtext).ecxt_scantuple = self.slot;
        pgrx::pg_sys::MemoryContextReset((*econtext).ecxt_per_tuple_memory);
        let verdict = PgMemoryContexts::For((*econtext).ecxt_per_tuple_memory).switch_to(|_| {
            let mut is_null = true;
            let datum = evalfunc(qual.as_ptr(), econtext, &mut is_null);
            bool::from_datum(datum, is_null)
        });
        verdict == Some(true)
    }
}
}

struct Index {
Expand Down
14 changes: 7 additions & 7 deletions src/index/gucs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static MAX_SCAN_TUPLES: GucSetting<i32> = GucSetting::<i32>::new(-1);
static MAXSIM_REFINE: GucSetting<i32> = GucSetting::<i32>::new(0);
static MAXSIM_THRESHOLD: GucSetting<i32> = GucSetting::<i32>::new(0);

static PRERERANK_FILTERING: GucSetting<bool> = GucSetting::<bool>::new(false);
static PREFILTER: GucSetting<bool> = GucSetting::<bool>::new(false);

static IO_SEARCH: GucSetting<PostgresIo> = GucSetting::<PostgresIo>::new(
#[cfg(any(feature = "pg13", feature = "pg14", feature = "pg15", feature = "pg16"))]
Expand Down Expand Up @@ -110,10 +110,10 @@ pub fn init() {
GucFlags::default(),
);
GucRegistry::define_bool_guc(
"vchordrq.prererank_filtering",
"`prererank_filtering` argument of vchordrq.",
"`prererank_filtering` argument of vchordrq.",
&PRERERANK_FILTERING,
"vchordrq.prefilter",
"`prefilter` argument of vchordrq.",
"`prefilter` argument of vchordrq.",
&PREFILTER,
GucContext::Userset,
GucFlags::default(),
);
Expand Down Expand Up @@ -207,8 +207,8 @@ pub fn prewarm_dim() -> Vec<u32> {
}
}

pub fn prererank_filtering() -> bool {
PRERERANK_FILTERING.get()
/// Returns the current value of the `PREFILTER` setting, which is registered
/// as the boolean GUC `vchordrq.prefilter` and defaults to `false`.
pub fn prefilter() -> bool {
PREFILTER.get()
}

pub fn io_search() -> Io {
Expand Down
Loading
Loading