diff --git a/Cargo.lock b/Cargo.lock index 069aa68ebab..efeaea1074e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10018,6 +10018,7 @@ name = "vortex-array" version = "0.1.0" dependencies = [ "arbitrary", + "arc-swap", "arcref", "arrow-arith 58.1.0", "arrow-array 58.1.0", @@ -10814,7 +10815,6 @@ dependencies = [ name = "vortex-session" version = "0.1.0" dependencies = [ - "arcref", "dashmap", "lasso", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 92e08f1338f..4ddcfbe3d43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,7 @@ version = "0.1.0" aho-corasick = "1.1.3" anyhow = "1.0.97" arbitrary = "1.3.2" -arc-swap = "1.8" +arc-swap = "1.9" arcref = "0.2.0" arrow-arith = "58" arrow-array = "58" diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index f9adbeb99db..f8676d76ef0 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -21,6 +21,7 @@ workspace = true [dependencies] arbitrary = { workspace = true, optional = true } +arc-swap = { workspace = true } arcref = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true, features = ["ffi"] } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8ee3c97f17b..02d767b3cb1 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -13222,6 +13222,36 @@ pub vortex_array::normalize::NormalizeOptions::operation: vortex_array::normaliz pub mod vortex_array::optimizer +pub mod vortex_array::optimizer::kernels + +pub struct vortex_array::optimizer::kernels::ArrayKernels + +impl vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::empty() -> Self + +pub fn vortex_array::optimizer::kernels::ArrayKernels::find_reduce_parent(&self, parent: vortex_session::registry::Id, child: vortex_session::registry::Id) -> core::option::Option> + +pub fn vortex_array::optimizer::kernels::ArrayKernels::register_reduce_parent>(&self, parent: vortex_session::registry::Id, child: 
vortex_session::registry::Id, fns: I) + +impl core::default::Default for vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::default() -> vortex_array::optimizer::kernels::ArrayKernels + +impl core::fmt::Debug for vortex_array::optimizer::kernels::ArrayKernels + +pub fn vortex_array::optimizer::kernels::ArrayKernels::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub trait vortex_array::optimizer::kernels::ArrayKernelsExt: vortex_session::SessionExt + +pub fn vortex_array::optimizer::kernels::ArrayKernelsExt::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels> + +impl vortex_array::optimizer::kernels::ArrayKernelsExt for S + +pub fn S::kernels(&self) -> vortex_session::Ref<'_, vortex_array::optimizer::kernels::ArrayKernels> + +pub type vortex_array::optimizer::kernels::ReduceParentFn = fn(child: &vortex_array::ArrayRef, parent: &vortex_array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult> + pub mod vortex_array::optimizer::rules pub struct vortex_array::optimizer::rules::ParentReduceRuleAdapter @@ -13364,13 +13394,17 @@ pub trait vortex_array::optimizer::ArrayOptimizer pub fn vortex_array::optimizer::ArrayOptimizer::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::optimizer::ArrayOptimizer::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: 
&vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult pub mod vortex_array::patches @@ -22328,7 +22362,9 @@ impl vortex_array::optimizer::ArrayOptimizer for vortex_array::ArrayRef pub fn vortex_array::ArrayRef::optimize(&self) -> vortex_error::VortexResult -pub fn vortex_array::ArrayRef::optimize_recursive(&self) -> vortex_error::VortexResult +pub fn vortex_array::ArrayRef::optimize_ctx(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::ArrayRef::optimize_recursive(&self, session: &vortex_session::VortexSession) -> vortex_error::VortexResult impl vortex_array::scalar_fn::ReduceNode for vortex_array::ArrayRef diff --git a/vortex-array/src/arrays/extension/compute/rules.rs b/vortex-array/src/arrays/extension/compute/rules.rs index b6e2d5a1e06..d7c8469d1dc 100644 --- a/vortex-array/src/arrays/extension/compute/rules.rs +++ b/vortex-array/src/arrays/extension/compute/rules.rs @@ -78,9 +78,12 @@ impl ArrayParentReduceRule for ExtensionFilterPushDownRule { #[cfg(test)] mod tests { + use std::sync::LazyLock; + use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_mask::Mask; + use vortex_session::VortexSession; use crate::IntoArray; #[expect(deprecated)] @@ -108,6 +111,10 @@ mod tests { use crate::scalar::ScalarValue; use crate::scalar_fn::fns::binary::Binary; use crate::scalar_fn::fns::operators::Operator; + use crate::session::ArraySession; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] struct TestExt; @@ -220,7 +227,7 @@ mod tests { .try_new_array(3, Operator::Lt, [constant_ext, ext_array]) .unwrap(); - let optimized = scalar_fn_array.optimize_recursive().unwrap(); + let optimized = scalar_fn_array.optimize_recursive(&SESSION).unwrap(); let scalar_fn = 
optimized.as_opt::().unwrap(); let children = scalar_fn.children(); let constant = children[0] diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index e35b485972a..1ca27bc1de5 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -10,9 +10,10 @@ //! 3. **`execute_parent`** -- child-driven fused execution (may read buffers). //! 4. **`execute`** -- the encoding's own decode step (most expensive). //! -//! The main entry point is [`DynArray::execute_until`], which uses an explicit work stack +//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack //! to drive execution iteratively without recursion. Between steps, the optimizer runs -//! reduce/reduce_parent rules to fixpoint. +//! reduce/reduce_parent rules to fixpoint using the active [`ExecutionCtx`] session, so +//! session-registered optimizer kernels participate during execution. //! //! See for a full description //! of the model. @@ -88,17 +89,21 @@ impl ArrayRef { /// /// Each iteration proceeds through three steps in order: /// - /// 1. **Done / canonical check** — if `current` satisfies the active done predicate or is + /// 1. **Done / canonical check** - if `current` satisfies the active done predicate or is /// canonical, splice it back into the stacked parent (if any) and continue, or return. - /// 2. **`execute_parent` on children** — try each child's `execute_parent` against `current` + /// 2. **`execute_parent` on children** - try each child's `execute_parent` against `current` /// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd). /// If there is a stacked parent frame, the rewritten child is spliced back into it so /// that optimize and further `execute_parent` can fire on the reconstructed parent /// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)` /// whose `FilterExecuteAdaptor` fires on the next iteration). - /// 3. 
**`execute`** — call the encoding's own execute step, which either returns `Done` or + /// 3. **`execute`** - call the encoding's own execute step, which either returns `Done` or /// `ExecuteSlot(i)` to push a child onto the stack for focused execution. /// + /// Optimizer calls in this loop use [`ExecutionCtx::session`], so kernels registered on the + /// session's [`ArrayKernels`](crate::optimizer::kernels::ArrayKernels) are visible between + /// execution steps. + /// /// Note: the returned array may not match `M`. If execution converges to a canonical form /// that does not match `M`, the canonical array is returned since no further execution /// progress is possible. @@ -110,7 +115,7 @@ impl ArrayRef { let mut stack: Vec = Vec::new(); for _ in 0..max_iterations() { - // Step 1: done / canonical — splice back into stacked parent or return. + // Step 1: done / canonical - splice back into stacked parent or return. let is_done = stack .last() .map_or(M::matches as DonePredicate, |frame| frame.done); @@ -121,7 +126,7 @@ impl ArrayRef { return Ok(current); } Some(frame) => { - current = frame.put_back(current)?.optimize()?; + current = frame.put_back(current)?.optimize_ctx(ctx.session())?; continue; } } @@ -137,9 +142,9 @@ impl ArrayRef { "execute_parent rewrote {} -> {}", current, rewritten )); - current = rewritten.optimize()?; + current = rewritten.optimize_ctx(ctx.session())?; if let Some(frame) = stack.pop() { - current = frame.put_back(current)?.optimize()?; + current = frame.put_back(current)?.optimize_ctx(ctx.session())?; } continue; } @@ -158,7 +163,7 @@ impl ArrayRef { )); let frame = StackFrame::new(parent, i, done, &child); stack.push(frame); - current = child.optimize()?; + current = child.optimize_ctx(ctx.session())?; } ExecutionStep::Done => { ctx.log(format_args!("Done: {}", array)); @@ -523,7 +528,7 @@ macro_rules! require_child { /// execution of child `$idx`. 
/// /// Unlike `require_child!`, this is a statement macro (no value produced) and does not clone -/// `$parent` — it is moved into the early-return path. +/// `$parent` - it is moved into the early-return path. /// /// ```ignore /// require_opt_child!(array, array.patches().map(|p| p.indices()), 1 => Primitive); diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs new file mode 100644 index 00000000000..af70d0f9b9b --- /dev/null +++ b/vortex-array/src/optimizer/kernels.rs @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session-scoped registry for optimizer kernels. +//! +//! [`ArrayKernels`] stores function pointers that participate in array optimization without +//! adding rules to an encoding vtable. The optimizer currently consults it for parent-reduce +//! rewrites before the child encoding's static `PARENT_RULES`. A registered function can +//! therefore add a rule for an extension encoding or take precedence over a built-in rule. +//! +//! Kernel entries are addressed by `(outer_id, child_id, kind)`. For parent-reduce kernels, +//! `outer_id` is the id returned by the parent array's `encoding_id()` and `child_id` is the +//! child array's `encoding_id()`. For [`ScalarFn`](crate::arrays::ScalarFn) parents, the parent +//! id is the scalar function id. +//! +//! Sessions created by the top-level `vortex` crate install an empty registry by default. Other +//! sessions can add it with [`VortexSession::with`](vortex_session::VortexSession::with) or rely +//! on [`ArrayKernelsExt::kernels`] to insert the default value. 
+ +use std::hash::BuildHasher; +use std::sync::Arc; +use std::sync::LazyLock; + +use arc_swap::ArcSwap; +use vortex_error::VortexResult; +use vortex_session::Ref; +use vortex_session::SessionExt; +use vortex_session::registry::Id; +use vortex_utils::aliases::DefaultHashBuilder; +use vortex_utils::aliases::hash_map::HashMap; + +use crate::ArrayRef; + +/// Shared hasher used to combine `(parent, child)` id pairs into `u64` kernel-map keys. +static FN_HASHER: LazyLock<DefaultHashBuilder> = LazyLock::new(DefaultHashBuilder::default); + +/// Function pointer for a plugin-provided parent-reduce rewrite. +/// +/// The optimizer calls this with the matched `child`, its `parent`, and the slot index where the +/// child appears. Return `Ok(Some(new_parent))` to replace the parent, or `Ok(None)` when the +/// rewrite does not apply. +/// +/// Implementations must preserve the parent's logical length and dtype, matching the invariant +/// required of static parent-reduce rules. +pub type ReduceParentFn = + fn(child: &ArrayRef, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>; + +/// Session-scoped registry of optimizer kernel functions. +#[derive(Debug, Default)] +pub struct ArrayKernels { + reduce_parent: ArcSwap<HashMap<u64, Arc<[ReduceParentFn]>>>, +} + +impl ArrayKernels { + /// Create an empty [`ArrayKernels`] with no kernels registered. + pub fn empty() -> Self { + Self::default() + } + + /// Register [`ReduceParentFn`]s for `(parent, child)`. + /// + /// The optimizer will try `fns` when it sees a parent with encoding id `parent` holding a + /// child with encoding id `child` during a `reduce_parent` step, before trying the child + /// encoding's static `PARENT_RULES`. `parent` is usually the parent array's encoding id. For + /// `ScalarFnArray`, it is the scalar function id, for example `Cast.id()`. + /// + /// Appends to any functions already registered for the same pair; earlier ones run first. 
+ pub fn register_reduce_parent<I: IntoIterator<Item = ReduceParentFn>>( + &self, + parent: Id, + child: Id, + fns: I, + ) { + let registry = self.reduce_parent.load(); + let id = self.hash_fn_ids(parent, child); + let mut owned_registry = registry.as_ref().clone(); + if let Some(existing) = owned_registry.remove(&id) { + owned_registry.insert(id, existing.as_ref().iter().cloned().chain(fns).collect()); + } else { + owned_registry.insert(id, fns.into_iter().collect()); + } + self.reduce_parent.store(Arc::new(owned_registry)); + } + + /// Look up the [`ReduceParentFn`]s registered for `(parent, child)`. + /// + /// Returns an owned [`Arc`] so the session-variable borrow can be dropped before invoking the + /// functions. + pub fn find_reduce_parent(&self, parent: Id, child: Id) -> Option<Arc<[ReduceParentFn]>> { + let id = self.hash_fn_ids(parent, child); + let map = self.reduce_parent.load(); + let entry = map.get(&id)?; + Some(Arc::clone(entry)) + } + + /// Combine a `(parent, child)` id pair into the `u64` key used by the `reduce_parent` map. + /// Registration and lookup both use this path so their keys agree. + fn hash_fn_ids(&self, parent: Id, child: Id) -> u64 { + FN_HASHER.hash_one((parent, child)) + } +} + +/// Extension trait for accessing optimizer kernels from a +/// [`VortexSession`](vortex_session::VortexSession). +pub trait ArrayKernelsExt: SessionExt { + /// Returns the [`ArrayKernels`] session variable, inserting a default-constructed one if + /// none has been registered on the session yet. + fn kernels(&self) -> Ref<'_, ArrayKernels> { + self.get::<ArrayKernels>() + } +} + +impl<S: SessionExt> ArrayKernelsExt for S {} diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 0401e69cb9a..70a041bcc18 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -6,34 +6,83 @@ //! //! Optimization runs between execution steps, which is what enables cross-step optimizations: //! after a child is decoded, new `reduce_parent` rules may match that were previously blocked. +//! 
+//! There are three public entry points on [`ArrayOptimizer`]: +//! +//! - [`ArrayOptimizer::optimize`] uses only static rules registered on encoding vtables. +//! - [`ArrayOptimizer::optimize_ctx`] also consults session-scoped [`ArrayKernels`] before +//! static parent-reduce rules, so this is the entry point used by execution. +//! - [`ArrayOptimizer::optimize_recursive`] applies the session-aware optimizer to the root and +//! every descendant. + +use std::sync::Arc; use vortex_error::VortexResult; use vortex_error::vortex_bail; +use vortex_session::SessionExt; +use vortex_session::VortexSession; use crate::ArrayRef; +use crate::optimizer::kernels::ArrayKernels; +use crate::optimizer::kernels::ReduceParentFn; +pub mod kernels; pub mod rules; /// Extension trait for optimizing array trees using reduce/reduce_parent rules. pub trait ArrayOptimizer { - /// Optimize the root array node only by running reduce and reduce_parent rules to fixpoint. + /// Optimize the root array node by running reduce and reduce_parent rules to fixpoint. + /// + /// This uses only static rules registered on encoding vtables. Use [`Self::optimize_ctx`] + /// when session-registered [`ArrayKernels`] should participate. fn optimize(&self) -> VortexResult; + /// Optimize the root array node using static rules and any [`ArrayKernels`] on `session`. + /// + /// Session kernels are checked for each `(parent_encoding_id, child_encoding_id)` pair before + /// the child's static `PARENT_RULES`. If `session` does not contain [`ArrayKernels`], this + /// behaves like [`Self::optimize`]. + fn optimize_ctx(&self, session: &VortexSession) -> VortexResult; + /// Optimize the entire array tree recursively (root and all descendants). - fn optimize_recursive(&self) -> VortexResult; + /// + /// This uses the same session-aware rule ordering as [`Self::optimize_ctx`] for every node in + /// the tree. 
+ fn optimize_recursive(&self, session: &VortexSession) -> VortexResult<ArrayRef>; } impl ArrayOptimizer for ArrayRef { fn optimize(&self) -> VortexResult<ArrayRef> { - Ok(try_optimize(self)?.unwrap_or_else(|| self.clone())) + Ok(try_optimize(self, None)?.unwrap_or_else(|| self.clone())) + } + + fn optimize_ctx(&self, session: &VortexSession) -> VortexResult<ArrayRef> { + Ok(try_optimize(self, Some(session))?.unwrap_or_else(|| self.clone())) } - fn optimize_recursive(&self) -> VortexResult<ArrayRef> { - Ok(try_optimize_recursive(self)?.unwrap_or_else(|| self.clone())) + fn optimize_recursive(&self, session: &VortexSession) -> VortexResult<ArrayRef> { + Ok(try_optimize_recursive(self, session)?.unwrap_or_else(|| self.clone())) } } -fn try_optimize(array: &ArrayRef) -> VortexResult<Option<ArrayRef>> { +/// Resolve the session-registered [`ReduceParentFn`]s for the `(parent, child)` pair. +/// +/// The returned [`Arc`] is owned so the caller can drop the [`ArrayKernels`] borrow before +/// invoking the functions. +fn plugin_reduce_parent( + session: &VortexSession, + parent: &ArrayRef, + child: &ArrayRef, +) -> Option<Arc<[ReduceParentFn]>> { + session + .get_opt::<ArrayKernels>() + .and_then(|s| s.find_reduce_parent(parent.encoding_id(), child.encoding_id())) +} + +fn try_optimize( + array: &ArrayRef, + session: Option<&VortexSession>, +) -> VortexResult<Option<ArrayRef>> { let mut current_array = array.clone(); let mut any_optimizations = false; @@ -55,6 +104,20 @@ fn try_optimize(array: &ArrayRef) -> VortexResult<Option<ArrayRef>> { // Its important to take all slots here, as `current_array` can change inside the loop. for (slot_idx, slot) in current_array.slots().iter().enumerate() { let Some(child) = slot else { continue }; + + // Session kernels take precedence over the child encoding's static PARENT_RULES. + if let Some(session) = session + && let Some(plugins) = plugin_reduce_parent(session, &current_array, child) + { + for plugin in plugins.as_ref() { + if let Some(new_array) = plugin(child, &current_array, slot_idx)? 
{ + current_array = new_array; + any_optimizations = true; + continue 'outer; + } + } + } + if let Some(new_array) = child.reduce_parent(&current_array, slot_idx)? { // If the parent was replaced, then we attempt to reduce it again. current_array = new_array; @@ -76,11 +139,14 @@ fn try_optimize(array: &ArrayRef) -> VortexResult<Option<ArrayRef>> { } } -fn try_optimize_recursive(array: &ArrayRef) -> VortexResult<Option<ArrayRef>> { +fn try_optimize_recursive( + array: &ArrayRef, + session: &VortexSession, +) -> VortexResult<Option<ArrayRef>> { let mut current_array = array.clone(); let mut any_optimizations = false; - if let Some(new_array) = try_optimize(&current_array)? { + if let Some(new_array) = try_optimize(&current_array, Some(session))? { current_array = new_array; any_optimizations = true; } @@ -90,7 +156,7 @@ fn try_optimize_recursive(array: &ArrayRef) -> VortexResult<Option<ArrayRef>> { for slot in current_array.slots() { match slot { Some(child) => { - if let Some(new_child) = try_optimize_recursive(child)? { + if let Some(new_child) = try_optimize_recursive(child, session)? 
{ new_slots.push(Some(new_child)); any_slot_optimized = true; } else { diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index c59fe6cbc2b..15d7da9a34d 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -400,7 +400,7 @@ impl TableFunction for T { return Ok(()); }; let (array_result, conversion_cache) = result?; - let array_result = array_result.optimize_recursive()?; + let array_result = array_result.optimize_recursive(ctx.session())?; let array_result: StructArray = if let Some(array) = array_result.as_opt::() { diff --git a/vortex-session/Cargo.toml b/vortex-session/Cargo.toml index 263c8c8500c..d4e047223c1 100644 --- a/vortex-session/Cargo.toml +++ b/vortex-session/Cargo.toml @@ -20,7 +20,6 @@ all-features = true workspace = true [dependencies] -arcref = { workspace = true } dashmap = { workspace = true } lasso = { workspace = true } parking_lot = { workspace = true } diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index a739b9fdda3..36c03828844 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -9,6 +9,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::hash::Hash; use std::ops::Deref; use std::sync::Arc; use std::sync::LazyLock; diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 12b71709b72..ae803ee98ae 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -12,6 +12,7 @@ use vortex_array::dtype::session::DTypeSession; // vortex::expr is in the process of having its dependencies inverted, and will eventually be // pulled back out into a vortex_expr crate. pub use vortex_array::expr; +use vortex_array::optimizer::kernels::ArrayKernels; pub use vortex_array::scalar_fn; use vortex_array::scalar_fn::session::ScalarFnSession; use vortex_array::session::ArraySession; @@ -154,7 +155,8 @@ pub mod encodings { /// Extension trait to create a default Vortex session. 
pub trait VortexSessionDefault { - /// Creates a default Vortex session with the standard arrays, layouts, and expressions. + /// Creates a default Vortex session with standard arrays, layouts, scalar functions, + /// optimizer kernels, expressions, aggregate functions, and runtime support. fn default() -> VortexSession; } @@ -165,6 +167,7 @@ impl VortexSessionDefault for VortexSession { .with::() .with::() .with::() + .with::() .with::() .with::();