From f151e6b93de7c850f52409c2d737da4027e19dc4 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 21 Apr 2026 22:13:38 -0400 Subject: [PATCH 01/12] Add ability to override function behaviour via registry in VortexSession Signed-off-by: Robert Kruszewski --- vortex-array/public-api.lock | 34 +++++++++++++ vortex-array/src/executor.rs | 8 +-- vortex-array/src/optimizer/mod.rs | 76 ++++++++++++++++++++++++++-- vortex-session/public-api.lock | 24 +++++++++ vortex-session/src/registry.rs | 84 +++++++++++++++++++++++++++++++ vortex/src/lib.rs | 2 + 6 files changed, 219 insertions(+), 9 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8ee3c97f17b..f9b4c7f4506 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13360,18 +13360,50 @@ pub fn vortex_array::optimizer::rules::ParentReduceRuleAdapter::matches(&s pub fn vortex_array::optimizer::rules::ParentReduceRuleAdapter::reduce_parent(&self, child: vortex_array::ArrayView<'_, V>, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> +pub mod vortex_array::optimizer::session + +pub struct vortex_array::optimizer::session::OptimizerSession + +impl vortex_array::optimizer::session::OptimizerSession + +pub fn vortex_array::optimizer::session::OptimizerSession::empty() -> Self + +pub fn vortex_array::optimizer::session::OptimizerSession::registry(&self) -> &vortex_session::registry::FnRegistry + +impl core::default::Default for vortex_array::optimizer::session::OptimizerSession + +pub fn vortex_array::optimizer::session::OptimizerSession::default() -> vortex_array::optimizer::session::OptimizerSession + +impl core::fmt::Debug for vortex_array::optimizer::session::OptimizerSession + +pub fn vortex_array::optimizer::session::OptimizerSession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub trait vortex_array::optimizer::session::OptimizerSessionExt: vortex_session::SessionExt + +pub fn vortex_array::optimizer::session::OptimizerSessionExt::optimizer(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::session::OptimizerSession> + +impl vortex_array::optimizer::session::OptimizerSessionExt for S + +pub fn S::optimizer(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::session::OptimizerSession> + pub trait vortex_array::optimizer::ArrayOptimizer pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error::VortexResult +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult + pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self) -> vortex_error::VortexResult impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult + pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult +pub type vortex_array::optimizer::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + pub mod vortex_array::patches pub struct vortex_array::patches::Patches @@ -22328,6 +22360,8 @@ impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult + pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult impl vortex_array::scalar_fn::ReduceNode for vortex_array::ArrayRef diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index e35b485972a..cf99785a89a 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -121,7 +121,7 @@ impl ArrayRef { return Ok(current); } Some(frame) => { - current = frame.put_back(current)?.optimize()?; + current = frame.put_back(current)?.optimize_ctx(ctx)?; continue; } } @@ -137,9 +137,9 @@ impl ArrayRef { "execute_parent rewrote {} -> {}", current, rewritten )); - current = rewritten.optimize()?; + current = rewritten.optimize_ctx(ctx)?; if let Some(frame) = stack.pop() { - current = frame.put_back(current)?.optimize()?; + current = frame.put_back(current)?.optimize_ctx(ctx)?; } continue; } @@ -158,7 +158,7 @@ impl ArrayRef { )); let frame = StackFrame::new(parent, i, done, &child); stack.push(frame); - current = child.optimize()?; + current = child.optimize_ctx(ctx)?; } ExecutionStep::Done => { ctx.log(format_args!("Done: {}", array)); diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 0401e69cb9a..6aad759bb0b 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -6,26 +6,65 @@ //! //! Optimization runs between execution steps, which is what enables cross-step optimizations: //! after a child is decoded, new `reduce_parent` rules may match that were previously blocked. +//! +//! There are two entry points: +//! +//! * [`ArrayOptimizer::optimize`] — runs the static rules only (the child encoding's +//! `PARENT_RULES`). It does not require an execution context and is used by helpers like +//! `ArrayBuiltins::cast` and `ArrayRef::slice` that build wrapped expressions and need them +//! normalized inline. +//! * [`ArrayOptimizer::optimize_ctx`] — runs the static rules and additionally consults the +//! session's [`OptimizerSession`] registry keyed by `(parent_encoding_id, child_encoding_id)` +//! before each `reduce_parent` step. The execute loop calls this entry point so plugin- +//! registered parent-reduce rules fire during execution. + +use std::sync::Arc; use vortex_error::VortexResult; use vortex_error::vortex_bail; +use vortex_session::SessionExt; use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::optimizer::session::OptimizerSession; pub mod rules; +pub mod session; + +/// Pluggable parent-reduce function signature used by [`OptimizerSession`]. +/// +/// A function of this type rewrites the parent array that holds `child` at `child_idx`, given +/// the child itself and its parent. Returns `Ok(None)` when the function doesn't apply. +/// +/// Registered under `(parent_encoding_id, child_encoding_id)`; callers downcast the erased +/// `child`/`parent` to their expected types before applying logic. +pub type ReduceParentFn = + fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; /// Extension trait for optimizing array trees using reduce/reduce_parent rules. pub trait ArrayOptimizer { - /// Optimize the root array node only by running reduce and reduce_parent rules to fixpoint. + /// Optimize the root array node by running reduce and reduce_parent rules to fixpoint. + /// + /// Uses only the child encoding's static `PARENT_RULES`. Use [`Self::optimize_ctx`] from + /// inside the execute loop to also consult the session-scoped [`OptimizerSession`] registry. fn optimize(&self) -> VortexResult; - /// Optimize the entire array tree recursively (root and all descendants). + /// Like [`Self::optimize`], but additionally consults the session's [`OptimizerSession`] + /// registry for each `(parent_encoding_id, child_encoding_id)` pair before the static + /// vtable rules. + fn optimize_ctx(&self, ctx: &ExecutionCtx) -> VortexResult; + + /// Optimize the entire array tree recursively (root and all descendants), static rules only. fn optimize_recursive(&self) -> VortexResult; } impl ArrayOptimizer for ArrayRef { fn optimize(&self) -> VortexResult { - Ok(try_optimize(self)?.unwrap_or_else(|| self.clone())) + Ok(try_optimize(self, None)?.unwrap_or_else(|| self.clone())) + } + + fn optimize_ctx(&self, ctx: &ExecutionCtx) -> VortexResult { + Ok(try_optimize(self, Some(ctx))?.unwrap_or_else(|| self.clone())) } fn optimize_recursive(&self) -> VortexResult { @@ -33,7 +72,23 @@ impl ArrayOptimizer for ArrayRef { } } -fn try_optimize(array: &ArrayRef) -> VortexResult> { +/// Resolve a pluggable [`ReduceParentFn`] for `(parent, child)` from the session registry. +/// +/// Returns `None` when no [`OptimizerSession`] is registered, or no function is registered under +/// `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so the caller is +/// free to drop the session borrow before invoking it. +fn plugin_reduce_parent( + ctx: &ExecutionCtx, + parent: &ArrayRef, + child: &ArrayRef, +) -> Option> { + ctx.session().get_opt::().and_then(|s| { + s.registry() + .find::(parent.encoding_id(), child.encoding_id()) + }) +} + +fn try_optimize(array: &ArrayRef, ctx: Option<&ExecutionCtx>) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; @@ -55,6 +110,17 @@ fn try_optimize(array: &ArrayRef) -> VortexResult> { // Its important to take all slots here, as `current_array` can change inside the loop. for (slot_idx, slot) in current_array.slots().iter().enumerate() { let Some(child) = slot else { continue }; + + // Registry-based override: tried before the child encoding's static PARENT_RULES. + if let Some(ctx) = ctx + && let Some(plugin) = plugin_reduce_parent(ctx, ¤t_array, child) + && let Some(new_array) = plugin(child, ¤t_array, slot_idx)? + { + current_array = new_array; + any_optimizations = true; + continue 'outer; + } + if let Some(new_array) = child.reduce_parent(¤t_array, slot_idx)? { // If the parent was replaced, then we attempt to reduce it again. current_array = new_array; @@ -80,7 +146,7 @@ fn try_optimize_recursive(array: &ArrayRef) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; - if let Some(new_array) = try_optimize(¤t_array)? { + if let Some(new_array) = try_optimize(¤t_array, None)? { current_array = new_array; any_optimizations = true; } diff --git a/vortex-session/public-api.lock b/vortex-session/public-api.lock index a63e955f77a..93d51f87804 100644 --- a/vortex-session/public-api.lock +++ b/vortex-session/public-api.lock @@ -40,6 +40,30 @@ impl core::default::Default for vortex_session::registry::Context pub fn vortex_session::registry::Context::default() -> Self +pub struct vortex_session::registry::FnRegistry(_) + +impl vortex_session::registry::FnRegistry + +pub fn vortex_session::registry::FnRegistry::contains(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id) -> bool + +pub fn vortex_session::registry::FnRegistry::empty() -> Self + +pub fn vortex_session::registry::FnRegistry::find(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id) -> core::option::Option> + +pub fn vortex_session::registry::FnRegistry::register(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id, f: F) + +impl core::clone::Clone for vortex_session::registry::FnRegistry + +pub fn vortex_session::registry::FnRegistry::clone(&self) -> vortex_session::registry::FnRegistry + +impl core::default::Default for vortex_session::registry::FnRegistry + +pub fn vortex_session::registry::FnRegistry::default() -> vortex_session::registry::FnRegistry + +impl core::fmt::Debug for vortex_session::registry::FnRegistry + +pub fn vortex_session::registry::FnRegistry::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + pub struct vortex_session::registry::Id(_) impl vortex_session::registry::Id diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index a739b9fdda3..2f1745d3384 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -4,6 +4,7 @@ //! Many session types use a registry of objects that can be looked up by name to construct //! contexts. This module provides a generic registry type for that purpose. +use std::any::Any; use std::cmp::Ordering; use std::fmt; use std::fmt::Debug; @@ -287,3 +288,86 @@ impl Context { self.ids.read().clone() } } + +/// A registry of type-erased function values keyed by a pair of [`Id`] values. +/// +/// Each entry stores an `Arc` wrapping a caller-supplied concrete type `F` +/// (typically a function pointer). Callers recover the original type by passing the same `F` to +/// [`FnRegistry::find`]; `find` returns `Some(Arc)` on a type match and `None` otherwise. +/// +/// Used for pluggable dispatch keyed by an `(outer, inner)` identifier pair — for example the +/// optimizer's parent-reduce registry keys by `(parent_encoding_id, child_encoding_id)` so that +/// downstream crates can override the rule that would normally run from the child encoding's +/// static `PARENT_RULES` set. +#[derive(Clone, Debug, Default)] +pub struct FnRegistry(Arc>>); + +impl FnRegistry { + /// Create a new, empty registry. + pub fn empty() -> Self { + Self::default() + } + + /// Register a function under `(outer, inner)`, replacing any existing entry. + pub fn register(&self, outer: Id, inner: Id, f: F) { + self.0.insert((outer, inner), Arc::new(f)); + } + + /// Look up a function registered under `(outer, inner)`, downcasting to `F`. + /// + /// Returns `None` if no function is registered, or if the registered value is not of type `F`. + pub fn find(&self, outer: Id, inner: Id) -> Option> { + let entry = self.0.get(&(outer, inner))?; + Arc::clone(entry.value()).downcast::().ok() + } + + /// Return `true` if any function is registered under `(outer, inner)`. + pub fn contains(&self, outer: Id, inner: Id) -> bool { + self.0.contains_key(&(outer, inner)) + } +} + +#[cfg(test)] +mod fn_registry_tests { + use super::FnRegistry; + use super::Id; + + type DoubleFn = fn(i64) -> i64; + + fn double(x: i64) -> i64 { + x * 2 + } + + #[test] + fn register_and_find() { + let registry = FnRegistry::default(); + let outer = Id::new("test.double"); + let inner = Id::new("test.int"); + + assert!(!registry.contains(outer, inner)); + registry.register::(outer, inner, double); + + assert!(registry.contains(outer, inner)); + let f = registry.find::(outer, inner).unwrap(); + assert_eq!(f(21), 42); + } + + #[test] + fn find_with_wrong_type_returns_none() { + let registry = FnRegistry::default(); + let outer = Id::new("test.double"); + let inner = Id::new("test.int"); + registry.register::(outer, inner, double); + + type OtherFn = fn(i32) -> i32; + assert!(registry.find::(outer, inner).is_none()); + } + + #[test] + fn missing_entry_returns_none() { + let registry = FnRegistry::default(); + let outer = Id::new("test.missing"); + let inner = Id::new("test.int"); + assert!(registry.find::(outer, inner).is_none()); + } +} diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 12b71709b72..1529190b3e4 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -12,6 +12,7 @@ use vortex_array::dtype::session::DTypeSession; // vortex::expr is in the process of having its dependencies inverted, and will eventually be // pulled back out into a vortex_expr crate. pub use vortex_array::expr; +use vortex_array::optimizer::session::OptimizerSession; pub use vortex_array::scalar_fn; use vortex_array::scalar_fn::session::ScalarFnSession; use vortex_array::session::ArraySession; @@ -165,6 +166,7 @@ impl VortexSessionDefault for VortexSession { .with::() .with::() .with::() + .with::() .with::() .with::(); From 0fd4e69b03a199fe3977aa705651162958b5deba Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 21 Apr 2026 22:20:52 -0400 Subject: [PATCH 02/12] missed this Signed-off-by: Robert Kruszewski --- vortex-array/src/optimizer/session.rs | 43 +++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 vortex-array/src/optimizer/session.rs diff --git a/vortex-array/src/optimizer/session.rs b/vortex-array/src/optimizer/session.rs new file mode 100644 index 00000000000..b9f6e684dc9 --- /dev/null +++ b/vortex-array/src/optimizer/session.rs @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session state for pluggable parent-reduce rules. +//! +//! [`OptimizerSession`] wraps an [`FnRegistry`] keyed by `(parent_encoding_id, child_encoding_id)` +//! and is consulted by the optimizer during execution, before the child encoding's static +//! `PARENT_RULES` are tried. Entries are typed as [`ReduceParentFn`](super::ReduceParentFn). +//! +//! The registry is empty by default. Downstream crates register `ReduceParentFn` values to add +//! new parent-reduce rules or override ones that the child encoding would otherwise run from its +//! static `PARENT_RULES`. + +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::registry::FnRegistry; + +/// Session state for pluggable parent-reduce dispatch keyed by `(parent_id, child_id)`. +#[derive(Debug, Default)] +pub struct OptimizerSession { + registry: FnRegistry, +} + +impl OptimizerSession { + /// Create an empty session with no rules registered. + pub fn empty() -> Self { + Self::default() + } + + /// Access the underlying registry for direct registration and lookup. + pub fn registry(&self) -> &FnRegistry { + &self.registry + } +} + +/// Extension trait for accessing the optimizer registry from a Vortex session. +pub trait OptimizerSessionExt: SessionExt { + /// Returns the optimizer session variable. + fn optimizer(&self) -> Ref<'_, OptimizerSession> { + self.get::() + } +} +impl OptimizerSessionExt for S {} From c9796b4779c6c67be73a250dc19a2cbd1b42b3ae Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 22 Apr 2026 12:57:49 -0400 Subject: [PATCH 03/12] more Signed-off-by: Robert Kruszewski --- vortex-array/public-api.lock | 6 +++--- .../src/arrays/extension/compute/rules.rs | 12 +++++++++++- vortex-array/src/optimizer/mod.rs | 19 ++++++++++++------- vortex-duckdb/src/datasource.rs | 2 +- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index f9b4c7f4506..b9bc9ac4753 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13392,7 +13392,7 @@ pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error: pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult -pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef @@ -13400,7 +13400,7 @@ pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub type vortex_array::optimizer::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> @@ -22362,7 +22362,7 @@ pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult impl vortex_array::scalar_fn::ReduceNode for vortex_array::ArrayRef diff --git a/vortex-array/src/arrays/extension/compute/rules.rs b/vortex-array/src/arrays/extension/compute/rules.rs index b6e2d5a1e06..2b643bf7de2 100644 --- a/vortex-array/src/arrays/extension/compute/rules.rs +++ b/vortex-array/src/arrays/extension/compute/rules.rs @@ -78,13 +78,17 @@ impl ArrayParentReduceRule for ExtensionFilterPushDownRule { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_mask::Mask; + use vortex_session::VortexSession; use crate::IntoArray; #[expect(deprecated)] use crate::ToCanonical as _; + use crate::VortexSessionExecute; use crate::arrays::Constant; use crate::arrays::ConstantArray; use crate::arrays::Extension; @@ -108,6 +112,10 @@ mod tests { use crate::scalar::ScalarValue; use crate::scalar_fn::fns::binary::Binary; use crate::scalar_fn::fns::operators::Operator; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] struct TestExt; @@ -220,7 +228,9 @@ mod tests { .try_new_array(3, Operator::Lt, [constant_ext, ext_array]) .unwrap(); - let optimized = scalar_fn_array.optimize_recursive().unwrap(); + let optimized = scalar_fn_array + .optimize_recursive(&SESSION.create_execution_ctx()) + .unwrap(); let scalar_fn = optimized.as_opt::().unwrap(); let children = scalar_fn.children(); let constant = children[0] diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 6aad759bb0b..f1659937dda 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -54,8 +54,13 @@ pub trait ArrayOptimizer { /// vtable rules. fn optimize_ctx(&self, ctx: &ExecutionCtx) -> VortexResult; - /// Optimize the entire array tree recursively (root and all descendants), static rules only. - fn optimize_recursive(&self) -> VortexResult; + /// Optimize the entire array tree recursively (root and all descendants). + /// + /// Consults the session's [`OptimizerSession`] registry for each parent/child pair + /// encountered during the recursive walk, so plugin-registered rules apply throughout the + /// tree. Requires a context unconditionally so the registry is always honored when a + /// recursive optimization is requested. + fn optimize_recursive(&self, ctx: &ExecutionCtx) -> VortexResult; } impl ArrayOptimizer for ArrayRef { @@ -67,8 +72,8 @@ impl ArrayOptimizer for ArrayRef { Ok(try_optimize(self, Some(ctx))?.unwrap_or_else(|| self.clone())) } - fn optimize_recursive(&self) -> VortexResult { - Ok(try_optimize_recursive(self)?.unwrap_or_else(|| self.clone())) + fn optimize_recursive(&self, ctx: &ExecutionCtx) -> VortexResult { + Ok(try_optimize_recursive(self, ctx)?.unwrap_or_else(|| self.clone())) } } @@ -142,11 +147,11 @@ fn try_optimize(array: &ArrayRef, ctx: Option<&ExecutionCtx>) -> VortexResult VortexResult> { +fn try_optimize_recursive(array: &ArrayRef, ctx: &ExecutionCtx) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; - if let Some(new_array) = try_optimize(¤t_array, None)? { + if let Some(new_array) = try_optimize(¤t_array, Some(ctx))? { current_array = new_array; any_optimizations = true; } @@ -156,7 +161,7 @@ fn try_optimize_recursive(array: &ArrayRef) -> VortexResult> { for slot in current_array.slots() { match slot { Some(child) => { - if let Some(new_child) = try_optimize_recursive(child)? { + if let Some(new_child) = try_optimize_recursive(child, ctx)? { new_slots.push(Some(new_child)); any_slot_optimized = true; } else { diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index c59fe6cbc2b..5fd5a51c182 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -400,7 +400,7 @@ impl TableFunction for T { return Ok(()); }; let (array_result, conversion_cache) = result?; - let array_result = array_result.optimize_recursive()?; + let array_result = array_result.optimize_recursive(&ctx)?; let array_result: StructArray = if let Some(array) = array_result.as_opt::() { From eddb1b176ca851845f69c460d5c1f63dce2bca46 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 00:39:51 -0400 Subject: [PATCH 04/12] more Signed-off-by: Robert Kruszewski --- vortex-array/public-api.lock | 12 ++-- .../src/arrays/extension/compute/rules.rs | 4 +- vortex-array/src/executor.rs | 11 ++-- vortex-array/src/optimizer/mod.rs | 57 +++++++++++-------- vortex-duckdb/src/datasource.rs | 2 +- 5 files changed, 47 insertions(+), 39 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b9bc9ac4753..f1d74b41be2 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13390,17 +13390,17 @@ pub trait vortex_array::optimizer::ArrayOptimizer pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult pub type vortex_array::optimizer::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> @@ -22360,9 +22360,9 @@ impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_ctx(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self, ctx: &vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult impl vortex_array::scalar_fn::ReduceNode for vortex_array::ArrayRef diff --git a/vortex-array/src/arrays/extension/compute/rules.rs b/vortex-array/src/arrays/extension/compute/rules.rs index 2b643bf7de2..35efe17db07 100644 --- a/vortex-array/src/arrays/extension/compute/rules.rs +++ b/vortex-array/src/arrays/extension/compute/rules.rs @@ -228,9 +228,7 @@ mod tests { .try_new_array(3, Operator::Lt, [constant_ext, ext_array]) .unwrap(); - let optimized = scalar_fn_array - .optimize_recursive(&SESSION.create_execution_ctx()) - .unwrap(); + let optimized = scalar_fn_array.optimize_recursive(&SESSION).unwrap(); let scalar_fn = optimized.as_opt::().unwrap(); let children = scalar_fn.children(); let constant = children[0] diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index cf99785a89a..1a2e569d8c3 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -106,6 +106,9 @@ impl ArrayRef { /// For safety, we will error when the number of execution iterations reaches a configurable /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { + // Clone the session up-front so each `optimize_ctx` call below borrows from this owned + // value rather than re-borrowing through `&mut ctx`. + let session = ctx.session().clone(); let mut current = self; let mut stack: Vec = Vec::new(); @@ -121,7 +124,7 @@ impl ArrayRef { return Ok(current); } Some(frame) => { - current = frame.put_back(current)?.optimize_ctx(ctx)?; + current = frame.put_back(current)?.optimize_ctx(&session)?; continue; } } @@ -137,9 +140,9 @@ impl ArrayRef { "execute_parent rewrote {} -> {}", current, rewritten )); - current = rewritten.optimize_ctx(ctx)?; + current = rewritten.optimize_ctx(&session)?; if let Some(frame) = stack.pop() { - current = frame.put_back(current)?.optimize_ctx(ctx)?; + current = frame.put_back(current)?.optimize_ctx(&session)?; } continue; } @@ -158,7 +161,7 @@ impl ArrayRef { )); let frame = StackFrame::new(parent, i, done, &child); stack.push(frame); - current = child.optimize_ctx(ctx)?; + current = child.optimize_ctx(&session)?; } ExecutionStep::Done => { ctx.log(format_args!("Done: {}", array)); diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index f1659937dda..fab44a52c5f 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -10,7 +10,7 @@ //! There are two entry points: //! //! * [`ArrayOptimizer::optimize`] — runs the static rules only (the child encoding's -//! `PARENT_RULES`). It does not require an execution context and is used by helpers like +//! `PARENT_RULES`). It does not require a [`VortexSession`] and is used by helpers like //! `ArrayBuiltins::cast` and `ArrayRef::slice` that build wrapped expressions and need them //! normalized inline. //! * [`ArrayOptimizer::optimize_ctx`] — runs the static rules and additionally consults the @@ -23,9 +23,9 @@ use std::sync::Arc; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_session::SessionExt; +use vortex_session::VortexSession; use crate::ArrayRef; -use crate::ExecutionCtx; use crate::optimizer::session::OptimizerSession; pub mod rules; @@ -49,18 +49,19 @@ pub trait ArrayOptimizer { /// inside the execute loop to also consult the session-scoped [`OptimizerSession`] registry. fn optimize(&self) -> VortexResult; - /// Like [`Self::optimize`], but additionally consults the session's [`OptimizerSession`] - /// registry for each `(parent_encoding_id, child_encoding_id)` pair before the static - /// vtable rules. - fn optimize_ctx(&self, ctx: &ExecutionCtx) -> VortexResult; + /// Like [`Self::optimize`], but additionally consults the [`OptimizerSession`] registered on + /// `session` for each `(parent_encoding_id, child_encoding_id)` pair before the static + /// vtable rules. If `session` does not have an [`OptimizerSession`] registered, falls + /// through to the static rules. + fn optimize_ctx(&self, session: &VortexSession) -> VortexResult; /// Optimize the entire array tree recursively (root and all descendants). /// - /// Consults the session's [`OptimizerSession`] registry for each parent/child pair + /// Consults the [`OptimizerSession`] registered on `session` for each parent/child pair /// encountered during the recursive walk, so plugin-registered rules apply throughout the - /// tree. Requires a context unconditionally so the registry is always honored when a - /// recursive optimization is requested. - fn optimize_recursive(&self, ctx: &ExecutionCtx) -> VortexResult; + /// tree. Requires a [`VortexSession`] unconditionally so the registry is always honored + /// when a recursive optimization is requested. + fn optimize_recursive(&self, session: &VortexSession) -> VortexResult; } impl ArrayOptimizer for ArrayRef { @@ -68,32 +69,35 @@ impl ArrayOptimizer for ArrayRef { Ok(try_optimize(self, None)?.unwrap_or_else(|| self.clone())) } - fn optimize_ctx(&self, ctx: &ExecutionCtx) -> VortexResult { - Ok(try_optimize(self, Some(ctx))?.unwrap_or_else(|| self.clone())) + fn optimize_ctx(&self, session: &VortexSession) -> VortexResult { + Ok(try_optimize(self, Some(session))?.unwrap_or_else(|| self.clone())) } - fn optimize_recursive(&self, ctx: &ExecutionCtx) -> VortexResult { - Ok(try_optimize_recursive(self, ctx)?.unwrap_or_else(|| self.clone())) + fn optimize_recursive(&self, session: &VortexSession) -> VortexResult { + Ok(try_optimize_recursive(self, session)?.unwrap_or_else(|| self.clone())) } } -/// Resolve a pluggable [`ReduceParentFn`] for `(parent, child)` from the session registry. +/// Resolve a pluggable [`ReduceParentFn`] for `(parent, child)` from `session`. /// /// Returns `None` when no [`OptimizerSession`] is registered, or no function is registered under -/// `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so the caller is -/// free to drop the session borrow before invoking it. +/// `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so the caller can +/// drop the session borrow before invoking it. fn plugin_reduce_parent( - ctx: &ExecutionCtx, + session: &VortexSession, parent: &ArrayRef, child: &ArrayRef, ) -> Option> { - ctx.session().get_opt::().and_then(|s| { + session.get_opt::().and_then(|s| { s.registry() .find::(parent.encoding_id(), child.encoding_id()) }) } -fn try_optimize(array: &ArrayRef, ctx: Option<&ExecutionCtx>) -> VortexResult> { +fn try_optimize( + array: &ArrayRef, + session: Option<&VortexSession>, +) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; @@ -117,8 +121,8 @@ fn try_optimize(array: &ArrayRef, ctx: Option<&ExecutionCtx>) -> VortexResult) -> VortexResult VortexResult> { +fn try_optimize_recursive( + array: &ArrayRef, + session: &VortexSession, +) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; - if let Some(new_array) = try_optimize(¤t_array, Some(ctx))? { + if let Some(new_array) = try_optimize(¤t_array, Some(session))? { current_array = new_array; any_optimizations = true; } @@ -161,7 +168,7 @@ fn try_optimize_recursive(array: &ArrayRef, ctx: &ExecutionCtx) -> VortexResult< for slot in current_array.slots() { match slot { Some(child) => { - if let Some(new_child) = try_optimize_recursive(child, ctx)? { + if let Some(new_child) = try_optimize_recursive(child, session)? { new_slots.push(Some(new_child)); any_slot_optimized = true; } else { diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 5fd5a51c182..15d7da9a34d 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -400,7 +400,7 @@ impl TableFunction for T { return Ok(()); }; let (array_result, conversion_cache) = result?; - let array_result = array_result.optimize_recursive(&ctx)?; + let array_result = array_result.optimize_recursive(ctx.session())?; let array_result: StructArray = if let Some(array) = array_result.as_opt::() { From 4a5cded2bdb589535a6419547446d2aa7e4754ab Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 13:43:46 -0400 Subject: [PATCH 05/12] noclone Signed-off-by: Robert Kruszewski --- vortex-array/src/executor.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 1a2e569d8c3..9e102d550cb 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -108,7 +108,6 @@ impl ArrayRef { pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { // Clone the session up-front so each `optimize_ctx` call below borrows from this owned // value rather than re-borrowing through `&mut ctx`. - let session = ctx.session().clone(); let mut current = self; let mut stack: Vec = Vec::new(); @@ -124,7 +123,7 @@ impl ArrayRef { return Ok(current); } Some(frame) => { - current = frame.put_back(current)?.optimize_ctx(&session)?; + current = frame.put_back(current)?.optimize_ctx(ctx.session())?; continue; } } @@ -140,9 +139,9 @@ impl ArrayRef { "execute_parent rewrote {} -> {}", current, rewritten )); - current = rewritten.optimize_ctx(&session)?; + current = rewritten.optimize_ctx(ctx.session())?; if let Some(frame) = stack.pop() { - current = frame.put_back(current)?.optimize_ctx(&session)?; + current = frame.put_back(current)?.optimize_ctx(ctx.session())?; } continue; } @@ -161,7 +160,7 @@ impl ArrayRef { )); let frame = StackFrame::new(parent, i, done, &child); stack.push(frame); - current = child.optimize_ctx(&session)?; + current = child.optimize_ctx(ctx.session())?; } ExecutionStep::Done => { ctx.log(format_args!("Done: {}", array)); From 167b2bbedfbb27be822844ca748f8b46bb177faf Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 17:00:59 -0400 Subject: [PATCH 06/12] fixes Signed-off-by: Robert Kruszewski --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../src/arrays/extension/compute/rules.rs | 1 - vortex-array/src/optimizer/kernels.rs | 87 +++++++++++++++++++ vortex-array/src/optimizer/mod.rs | 34 +++----- vortex-array/src/optimizer/session.rs | 43 --------- vortex-session/Cargo.toml | 2 +- vortex-session/src/registry.rs | 48 ++++++---- vortex/src/lib.rs | 4 +- 9 files changed, 136 insertions(+), 87 deletions(-) create mode 100644 vortex-array/src/optimizer/kernels.rs delete mode 100644 vortex-array/src/optimizer/session.rs diff --git a/Cargo.lock b/Cargo.lock index 069aa68ebab..961d650d6ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10814,7 +10814,7 @@ dependencies = [ name = "vortex-session" version = "0.1.0" dependencies = [ - "arcref", + "arc-swap", "dashmap", "lasso", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 92e08f1338f..4ddcfbe3d43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,7 @@ version = "0.1.0" aho-corasick = "1.1.3" anyhow = "1.0.97" arbitrary = "1.3.2" -arc-swap = "1.8" +arc-swap = "1.9" arcref = "0.2.0" arrow-arith = "58" arrow-array = "58" diff --git a/vortex-array/src/arrays/extension/compute/rules.rs b/vortex-array/src/arrays/extension/compute/rules.rs index 35efe17db07..d7c8469d1dc 100644 --- a/vortex-array/src/arrays/extension/compute/rules.rs +++ b/vortex-array/src/arrays/extension/compute/rules.rs @@ -88,7 +88,6 @@ mod tests { use crate::IntoArray; #[expect(deprecated)] use crate::ToCanonical as _; - use crate::VortexSessionExecute; use crate::arrays::Constant; use crate::arrays::ConstantArray; use crate::arrays::Extension; diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs new file mode 100644 index 00000000000..3169c94669d --- /dev/null +++ b/vortex-array/src/optimizer/kernels.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session state for pluggable parent-reduce rules. +//! +//! [`ArrayKernels`] wraps an [`FnRegistry`] keyed by `(parent_encoding_id, child_encoding_id)` +//! and is consulted by the optimizer during execution, before the child encoding's static +//! `PARENT_RULES` are tried. Entries are typed as [`ReduceParentFn`](super::ReduceParentFn). +//! +//! The registry is empty by default. Downstream crates register `ReduceParentFn` values to add +//! new parent-reduce rules or override ones that the child encoding would otherwise run from its +//! static `PARENT_RULES`. + +use std::hash::BuildHasher; +use std::sync::Arc; +use std::sync::LazyLock; + +use vortex_error::VortexResult; +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::registry::FnRegistry; +use vortex_session::registry::Id; +use vortex_utils::aliases::DefaultHashBuilder; + +use crate::ArrayRef; + +static FN_HASHER: LazyLock = LazyLock::new(DefaultHashBuilder::default); + +/// Pluggable parent-reduce function signature used by [`ArrayKernels`]. +/// +/// A function of this type rewrites the parent array that holds `child` at `child_idx`, given +/// the child itself and its parent. Returns `Ok(None)` when the function doesn't apply. +/// +/// Registered under `(parent_encoding_id, child_encoding_id)`; callers downcast the erased +/// `child`/`parent` to their expected types before applying logic. +pub type ReduceParentFn = + fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum FnKind { + Reduce, + ReduceParent, + ExecuteParent, + Execute, +} + +/// Session state for pluggable parent-reduce dispatch keyed by `(parent_id, child_id)`. +#[derive(Debug, Default)] +pub struct ArrayKernels { + registry: FnRegistry, +} + +impl ArrayKernels { + /// Create an empty session with no rules registered. + pub fn empty() -> Self { + Self::default() + } + + pub fn register_reduce_parent(&self, outer: Id, child: Id, f: ReduceParentFn) { + self.registry + .register(self.hash_fn_ids(outer, child, FnKind::ReduceParent), f) + } + + pub fn find_reduce_parent(&self, outer: Id, child: Id) -> Option> { + self.registry + .find(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) + } + + pub fn contains_reduce_parent(&self, outer: Id, child: Id) -> bool { + self.registry + .contains(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) + } + + fn hash_fn_ids(&self, outer: Id, child: Id, fn_kind: FnKind) -> u64 { + FN_HASHER.hash_one((outer, child, fn_kind)) + } +} + +/// Extension trait for accessing the optimizer registry from a Vortex session. +pub trait ArrayKernelsExt: SessionExt { + /// Returns the optimizer session variable. + fn kernels(&self) -> Ref<'_, ArrayKernels> { + self.get::() + } +} + +impl ArrayKernelsExt for S {} diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index fab44a52c5f..3bc8ce0f74d 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -14,7 +14,7 @@ //! `ArrayBuiltins::cast` and `ArrayRef::slice` that build wrapped expressions and need them //! normalized inline. //! * [`ArrayOptimizer::optimize_ctx`] — runs the static rules and additionally consults the -//! session's [`OptimizerSession`] registry keyed by `(parent_encoding_id, child_encoding_id)` +//! session's [`ArrayKernels`] registry keyed by `(parent_encoding_id, child_encoding_id)` //! before each `reduce_parent` step. The execute loop calls this entry point so plugin- //! registered parent-reduce rules fire during execution. @@ -26,38 +26,29 @@ use vortex_session::SessionExt; use vortex_session::VortexSession; use crate::ArrayRef; -use crate::optimizer::session::OptimizerSession; +use crate::optimizer::kernels::ArrayKernels; +use crate::optimizer::kernels::ReduceParentFn; +pub mod kernels; pub mod rules; -pub mod session; - -/// Pluggable parent-reduce function signature used by [`OptimizerSession`]. -/// -/// A function of this type rewrites the parent array that holds `child` at `child_idx`, given -/// the child itself and its parent. Returns `Ok(None)` when the function doesn't apply. -/// -/// Registered under `(parent_encoding_id, child_encoding_id)`; callers downcast the erased -/// `child`/`parent` to their expected types before applying logic. -pub type ReduceParentFn = - fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; /// Extension trait for optimizing array trees using reduce/reduce_parent rules. pub trait ArrayOptimizer { /// Optimize the root array node by running reduce and reduce_parent rules to fixpoint. /// /// Uses only the child encoding's static `PARENT_RULES`. Use [`Self::optimize_ctx`] from - /// inside the execute loop to also consult the session-scoped [`OptimizerSession`] registry. + /// inside the execute loop to also consult the session-scoped [`ArrayKernels`] registry. fn optimize(&self) -> VortexResult; - /// Like [`Self::optimize`], but additionally consults the [`OptimizerSession`] registered on + /// Like [`Self::optimize`], but additionally consults the [`ArrayKernels`] registered on /// `session` for each `(parent_encoding_id, child_encoding_id)` pair before the static - /// vtable rules. If `session` does not have an [`OptimizerSession`] registered, falls + /// vtable rules. If `session` does not have an [`ArrayKernels`] registered, falls /// through to the static rules. fn optimize_ctx(&self, session: &VortexSession) -> VortexResult; /// Optimize the entire array tree recursively (root and all descendants). /// - /// Consults the [`OptimizerSession`] registered on `session` for each parent/child pair + /// Consults the [`ArrayKernels`] registered on `session` for each parent/child pair /// encountered during the recursive walk, so plugin-registered rules apply throughout the /// tree. Requires a [`VortexSession`] unconditionally so the registry is always honored /// when a recursive optimization is requested. @@ -80,7 +71,7 @@ impl ArrayOptimizer for ArrayRef { /// Resolve a pluggable [`ReduceParentFn`] for `(parent, child)` from `session`. /// -/// Returns `None` when no [`OptimizerSession`] is registered, or no function is registered under +/// Returns `None` when no [`ArrayKernels`] is registered, or no function is registered under /// `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so the caller can /// drop the session borrow before invoking it. fn plugin_reduce_parent( @@ -88,10 +79,9 @@ fn plugin_reduce_parent( parent: &ArrayRef, child: &ArrayRef, ) -> Option> { - session.get_opt::().and_then(|s| { - s.registry() - .find::(parent.encoding_id(), child.encoding_id()) - }) + session + .get_opt::() + .and_then(|s| s.find_reduce_parent(parent.encoding_id(), child.encoding_id())) } fn try_optimize( diff --git a/vortex-array/src/optimizer/session.rs b/vortex-array/src/optimizer/session.rs deleted file mode 100644 index b9f6e684dc9..00000000000 --- a/vortex-array/src/optimizer/session.rs +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Session state for pluggable parent-reduce rules. -//! -//! [`OptimizerSession`] wraps an [`FnRegistry`] keyed by `(parent_encoding_id, child_encoding_id)` -//! and is consulted by the optimizer during execution, before the child encoding's static -//! `PARENT_RULES` are tried. Entries are typed as [`ReduceParentFn`](super::ReduceParentFn). -//! -//! The registry is empty by default. Downstream crates register `ReduceParentFn` values to add -//! new parent-reduce rules or override ones that the child encoding would otherwise run from its -//! static `PARENT_RULES`. - -use vortex_session::Ref; -use vortex_session::SessionExt; -use vortex_session::registry::FnRegistry; - -/// Session state for pluggable parent-reduce dispatch keyed by `(parent_id, child_id)`. -#[derive(Debug, Default)] -pub struct OptimizerSession { - registry: FnRegistry, -} - -impl OptimizerSession { - /// Create an empty session with no rules registered. - pub fn empty() -> Self { - Self::default() - } - - /// Access the underlying registry for direct registration and lookup. - pub fn registry(&self) -> &FnRegistry { - &self.registry - } -} - -/// Extension trait for accessing the optimizer registry from a Vortex session. -pub trait OptimizerSessionExt: SessionExt { - /// Returns the optimizer session variable. - fn optimizer(&self) -> Ref<'_, OptimizerSession> { - self.get::() - } -} -impl OptimizerSessionExt for S {} diff --git a/vortex-session/Cargo.toml b/vortex-session/Cargo.toml index 263c8c8500c..32d4d6de428 100644 --- a/vortex-session/Cargo.toml +++ b/vortex-session/Cargo.toml @@ -20,7 +20,7 @@ all-features = true workspace = true [dependencies] -arcref = { workspace = true } +arc-swap = { workspace = true } dashmap = { workspace = true } lasso = { workspace = true } parking_lot = { workspace = true } diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index 2f1745d3384..e64d57b7df4 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -10,16 +10,19 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::hash::Hash; use std::ops::Deref; use std::sync::Arc; use std::sync::LazyLock; use std::sync::OnceLock; +use arc_swap::ArcSwap; use lasso::Spur; use lasso::ThreadedRodeo; use parking_lot::RwLock; use vortex_error::VortexExpect; use vortex_utils::aliases::dash_map::DashMap; +use vortex_utils::aliases::hash_map::HashMap; /// Global string interner for [`Id`] values. static INTERNER: LazyLock = LazyLock::new(ThreadedRodeo::new); @@ -299,8 +302,8 @@ impl Context { /// optimizer's parent-reduce registry keys by `(parent_encoding_id, child_encoding_id)` so that /// downstream crates can override the rule that would normally run from the child encoding's /// static `PARENT_RULES` set. -#[derive(Clone, Debug, Default)] -pub struct FnRegistry(Arc>>); +#[derive(Debug, Default)] +pub struct FnRegistry(ArcSwap>>); impl FnRegistry { /// Create a new, empty registry. @@ -309,26 +312,35 @@ impl FnRegistry { } /// Register a function under `(outer, inner)`, replacing any existing entry. - pub fn register(&self, outer: Id, inner: Id, f: F) { - self.0.insert((outer, inner), Arc::new(f)); + pub fn register(&self, id: u64, f: F) { + let registry = self.0.load(); + let mut owned_registry = registry.as_ref().clone(); + owned_registry.insert(id, Arc::new(f)); + self.0.store(Arc::new(owned_registry)); } /// Look up a function registered under `(outer, inner)`, downcasting to `F`. /// /// Returns `None` if no function is registered, or if the registered value is not of type `F`. - pub fn find(&self, outer: Id, inner: Id) -> Option> { - let entry = self.0.get(&(outer, inner))?; - Arc::clone(entry.value()).downcast::().ok() + pub fn find(&self, id: u64) -> Option> { + let map = self.0.load(); + let entry = map.get(&id)?; + Arc::clone(entry).downcast::().ok() } /// Return `true` if any function is registered under `(outer, inner)`. - pub fn contains(&self, outer: Id, inner: Id) -> bool { - self.0.contains_key(&(outer, inner)) + pub fn contains(&self, id: u64) -> bool { + let map = self.0.load(); + map.contains_key(&id) } } #[cfg(test)] mod fn_registry_tests { + use std::hash::BuildHasher; + + use vortex_utils::aliases::DefaultHashBuilder; + use super::FnRegistry; use super::Id; @@ -343,12 +355,13 @@ mod fn_registry_tests { let registry = FnRegistry::default(); let outer = Id::new("test.double"); let inner = Id::new("test.int"); + let id = DefaultHashBuilder::default().hash_one((outer, inner)); - assert!(!registry.contains(outer, inner)); - registry.register::(outer, inner, double); + assert!(!registry.contains(id)); + registry.register::(id, double); - assert!(registry.contains(outer, inner)); - let f = registry.find::(outer, inner).unwrap(); + assert!(registry.contains(id)); + let f = registry.find::(id).unwrap(); assert_eq!(f(21), 42); } @@ -357,10 +370,12 @@ mod fn_registry_tests { let registry = FnRegistry::default(); let outer = Id::new("test.double"); let inner = Id::new("test.int"); - registry.register::(outer, inner, double); + let id = DefaultHashBuilder::default().hash_one((outer, inner)); + + registry.register::(id, double); type OtherFn = fn(i32) -> i32; - assert!(registry.find::(outer, inner).is_none()); + assert!(registry.find::(id).is_none()); } #[test] @@ -368,6 +383,7 @@ mod fn_registry_tests { let registry = FnRegistry::default(); let outer = Id::new("test.missing"); let inner = Id::new("test.int"); - assert!(registry.find::(outer, inner).is_none()); + let id = DefaultHashBuilder::default().hash_one((outer, inner)); + assert!(registry.find::(id).is_none()); } } diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 1529190b3e4..ebed5df1187 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -12,7 +12,7 @@ use vortex_array::dtype::session::DTypeSession; // vortex::expr is in the process of having its dependencies inverted, and will eventually be // pulled back out into a vortex_expr crate. pub use vortex_array::expr; -use vortex_array::optimizer::session::OptimizerSession; +use vortex_array::optimizer::kernels::ArrayKernels; pub use vortex_array::scalar_fn; use vortex_array::scalar_fn::session::ScalarFnSession; use vortex_array::session::ArraySession; @@ -166,7 +166,7 @@ impl VortexSessionDefault for VortexSession { .with::() .with::() .with::() - .with::() + .with::() .with::() .with::(); From c91631fd03820c9a4b2c644ef1cce362d4a09387 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 17:14:34 -0400 Subject: [PATCH 07/12] docs Signed-off-by: Robert Kruszewski --- vortex-array/src/optimizer/kernels.rs | 51 +++++++++++++++++++++------ vortex-array/src/optimizer/mod.rs | 19 +++++----- vortex-session/src/registry.rs | 26 ++++++++------ 3 files changed, 67 insertions(+), 29 deletions(-) diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index 3169c94669d..09fb4d843c2 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -1,15 +1,17 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! Session state for pluggable parent-reduce rules. +//! Session-scoped registry of pluggable array kernels. //! -//! [`ArrayKernels`] wraps an [`FnRegistry`] keyed by `(parent_encoding_id, child_encoding_id)` -//! and is consulted by the optimizer during execution, before the child encoding's static -//! `PARENT_RULES` are tried. Entries are typed as [`ReduceParentFn`](super::ReduceParentFn). +//! [`ArrayKernels`] is a [`VortexSession`](vortex_session::VortexSession) variable that holds an +//! [`FnRegistry`] of kernel function pointers keyed by the identities of the arrays they operate +//! on. It is consulted by the optimizer during execution — before the child encoding's static +//! `PARENT_RULES` — so that a plugin can add a new rule or override a built-in without touching +//! the encoding's vtable. Entries for the parent-reduce kind are typed as [`ReduceParentFn`]. //! -//! The registry is empty by default. Downstream crates register `ReduceParentFn` values to add -//! new parent-reduce rules or override ones that the child encoding would otherwise run from its -//! static `PARENT_RULES`. +//! The registry is empty by default. Downstream crates obtain [`ArrayKernels`] via +//! [`ArrayKernelsExt::kernels`] and register kernel function pointers through the typed helpers +//! like [`ArrayKernels::register_reduce_parent`]. use std::hash::BuildHasher; use std::sync::Arc; @@ -24,6 +26,9 @@ use vortex_utils::aliases::DefaultHashBuilder; use crate::ArrayRef; +/// Shared hasher used to combine `(outer, child, FnKind)` tuples into `FnRegistry` keys. A single +/// global instance is cheap and ensures all callers produce the same hash for the same tuple, so +/// lookups succeed across modules. static FN_HASHER: LazyLock = LazyLock::new(DefaultHashBuilder::default); /// Pluggable parent-reduce function signature used by [`ArrayKernels`]. @@ -36,6 +41,9 @@ static FN_HASHER: LazyLock = LazyLock::new(DefaultHashBuilde pub type ReduceParentFn = fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; +/// Kind tag mixed into a registry key so that the same `(outer, child)` encoding pair can hold +/// different kernel kinds (parent-reduce, reduce, execute, etc.) in one shared [`FnRegistry`] +/// without collisions. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] enum FnKind { Reduce, @@ -44,41 +52,62 @@ enum FnKind { Execute, } -/// Session state for pluggable parent-reduce dispatch keyed by `(parent_id, child_id)`. +/// Session-scoped registry of pluggable kernel function pointers. +/// +/// Entries are keyed by an `(outer_id, child_id, FnKind)` tuple that the typed `register_*` and +/// `find_*` helpers hash into the underlying [`FnRegistry`] key. Callers should always go through +/// the typed helpers rather than the raw registry so the hash scheme stays consistent. #[derive(Debug, Default)] pub struct ArrayKernels { registry: FnRegistry, } impl ArrayKernels { - /// Create an empty session with no rules registered. + /// Create an empty [`ArrayKernels`] with no kernels registered. pub fn empty() -> Self { Self::default() } + /// Register a [`ReduceParentFn`] for `(outer, child)`, replacing any previously registered + /// entry for the same pair. + /// + /// The optimizer will invoke `f` when it sees a parent with encoding id `outer` holding a + /// child with encoding id `child` during a `reduce_parent` step, before trying the child + /// encoding's static `PARENT_RULES`. `outer` is typically the parent's encoding id — for a + /// `ScalarFnArray`, this is the scalar function's id (e.g. `Cast.id()`). pub fn register_reduce_parent(&self, outer: Id, child: Id, f: ReduceParentFn) { self.registry .register(self.hash_fn_ids(outer, child, FnKind::ReduceParent), f) } + /// Look up the [`ReduceParentFn`] registered for `(outer, child)`. + /// + /// Returns `None` when no function is registered for this pair. The returned `Arc` is + /// owned, so callers can drop the borrow on this [`ArrayKernels`] before invoking the + /// function. pub fn find_reduce_parent(&self, outer: Id, child: Id) -> Option> { self.registry .find(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) } + /// Return `true` if a [`ReduceParentFn`] is registered for `(outer, child)`. pub fn contains_reduce_parent(&self, outer: Id, child: Id) -> bool { self.registry .contains(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) } + /// Combine a `(outer, child, fn_kind)` tuple into the `u64` key expected by the underlying + /// [`FnRegistry`]. Using the shared [`FN_HASHER`] guarantees register and find produce the + /// same key for the same logical pair. fn hash_fn_ids(&self, outer: Id, child: Id, fn_kind: FnKind) -> u64 { FN_HASHER.hash_one((outer, child, fn_kind)) } } -/// Extension trait for accessing the optimizer registry from a Vortex session. +/// Extension trait for accessing [`ArrayKernels`] from a [`VortexSession`](vortex_session::VortexSession). pub trait ArrayKernelsExt: SessionExt { - /// Returns the optimizer session variable. + /// Returns the [`ArrayKernels`] session variable, inserting a default-constructed one if + /// none has been registered on the session yet. fn kernels(&self) -> Ref<'_, ArrayKernels> { self.get::() } diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 3bc8ce0f74d..02029439703 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -7,16 +7,19 @@ //! Optimization runs between execution steps, which is what enables cross-step optimizations: //! after a child is decoded, new `reduce_parent` rules may match that were previously blocked. //! -//! There are two entry points: +//! There are three entry points, all on the [`ArrayOptimizer`] trait: //! //! * [`ArrayOptimizer::optimize`] — runs the static rules only (the child encoding's //! `PARENT_RULES`). It does not require a [`VortexSession`] and is used by helpers like //! `ArrayBuiltins::cast` and `ArrayRef::slice` that build wrapped expressions and need them //! normalized inline. //! * [`ArrayOptimizer::optimize_ctx`] — runs the static rules and additionally consults the -//! session's [`ArrayKernels`] registry keyed by `(parent_encoding_id, child_encoding_id)` -//! before each `reduce_parent` step. The execute loop calls this entry point so plugin- -//! registered parent-reduce rules fire during execution. +//! [`ArrayKernels`] registered on the session for each +//! `(parent_encoding_id, child_encoding_id)` pair, before each `reduce_parent` step. The +//! execute loop calls this entry point so plugin-registered parent-reduce rules fire during +//! execution. +//! * [`ArrayOptimizer::optimize_recursive`] — like [`ArrayOptimizer::optimize_ctx`], but also +//! descends into every child of the array so the whole tree is driven to fixpoint. use std::sync::Arc; @@ -69,11 +72,11 @@ impl ArrayOptimizer for ArrayRef { } } -/// Resolve a pluggable [`ReduceParentFn`] for `(parent, child)` from `session`. +/// Resolve a pluggable [`ReduceParentFn`] for the `(parent, child)` pair on `session`. /// -/// Returns `None` when no [`ArrayKernels`] is registered, or no function is registered under -/// `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so the caller can -/// drop the session borrow before invoking it. +/// Returns `None` when `session` has no [`ArrayKernels`] variable, or when no function is +/// registered for `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so +/// the caller can drop the [`ArrayKernels`] borrow before invoking it. fn plugin_reduce_parent( session: &VortexSession, parent: &ArrayRef, diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index e64d57b7df4..6b0d9fbd414 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -292,16 +292,21 @@ impl Context { } } -/// A registry of type-erased function values keyed by a pair of [`Id`] values. +/// A registry of type-erased function values keyed by a pre-computed `u64` identifier. /// /// Each entry stores an `Arc` wrapping a caller-supplied concrete type `F` -/// (typically a function pointer). Callers recover the original type by passing the same `F` to +/// — typically a function pointer. Callers recover the original type by passing the same `F` to /// [`FnRegistry::find`]; `find` returns `Some(Arc)` on a type match and `None` otherwise. /// -/// Used for pluggable dispatch keyed by an `(outer, inner)` identifier pair — for example the -/// optimizer's parent-reduce registry keys by `(parent_encoding_id, child_encoding_id)` so that -/// downstream crates can override the rule that would normally run from the child encoding's -/// static `PARENT_RULES` set. +/// The `u64` key is produced by the caller, usually by hashing a tuple that uniquely identifies +/// the registered function. This lets each consumer choose its own key layout rather than +/// committing this registry to a specific schema. For example, `ArrayKernels` hashes +/// `(parent_encoding_id, child_encoding_id, FnKind)` so one registry can hold several kinds of +/// pluggable kernel keyed by encoding pairs without collisions. +/// +/// The registry is thread-safe and lock-free on the read path — the underlying map is held +/// behind an [`ArcSwap`], so `find`/`contains` never block. Writes copy-on-write the map, so +/// concurrent registration is serialized but does not block readers. #[derive(Debug, Default)] pub struct FnRegistry(ArcSwap>>); @@ -311,7 +316,7 @@ impl FnRegistry { Self::default() } - /// Register a function under `(outer, inner)`, replacing any existing entry. + /// Register a function under `id`, replacing any existing entry with the same key. pub fn register(&self, id: u64, f: F) { let registry = self.0.load(); let mut owned_registry = registry.as_ref().clone(); @@ -319,16 +324,17 @@ impl FnRegistry { self.0.store(Arc::new(owned_registry)); } - /// Look up a function registered under `(outer, inner)`, downcasting to `F`. + /// Look up a function registered under `id`, downcasting to `F`. /// - /// Returns `None` if no function is registered, or if the registered value is not of type `F`. + /// Returns `None` if no function is registered for `id`, or if the registered value is not + /// of type `F`. pub fn find(&self, id: u64) -> Option> { let map = self.0.load(); let entry = map.get(&id)?; Arc::clone(entry).downcast::().ok() } - /// Return `true` if any function is registered under `(outer, inner)`. + /// Return `true` if any function is registered under `id`, regardless of type. pub fn contains(&self, id: u64) -> bool { let map = self.0.load(); map.contains_key(&id) From 85fedd2238edd28a9627024a1739e24471665803 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 17:21:15 -0400 Subject: [PATCH 08/12] fixes Signed-off-by: Robert Kruszewski --- vortex-array/public-api.lock | 60 ++++++++++++++------------- vortex-array/src/optimizer/kernels.rs | 1 + vortex-session/public-api.lock | 10 ++--- 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index f1d74b41be2..58d978fd878 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13222,6 +13222,38 @@ pub vortex_array::normalize::NormalizeOptions::operation: vortex_array::normaliz pub mod vortex_array::optimizer +pub mod vortex_array::optimizer::kernels + +pub struct vortex_array::optimizer::kernels::ArrayKernels + +impl vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::contains_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> bool + +pub fn vortex_array::optimizer::kernels::ArrayKernels::empty() -> Self + +pub fn vortex_array::optimizer::kernels::ArrayKernels::find_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> core::option::Option> + +pub fn vortex_array::optimizer::kernels::ArrayKernels::register_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id, f: vortex_array::optimizer::kernels::ReduceParentFn) + +impl core::default::Default for vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::default() -> vortex_array::optimizer::kernels::ArrayKernels + +impl core::fmt::Debug for vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub trait vortex_array::optimizer::kernels::ArrayKernelsExt: vortex_session::SessionExt + +pub fn vortex_array::optimizer::kernels::ArrayKernelsExt::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels> + +impl vortex_array::optimizer::kernels::ArrayKernelsExt for S + +pub fn S::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels> + +pub type vortex_array::optimizer::kernels::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + pub mod vortex_array::optimizer::rules pub struct vortex_array::optimizer::rules::ParentReduceRuleAdapter @@ -13360,32 +13392,6 @@ pub fn vortex_array::optimizer::rules::ParentReduceRuleAdapter::matches(&s pub fn vortex_array::optimizer::rules::ParentReduceRuleAdapter::reduce_parent(&self, child: vortex_array::ArrayView<'_, V>, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> -pub mod vortex_array::optimizer::session - -pub struct vortex_array::optimizer::session::OptimizerSession - -impl vortex_array::optimizer::session::OptimizerSession - -pub fn vortex_array::optimizer::session::OptimizerSession::empty() -> Self - -pub fn vortex_array::optimizer::session::OptimizerSession::registry(&self) -> &vortex_session::registry::FnRegistry - -impl core::default::Default for vortex_array::optimizer::session::OptimizerSession - -pub fn vortex_array::optimizer::session::OptimizerSession::default() -> vortex_array::optimizer::session::OptimizerSession - -impl core::fmt::Debug for vortex_array::optimizer::session::OptimizerSession - -pub fn vortex_array::optimizer::session::OptimizerSession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result - -pub trait vortex_array::optimizer::session::OptimizerSessionExt: vortex_session::SessionExt - -pub fn vortex_array::optimizer::session::OptimizerSessionExt::optimizer(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::session::OptimizerSession> - -impl vortex_array::optimizer::session::OptimizerSessionExt for S - -pub fn S::optimizer(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::session::OptimizerSession> - pub trait vortex_array::optimizer::ArrayOptimizer pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error::VortexResult @@ -13402,8 +13408,6 @@ pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::Vor pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub type vortex_array::optimizer::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> - pub mod vortex_array::patches pub struct vortex_array::patches::Patches diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index 09fb4d843c2..c30bcd01344 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -45,6 +45,7 @@ pub type ReduceParentFn = /// different kernel kinds (parent-reduce, reduce, execute, etc.) in one shared [`FnRegistry`] /// without collisions. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[expect(unused)] enum FnKind { Reduce, ReduceParent, diff --git a/vortex-session/public-api.lock b/vortex-session/public-api.lock index 93d51f87804..c7f1d1d950b 100644 --- a/vortex-session/public-api.lock +++ b/vortex-session/public-api.lock @@ -44,17 +44,13 @@ pub struct vortex_session::registry::FnRegistry(_) impl vortex_session::registry::FnRegistry -pub fn vortex_session::registry::FnRegistry::contains(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id) -> bool +pub fn vortex_session::registry::FnRegistry::contains(&self, id: u64) -> bool pub fn vortex_session::registry::FnRegistry::empty() -> Self -pub fn vortex_session::registry::FnRegistry::find(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id) -> core::option::Option> +pub fn vortex_session::registry::FnRegistry::find(&self, id: u64) -> core::option::Option> -pub fn vortex_session::registry::FnRegistry::register(&self, outer: vortex_session::registry::Id, inner: vortex_session::registry::Id, f: F) - -impl core::clone::Clone for vortex_session::registry::FnRegistry - -pub fn vortex_session::registry::FnRegistry::clone(&self) -> vortex_session::registry::FnRegistry +pub fn vortex_session::registry::FnRegistry::register(&self, id: u64, f: F) impl core::default::Default for vortex_session::registry::FnRegistry From 56894e0d486c024aa550c17220ff1aa472f63947 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 17:30:09 -0400 Subject: [PATCH 09/12] comment Signed-off-by: Robert Kruszewski --- vortex-array/src/executor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 9e102d550cb..11613f7405a 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -106,8 +106,6 @@ impl ArrayRef { /// For safety, we will error when the number of execution iterations reaches a configurable /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { - // Clone the session up-front so each `optimize_ctx` call below borrows from this owned - // value rather than re-borrowing through `&mut ctx`. let mut current = self; let mut stack: Vec = Vec::new(); From 73e251f0be9d9569e8fcf26d3f5950f1cab02501 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 23 Apr 2026 17:43:13 -0400 Subject: [PATCH 10/12] simpler Signed-off-by: Robert Kruszewski --- vortex-array/src/executor.rs | 19 +++++--- vortex-array/src/optimizer/kernels.rs | 68 +++++++++++++-------------- vortex-array/src/optimizer/mod.rs | 46 ++++++++---------- vortex-session/src/registry.rs | 13 +++-- vortex/src/lib.rs | 3 +- 5 files changed, 75 insertions(+), 74 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 11613f7405a..1ca27bc1de5 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -10,9 +10,10 @@ //! 3. **`execute_parent`** -- child-driven fused execution (may read buffers). //! 4. **`execute`** -- the encoding's own decode step (most expensive). //! -//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack +//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack //! to drive execution iteratively without recursion. Between steps, the optimizer runs -//! reduce/reduce_parent rules to fixpoint. +//! reduce/reduce_parent rules to fixpoint using the active [`ExecutionCtx`] session, so +//! session-registered optimizer kernels participate during execution. //! //! See for a full description //! of the model. @@ -88,17 +89,21 @@ impl ArrayRef { /// /// Each iteration proceeds through three steps in order: /// - /// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is + /// 1. **Done / canonical check** - if `current` satisfies the active done predicate or is /// canonical, splice it back into the stacked parent (if any) and continue, or return. - /// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current` + /// 2. **`execute_parent` on children** - try each child's `execute_parent` against `current` /// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd). /// If there is a stacked parent frame, the rewritten child is spliced back into it so /// that optimize and further `execute_parent` can fire on the reconstructed parent /// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)` /// whose `FilterExecuteAdaptor` fires on the next iteration). - /// 3. **`execute`** — call the encoding's own execute step, which either returns `Done` or + /// 3. **`execute`** - call the encoding's own execute step, which either returns `Done` or /// `ExecuteSlot(i)` to push a child onto the stack for focused execution. /// + /// Optimizer calls in this loop use [`ExecutionCtx::session`], so kernels registered on the + /// session's [`ArrayKernels`](crate::optimizer::kernels::ArrayKernels) are visible between + /// execution steps. + /// /// Note: the returned array may not match `M`. If execution converges to a canonical form /// that does not match `M`, the canonical array is returned since no further execution /// progress is possible. @@ -110,7 +115,7 @@ impl ArrayRef { let mut stack: Vec = Vec::new(); for _ in 0..max_iterations() { - // Step 1: done / canonical — splice back into stacked parent or return. + // Step 1: done / canonical - splice back into stacked parent or return. let is_done = stack .last() .map_or(M::matches as DonePredicate, |frame| frame.done); @@ -523,7 +528,7 @@ macro_rules! require_child { /// execution of child `$idx`. /// /// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone -/// `$parent` — it is moved into the early-return path. +/// `$parent` - it is moved into the early-return path. /// /// ```ignore /// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive); diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index c30bcd01344..c0523f31af4 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -1,17 +1,21 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! Session-scoped registry of pluggable array kernels. +//! Session-scoped registry for optimizer kernels. //! -//! [`ArrayKernels`] is a [`VortexSession`](vortex_session::VortexSession) variable that holds an -//! [`FnRegistry`] of kernel function pointers keyed by the identities of the arrays they operate -//! on. It is consulted by the optimizer during execution — before the child encoding's static -//! `PARENT_RULES` — so that a plugin can add a new rule or override a built-in without touching -//! the encoding's vtable. Entries for the parent-reduce kind are typed as [`ReduceParentFn`]. +//! [`ArrayKernels`] stores function pointers that participate in array optimization without +//! adding rules to an encoding vtable. The optimizer currently consults it for parent-reduce +//! rewrites before the child encoding's static `PARENT_RULES`. A registered function can +//! therefore add a rule for an extension encoding or take precedence over a built-in rule. //! -//! The registry is empty by default. Downstream crates obtain [`ArrayKernels`] via -//! [`ArrayKernelsExt::kernels`] and register kernel function pointers through the typed helpers -//! like [`ArrayKernels::register_reduce_parent`]. +//! Kernel entries are addressed by `(outer_id, child_id, kind)`. For parent-reduce kernels, +//! `outer_id` is the id returned by the parent array's `encoding_id()` and `child_id` is the +//! child array's `encoding_id()`. For [`ScalarFn`](crate::arrays::ScalarFn) parents, the parent +//! id is the scalar function id. +//! +//! Sessions created by the top-level `vortex` crate install an empty registry by default. Other +//! sessions can add it with [`VortexSession::with`](vortex_session::VortexSession::with) or rely +//! on [`ArrayKernelsExt::kernels`] to insert the default value. use std::hash::BuildHasher; use std::sync::Arc; @@ -26,24 +30,21 @@ use vortex_utils::aliases::DefaultHashBuilder; use crate::ArrayRef; -/// Shared hasher used to combine `(outer, child, FnKind)` tuples into `FnRegistry` keys. A single -/// global instance is cheap and ensures all callers produce the same hash for the same tuple, so -/// lookups succeed across modules. +/// Shared hasher used to combine `(outer, child, FnKind)` tuples into [`FnRegistry`] keys. static FN_HASHER: LazyLock = LazyLock::new(DefaultHashBuilder::default); -/// Pluggable parent-reduce function signature used by [`ArrayKernels`]. +/// Function pointer for a plugin-provided parent-reduce rewrite. /// -/// A function of this type rewrites the parent array that holds `child` at `child_idx`, given -/// the child itself and its parent. Returns `Ok(None)` when the function doesn't apply. +/// The optimizer calls this with the matched `child`, its `parent`, and the slot index where the +/// child appears. Return `Ok(Some(new_parent))` to replace the parent, or `Ok(None)` when the +/// rewrite does not apply. /// -/// Registered under `(parent_encoding_id, child_encoding_id)`; callers downcast the erased -/// `child`/`parent` to their expected types before applying logic. +/// Implementations must preserve the parent's logical length and dtype, matching the invariant +/// required of static parent-reduce rules. pub type ReduceParentFn = fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; -/// Kind tag mixed into a registry key so that the same `(outer, child)` encoding pair can hold -/// different kernel kinds (parent-reduce, reduce, execute, etc.) in one shared [`FnRegistry`] -/// without collisions. +/// Disambiguates kernel kinds that share the same `(outer, child)` id pair. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[expect(unused)] enum FnKind { @@ -53,11 +54,10 @@ enum FnKind { Execute, } -/// Session-scoped registry of pluggable kernel function pointers. +/// Session-scoped registry of optimizer kernel functions. /// -/// Entries are keyed by an `(outer_id, child_id, FnKind)` tuple that the typed `register_*` and -/// `find_*` helpers hash into the underlying [`FnRegistry`] key. Callers should always go through -/// the typed helpers rather than the raw registry so the hash scheme stays consistent. +/// Use the typed `register_*`, `find_*`, and `contains_*` methods rather than depending on the +/// internal hash format. #[derive(Debug, Default)] pub struct ArrayKernels { registry: FnRegistry, @@ -69,13 +69,14 @@ impl ArrayKernels { Self::default() } - /// Register a [`ReduceParentFn`] for `(outer, child)`, replacing any previously registered - /// entry for the same pair. + /// Register a [`ReduceParentFn`] for `(outer, child)`. /// /// The optimizer will invoke `f` when it sees a parent with encoding id `outer` holding a /// child with encoding id `child` during a `reduce_parent` step, before trying the child - /// encoding's static `PARENT_RULES`. `outer` is typically the parent's encoding id — for a - /// `ScalarFnArray`, this is the scalar function's id (e.g. `Cast.id()`). + /// encoding's static `PARENT_RULES`. `outer` is usually the parent array's encoding id. For + /// `ScalarFnArray`, it is the scalar function id, for example `Cast.id()`. + /// + /// Replaces any function already registered for the same pair. pub fn register_reduce_parent(&self, outer: Id, child: Id, f: ReduceParentFn) { self.registry .register(self.hash_fn_ids(outer, child, FnKind::ReduceParent), f) @@ -83,8 +84,7 @@ impl ArrayKernels { /// Look up the [`ReduceParentFn`] registered for `(outer, child)`. /// - /// Returns `None` when no function is registered for this pair. The returned `Arc` is - /// owned, so callers can drop the borrow on this [`ArrayKernels`] before invoking the + /// Returns an owned [`Arc`] so the session-variable borrow can be dropped before invoking the /// function. pub fn find_reduce_parent(&self, outer: Id, child: Id) -> Option> { self.registry @@ -97,15 +97,15 @@ impl ArrayKernels { .contains(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) } - /// Combine a `(outer, child, fn_kind)` tuple into the `u64` key expected by the underlying - /// [`FnRegistry`]. Using the shared [`FN_HASHER`] guarantees register and find produce the - /// same key for the same logical pair. + /// Combine a typed kernel id tuple into the `u64` key expected by the underlying + /// [`FnRegistry`]. All typed helpers use this path so registration and lookup agree. fn hash_fn_ids(&self, outer: Id, child: Id, fn_kind: FnKind) -> u64 { FN_HASHER.hash_one((outer, child, fn_kind)) } } -/// Extension trait for accessing [`ArrayKernels`] from a [`VortexSession`](vortex_session::VortexSession). +/// Extension trait for accessing optimizer kernels from a +/// [`VortexSession`](vortex_session::VortexSession). pub trait ArrayKernelsExt: SessionExt { /// Returns the [`ArrayKernels`] session variable, inserting a default-constructed one if /// none has been registered on the session yet. diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 02029439703..3058fc5a8f6 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -7,19 +7,13 @@ //! Optimization runs between execution steps, which is what enables cross-step optimizations: //! after a child is decoded, new `reduce_parent` rules may match that were previously blocked. //! -//! There are three entry points, all on the [`ArrayOptimizer`] trait: +//! There are three public entry points on [`ArrayOptimizer`]: //! -//! * [`ArrayOptimizer::optimize`] — runs the static rules only (the child encoding's -//! `PARENT_RULES`). It does not require a [`VortexSession`] and is used by helpers like -//! `ArrayBuiltins::cast` and `ArrayRef::slice` that build wrapped expressions and need them -//! normalized inline. -//! * [`ArrayOptimizer::optimize_ctx`] — runs the static rules and additionally consults the -//! [`ArrayKernels`] registered on the session for each -//! `(parent_encoding_id, child_encoding_id)` pair, before each `reduce_parent` step. The -//! execute loop calls this entry point so plugin-registered parent-reduce rules fire during -//! execution. -//! * [`ArrayOptimizer::optimize_recursive`] — like [`ArrayOptimizer::optimize_ctx`], but also -//! descends into every child of the array so the whole tree is driven to fixpoint. +//! - [`ArrayOptimizer::optimize`] uses only static rules registered on encoding vtables. +//! - [`ArrayOptimizer::optimize_ctx`] also consults session-scoped [`ArrayKernels`] before +//! static parent-reduce rules, so this is the entry point used by execution. +//! - [`ArrayOptimizer::optimize_recursive`] applies the session-aware optimizer to the root and +//! every descendant. use std::sync::Arc; @@ -39,22 +33,21 @@ pub mod rules; pub trait ArrayOptimizer { /// Optimize the root array node by running reduce and reduce_parent rules to fixpoint. /// - /// Uses only the child encoding's static `PARENT_RULES`. Use [`Self::optimize_ctx`] from - /// inside the execute loop to also consult the session-scoped [`ArrayKernels`] registry. + /// This uses only static rules registered on encoding vtables. Use [`Self::optimize_ctx`] + /// when session-registered [`ArrayKernels`] should participate. fn optimize(&self) -> VortexResult; - /// Like [`Self::optimize`], but additionally consults the [`ArrayKernels`] registered on - /// `session` for each `(parent_encoding_id, child_encoding_id)` pair before the static - /// vtable rules. If `session` does not have an [`ArrayKernels`] registered, falls - /// through to the static rules. + /// Optimize the root array node using static rules and any [`ArrayKernels`] on `session`. + /// + /// Session kernels are checked for each `(parent_encoding_id, child_encoding_id)` pair before + /// the child's static `PARENT_RULES`. If `session` does not contain [`ArrayKernels`], this + /// behaves like [`Self::optimize`]. fn optimize_ctx(&self, session: &VortexSession) -> VortexResult; /// Optimize the entire array tree recursively (root and all descendants). /// - /// Consults the [`ArrayKernels`] registered on `session` for each parent/child pair - /// encountered during the recursive walk, so plugin-registered rules apply throughout the - /// tree. Requires a [`VortexSession`] unconditionally so the registry is always honored - /// when a recursive optimization is requested. + /// This uses the same session-aware rule ordering as [`Self::optimize_ctx`] for every node in + /// the tree. fn optimize_recursive(&self, session: &VortexSession) -> VortexResult; } @@ -72,11 +65,10 @@ impl ArrayOptimizer for ArrayRef { } } -/// Resolve a pluggable [`ReduceParentFn`] for the `(parent, child)` pair on `session`. +/// Resolve a session-registered [`ReduceParentFn`] for the `(parent, child)` pair. /// -/// Returns `None` when `session` has no [`ArrayKernels`] variable, or when no function is -/// registered for `(parent.encoding_id(), child.encoding_id())`. The returned `Arc` is owned so -/// the caller can drop the [`ArrayKernels`] borrow before invoking it. +/// The returned [`Arc`] is owned so the caller can drop the [`ArrayKernels`] borrow before +/// invoking the function. fn plugin_reduce_parent( session: &VortexSession, parent: &ArrayRef, @@ -113,7 +105,7 @@ fn try_optimize( for (slot_idx, slot) in current_array.slots().iter().enumerate() { let Some(child) = slot else { continue }; - // Registry-based override: tried before the child encoding's static PARENT_RULES. + // Session kernels take precedence over the child encoding's static PARENT_RULES. if let Some(session) = session && let Some(plugin) = plugin_reduce_parent(session, ¤t_array, child) && let Some(new_array) = plugin(child, ¤t_array, slot_idx)? diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index 6b0d9fbd414..14a6f6ab26d 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -292,10 +292,10 @@ impl Context { } } -/// A registry of type-erased function values keyed by a pre-computed `u64` identifier. +/// Copy-on-write registry of type-erased function values keyed by caller-provided `u64` IDs. /// /// Each entry stores an `Arc` wrapping a caller-supplied concrete type `F` -/// — typically a function pointer. Callers recover the original type by passing the same `F` to +/// (typically a function pointer). Callers recover the original type by passing the same `F` to /// [`FnRegistry::find`]; `find` returns `Some(Arc)` on a type match and `None` otherwise. /// /// The `u64` key is produced by the caller, usually by hashing a tuple that uniquely identifies @@ -304,9 +304,10 @@ impl Context { /// `(parent_encoding_id, child_encoding_id, FnKind)` so one registry can hold several kinds of /// pluggable kernel keyed by encoding pairs without collisions. /// -/// The registry is thread-safe and lock-free on the read path — the underlying map is held -/// behind an [`ArcSwap`], so `find`/`contains` never block. Writes copy-on-write the map, so -/// concurrent registration is serialized but does not block readers. +/// This type is intended for read-heavy registries that are populated during session setup and +/// then shared. Reads load a snapshot through [`ArcSwap`] and do not block on writers. Writes +/// clone the current map and swap in a new snapshot, so externally serialize concurrent +/// [`FnRegistry::register`] calls if every write must be retained. #[derive(Debug, Default)] pub struct FnRegistry(ArcSwap>>); @@ -317,6 +318,8 @@ impl FnRegistry { } /// Register a function under `id`, replacing any existing entry with the same key. + /// + /// Intended to be called while constructing a session or under an external writer lock. pub fn register(&self, id: u64, f: F) { let registry = self.0.load(); let mut owned_registry = registry.as_ref().clone(); diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index ebed5df1187..ae803ee98ae 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -155,7 +155,8 @@ pub mod encodings { /// Extension trait to create a default Vortex session. pub trait VortexSessionDefault { - /// Creates a default Vortex session with the standard arrays, layouts, and expressions. + /// Creates a default Vortex session with standard arrays, layouts, scalar functions, + /// optimizer kernels, expressions, aggregate functions, and runtime support. fn default() -> VortexSession; } From a17c2e63561864b7efc3324a3ceb5fa95f6edc21 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 24 Apr 2026 14:44:25 -0400 Subject: [PATCH 11/12] move Signed-off-by: Robert Kruszewski --- Cargo.lock | 2 +- vortex-array/Cargo.toml | 1 + vortex-array/src/optimizer/kernels.rs | 54 ++++++------- vortex-array/src/optimizer/mod.rs | 15 ++-- vortex-session/Cargo.toml | 1 - vortex-session/src/registry.rs | 108 -------------------------- 6 files changed, 36 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 961d650d6ab..efeaea1074e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10018,6 +10018,7 @@ name = "vortex-array" version = "0.1.0" dependencies = [ "arbitrary", + "arc-swap", "arcref", "arrow-arith 58.1.0", "arrow-array 58.1.0", @@ -10814,7 +10815,6 @@ dependencies = [ name = "vortex-session" version = "0.1.0" dependencies = [ - "arc-swap", "dashmap", "lasso", "parking_lot", diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index f9adbeb99db..f8676d76ef0 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -21,6 +21,7 @@ workspace = true [dependencies] arbitrary = { workspace = true, optional = true } +arc-swap = { workspace = true } arcref = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true, features = ["ffi"] } diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index c0523f31af4..af70d0f9b9b 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -21,12 +21,13 @@ use std::hash::BuildHasher; use std::sync::Arc; use std::sync::LazyLock; +use arc_swap::ArcSwap; use vortex_error::VortexResult; use vortex_session::Ref; use vortex_session::SessionExt; -use vortex_session::registry::FnRegistry; use vortex_session::registry::Id; use vortex_utils::aliases::DefaultHashBuilder; +use vortex_utils::aliases::hash_map::HashMap; use crate::ArrayRef; @@ -44,23 +45,10 @@ static FN_HASHER: LazyLock = LazyLock::new(DefaultHashBuilde pub type ReduceParentFn = fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult>; -/// Disambiguates kernel kinds that share the same `(outer, child)` id pair. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -#[expect(unused)] -enum FnKind { - Reduce, - ReduceParent, - ExecuteParent, - Execute, -} - /// Session-scoped registry of optimizer kernel functions. -/// -/// Use the typed `register_*`, `find_*`, and `contains_*` methods rather than depending on the -/// internal hash format. #[derive(Debug, Default)] pub struct ArrayKernels { - registry: FnRegistry, + reduce_parent: ArcSwap>>, } impl ArrayKernels { @@ -77,30 +65,38 @@ impl ArrayKernels { /// `ScalarFnArray`, it is the scalar function id, for example `Cast.id()`. /// /// Replaces any function already registered for the same pair. - pub fn register_reduce_parent(&self, outer: Id, child: Id, f: ReduceParentFn) { - self.registry - .register(self.hash_fn_ids(outer, child, FnKind::ReduceParent), f) + pub fn register_reduce_parent>( + &self, + parent: Id, + child: Id, + fns: I, + ) { + let registry = self.reduce_parent.load(); + let id = self.hash_fn_ids(parent, child); + let mut owned_registry = registry.as_ref().clone(); + if let Some(existing) = owned_registry.remove(&id) { + owned_registry.insert(id, existing.as_ref().iter().cloned().chain(fns).collect()); + } else { + owned_registry.insert(id, fns.into_iter().collect()); + } + self.reduce_parent.store(Arc::new(owned_registry)); } /// Look up the [`ReduceParentFn`] registered for `(outer, child)`. /// /// Returns an owned [`Arc`] so the session-variable borrow can be dropped before invoking the /// function. - pub fn find_reduce_parent(&self, outer: Id, child: Id) -> Option> { - self.registry - .find(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) - } - - /// Return `true` if a [`ReduceParentFn`] is registered for `(outer, child)`. - pub fn contains_reduce_parent(&self, outer: Id, child: Id) -> bool { - self.registry - .contains(self.hash_fn_ids(outer, child, FnKind::ReduceParent)) + pub fn find_reduce_parent(&self, parent: Id, child: Id) -> Option> { + let id = self.hash_fn_ids(parent, child); + let map = self.reduce_parent.load(); + let entry = map.get(&id)?; + Some(Arc::clone(entry)) } /// Combine a typed kernel id tuple into the `u64` key expected by the underlying /// [`FnRegistry`]. All typed helpers use this path so registration and lookup agree. - fn hash_fn_ids(&self, outer: Id, child: Id, fn_kind: FnKind) -> u64 { - FN_HASHER.hash_one((outer, child, fn_kind)) + fn hash_fn_ids(&self, parent: Id, child: Id) -> u64 { + FN_HASHER.hash_one((parent, child)) } } diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 3058fc5a8f6..70a041bcc18 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -73,7 +73,7 @@ fn plugin_reduce_parent( session: &VortexSession, parent: &ArrayRef, child: &ArrayRef, -) -> Option> { +) -> Option> { session .get_opt::() .and_then(|s| s.find_reduce_parent(parent.encoding_id(), child.encoding_id())) @@ -107,12 +107,15 @@ fn try_optimize( // Session kernels take precedence over the child encoding's static PARENT_RULES. if let Some(session) = session - && let Some(plugin) = plugin_reduce_parent(session, ¤t_array, child) - && let Some(new_array) = plugin(child, ¤t_array, slot_idx)? + && let Some(plugins) = plugin_reduce_parent(session, ¤t_array, child) { - current_array = new_array; - any_optimizations = true; - continue 'outer; + for plugin in plugins.as_ref() { + if let Some(new_array) = plugin(child, ¤t_array, slot_idx)? { + current_array = new_array; + any_optimizations = true; + continue 'outer; + } + } } if let Some(new_array) = child.reduce_parent(¤t_array, slot_idx)? { diff --git a/vortex-session/Cargo.toml b/vortex-session/Cargo.toml index 32d4d6de428..d4e047223c1 100644 --- a/vortex-session/Cargo.toml +++ b/vortex-session/Cargo.toml @@ -20,7 +20,6 @@ all-features = true workspace = true [dependencies] -arc-swap = { workspace = true } dashmap = { workspace = true } lasso = { workspace = true } parking_lot = { workspace = true } diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index 14a6f6ab26d..36c03828844 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -4,7 +4,6 @@ //! Many session types use a registry of objects that can be looked up by name to construct //! contexts. This module provides a generic registry type for that purpose. -use std::any::Any; use std::cmp::Ordering; use std::fmt; use std::fmt::Debug; @@ -16,13 +15,11 @@ use std::sync::Arc; use std::sync::LazyLock; use std::sync::OnceLock; -use arc_swap::ArcSwap; use lasso::Spur; use lasso::ThreadedRodeo; use parking_lot::RwLock; use vortex_error::VortexExpect; use vortex_utils::aliases::dash_map::DashMap; -use vortex_utils::aliases::hash_map::HashMap; /// Global string interner for [`Id`] values. static INTERNER: LazyLock = LazyLock::new(ThreadedRodeo::new); @@ -291,108 +288,3 @@ impl Context { self.ids.read().clone() } } - -/// Copy-on-write registry of type-erased function values keyed by caller-provided `u64` IDs. -/// -/// Each entry stores an `Arc` wrapping a caller-supplied concrete type `F` -/// (typically a function pointer). Callers recover the original type by passing the same `F` to -/// [`FnRegistry::find`]; `find` returns `Some(Arc)` on a type match and `None` otherwise. -/// -/// The `u64` key is produced by the caller, usually by hashing a tuple that uniquely identifies -/// the registered function. This lets each consumer choose its own key layout rather than -/// committing this registry to a specific schema. For example, `ArrayKernels` hashes -/// `(parent_encoding_id, child_encoding_id, FnKind)` so one registry can hold several kinds of -/// pluggable kernel keyed by encoding pairs without collisions. -/// -/// This type is intended for read-heavy registries that are populated during session setup and -/// then shared. Reads load a snapshot through [`ArcSwap`] and do not block on writers. Writes -/// clone the current map and swap in a new snapshot, so externally serialize concurrent -/// [`FnRegistry::register`] calls if every write must be retained. -#[derive(Debug, Default)] -pub struct FnRegistry(ArcSwap>>); - -impl FnRegistry { - /// Create a new, empty registry. - pub fn empty() -> Self { - Self::default() - } - - /// Register a function under `id`, replacing any existing entry with the same key. - /// - /// Intended to be called while constructing a session or under an external writer lock. - pub fn register(&self, id: u64, f: F) { - let registry = self.0.load(); - let mut owned_registry = registry.as_ref().clone(); - owned_registry.insert(id, Arc::new(f)); - self.0.store(Arc::new(owned_registry)); - } - - /// Look up a function registered under `id`, downcasting to `F`. - /// - /// Returns `None` if no function is registered for `id`, or if the registered value is not - /// of type `F`. - pub fn find(&self, id: u64) -> Option> { - let map = self.0.load(); - let entry = map.get(&id)?; - Arc::clone(entry).downcast::().ok() - } - - /// Return `true` if any function is registered under `id`, regardless of type. - pub fn contains(&self, id: u64) -> bool { - let map = self.0.load(); - map.contains_key(&id) - } -} - -#[cfg(test)] -mod fn_registry_tests { - use std::hash::BuildHasher; - - use vortex_utils::aliases::DefaultHashBuilder; - - use super::FnRegistry; - use super::Id; - - type DoubleFn = fn(i64) -> i64; - - fn double(x: i64) -> i64 { - x * 2 - } - - #[test] - fn register_and_find() { - let registry = FnRegistry::default(); - let outer = Id::new("test.double"); - let inner = Id::new("test.int"); - let id = DefaultHashBuilder::default().hash_one((outer, inner)); - - assert!(!registry.contains(id)); - registry.register::(id, double); - - assert!(registry.contains(id)); - let f = registry.find::(id).unwrap(); - assert_eq!(f(21), 42); - } - - #[test] - fn find_with_wrong_type_returns_none() { - let registry = FnRegistry::default(); - let outer = Id::new("test.double"); - let inner = Id::new("test.int"); - let id = DefaultHashBuilder::default().hash_one((outer, inner)); - - registry.register::(id, double); - - type OtherFn = fn(i32) -> i32; - assert!(registry.find::(id).is_none()); - } - - #[test] - fn missing_entry_returns_none() { - let registry = FnRegistry::default(); - let outer = Id::new("test.missing"); - let inner = Id::new("test.int"); - let id = DefaultHashBuilder::default().hash_one((outer, inner)); - assert!(registry.find::(id).is_none()); - } -} From 7878615d5243ff38f912905349f55cc4cf75ea95 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 24 Apr 2026 14:51:02 -0400 Subject: [PATCH 12/12] api Signed-off-by: Robert Kruszewski --- vortex-array/public-api.lock | 6 ++---- vortex-session/public-api.lock | 20 -------------------- 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 58d978fd878..02d767b3cb1 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13228,13 +13228,11 @@ pub struct vortex_array::optimizer::kernels::ArrayKernels impl vortex_array::optimizer::kernels::ArrayKernels -pub fn vortex_array::optimizer::kernels::ArrayKernels::contains_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> bool - pub fn vortex_array::optimizer::kernels::ArrayKernels::empty() -> Self -pub fn vortex_array::optimizer::kernels::ArrayKernels::find_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id) -> core::option::Option> +pub fn vortex_array::optimizer::kernels::ArrayKernels::find_reduce_parent(&self, parent: vortex_session::registry::Id, child: vortex_session::registry::Id) -> core::option::Option> -pub fn vortex_array::optimizer::kernels::ArrayKernels::register_reduce_parent(&self, outer: vortex_session::registry::Id, child: vortex_session::registry::Id, f: vortex_array::optimizer::kernels::ReduceParentFn) +pub fn vortex_array::optimizer::kernels::ArrayKernels::register_reduce_parent>(&self, parent: vortex_session::registry::Id, child: vortex_session::registry::Id, fns: I) impl core::default::Default for vortex_array::optimizer::kernels::ArrayKernels diff --git a/vortex-session/public-api.lock b/vortex-session/public-api.lock index c7f1d1d950b..a63e955f77a 100644 --- a/vortex-session/public-api.lock +++ b/vortex-session/public-api.lock @@ -40,26 +40,6 @@ impl core::default::Default for vortex_session::registry::Context pub fn vortex_session::registry::Context::default() -> Self -pub struct vortex_session::registry::FnRegistry(_) - -impl vortex_session::registry::FnRegistry - -pub fn vortex_session::registry::FnRegistry::contains(&self, id: u64) -> bool - -pub fn vortex_session::registry::FnRegistry::empty() -> Self - -pub fn vortex_session::registry::FnRegistry::find(&self, id: u64) -> core::option::Option> - -pub fn vortex_session::registry::FnRegistry::register(&self, id: u64, f: F) - -impl core::default::Default for vortex_session::registry::FnRegistry - -pub fn vortex_session::registry::FnRegistry::default() -> vortex_session::registry::FnRegistry - -impl core::fmt::Debug for vortex_session::registry::FnRegistry - -pub fn vortex_session::registry::FnRegistry::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result - pub struct vortex_session::registry::Id(_) impl vortex_session::registry::Id