diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 762d5e00fb0..c89418e67a6 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -142,7 +142,6 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().kernels; // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,13 +149,9 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let kernels_r = kernels.read(); - let batch_id = batch.encoding_id(); - let kernel = kernels_r - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) - .copied(); - drop(kernels_r); + let kernel = session + .aggregate_fns() + .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -187,14 +182,9 @@ impl DynAccumulator for Accumulator { break; } - let kernels_r = kernels.read(); - let batch_id = batch.encoding_id(); - let kernel = kernels_r - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) - .copied(); - drop(kernels_r); - if let Some(kernel) = kernel + if let Some(kernel) = session + .aggregate_fns() + .find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()) && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { vortex_ensure!( diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index a751a7c5749..3fae7a85bf3 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,17 +163,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; for _ in 0..max_iterations() { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + if let Some(result) = session + .aggregate_fns() + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -255,17 +253,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; for _ in 0..64 { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + if let Some(result) = session + .aggregate_fns() + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/proto.rs b/vortex-array/src/aggregate_fn/proto.rs index 9bcfab5818c..bff198b3955 100644 --- a/vortex-array/src/aggregate_fn/proto.rs +++ b/vortex-array/src/aggregate_fn/proto.rs @@ -36,7 +36,7 @@ impl AggregateFnRef { /// Note: the serialization format is not stable and may change between versions. pub fn from_proto(proto: &pb::AggregateFn, session: &VortexSession) -> VortexResult { let agg_fn_id: AggregateFnId = AggregateFnId::new(proto.id.as_str()); - let agg_fn = if let Some(plugin) = session.aggregate_fns().registry().find(&agg_fn_id) { + let agg_fn = if let Some(plugin) = session.aggregate_fns().find_plugin(&agg_fn_id) { plugin.deserialize(proto.metadata(), session)? } else if session.allows_unknown() { new_foreign_aggregate_fn(agg_fn_id, proto.metadata().to_vec()) diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index d89f9da069d..edbafdf386f 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -4,12 +4,9 @@ use std::any::Any; use std::sync::Arc; -use parking_lot::RwLock; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; -use vortex_session::registry::Registry; -use vortex_utils::aliases::hash_map::HashMap; use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnPluginRef; @@ -34,6 +31,7 @@ use crate::aggregate_fn::fns::sum::Sum; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; +use crate::arc_swap_map::ArcSwapMap; use crate::array::ArrayId; use crate::array::VTable; use crate::arrays::Chunked; @@ -43,16 +41,17 @@ use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; -/// Registry of aggregate function vtables. -pub type AggregateFnRegistry = Registry; - -/// Session state for aggregate function vtables. +/// Session state for aggregate functions and encoding-specific aggregate kernels. +/// +/// The default session registers the built-in aggregate functions and kernels. Additional +/// aggregate functions and kernels may be registered by extensions when they are added to a +/// [`VortexSession`](vortex_session::VortexSession). #[derive(Debug)] pub struct AggregateFnSession { - registry: AggregateFnRegistry, + registry: ArcSwapMap, - pub(super) kernels: RwLock>, - pub(super) grouped_kernels: RwLock>, + kernels: ArcSwapMap, + grouped_kernels: ArcSwapMap, } impl SessionVar for AggregateFnSession { @@ -70,9 +69,9 @@ type KernelKey = (ArrayId, Option); impl Default for AggregateFnSession { fn default() -> Self { let this = Self { - registry: AggregateFnRegistry::default(), - kernels: RwLock::new(HashMap::default()), - grouped_kernels: RwLock::new(HashMap::default()), + registry: ArcSwapMap::default(), + kernels: ArcSwapMap::default(), + grouped_kernels: ArcSwapMap::default(), }; // Register the built-in aggregate functions @@ -106,34 +105,88 @@ impl Default for AggregateFnSession { } impl AggregateFnSession { - /// Returns the aggregate function registry. - pub fn registry(&self) -> &AggregateFnRegistry { - &self.registry + /// Returns the aggregate function plugin registered for `id`, if any. + pub fn find_plugin(&self, id: &AggregateFnId) -> Option { + self.registry.get(id) } /// Register an aggregate function vtable in the session, replacing any existing vtable with /// the same ID. pub fn register(&self, vtable: V) { - self.registry - .register(vtable.id(), Arc::new(vtable) as AggregateFnPluginRef); + let id = vtable.id(); + let pluginref = Arc::new(vtable) as AggregateFnPluginRef; + self.registry.insert(id, pluginref); } - /// Register an aggregate function kernel for a specific aggregate function and array type. + /// Returns the aggregate kernel registered for `array_id` and `agg_fn_id`, if any. + /// + /// Lookup first checks for a kernel registered for the exact aggregate function, then falls + /// back to a kernel registered for all aggregate functions on the same array encoding. + pub fn find_aggregate_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynAggregateKernel> { + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + self.kernels.read(|kernels| { + kernels + .get(&(id, Some(fn_id))) + .or_else(|| kernels.get(&(id, None))) + .copied() + }) + } + + /// Registers an aggregate kernel for an array encoding. + /// + /// When `agg_fn_id` is `Some`, the kernel is used only for that aggregate function. When + /// `agg_fn_id` is `None`, the kernel is used as the fallback for aggregate functions on the + /// array encoding that do not have a more specific kernel. pub fn register_aggregate_kernel( &self, array_id: impl Into, agg_fn_id: Option>, kernel: &'static dyn DynAggregateKernel, ) { - self.kernels - .write() - .insert((array_id.into(), agg_fn_id.map(|id| id.into())), kernel); + let id = (array_id.into(), agg_fn_id.map(|id| id.into())); + self.kernels.insert(id, kernel); + } + + /// Returns the grouped aggregate kernel registered for `array_id` and `agg_fn_id`, if any. + /// + /// Lookup first checks for a kernel registered for the exact aggregate function, then falls + /// back to a kernel registered for all aggregate functions on the same array encoding. + pub fn find_grouped_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynGroupedAggregateKernel> { + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + self.grouped_kernels.read(|kernels| { + kernels + .get(&(id, Some(fn_id))) + .or_else(|| kernels.get(&(id, None))) + .copied() + }) + } + + /// Registers a grouped aggregate kernel for a specific aggregate function and array encoding. + pub fn register_grouped_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + kernel: &'static dyn DynGroupedAggregateKernel, + ) { + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + self.grouped_kernels.insert((id, Some(fn_id)), kernel) } } /// Extension trait for accessing aggregate function session data. pub trait AggregateFnSessionExt: SessionExt { - /// Returns the aggregate function vtable registry. + /// Returns the aggregate function session data. fn aggregate_fns(&self) -> Ref<'_, AggregateFnSession> { self.get::() } diff --git a/vortex-array/src/arc_swap_map.rs b/vortex-array/src/arc_swap_map.rs new file mode 100644 index 00000000000..aff46c22d73 --- /dev/null +++ b/vortex-array/src/arc_swap_map.rs @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A concurrent, copy-on-write map backed by an [`ArcSwap`]. + +use std::borrow::Borrow; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::hash::Hash; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use vortex_utils::aliases::hash_map::HashMap; + +/// A concurrent [`HashMap`] backed by an [`ArcSwap`], offering lock-free reads +/// and copy-on-write writes. +/// +/// Reads load the current snapshot without blocking writers. Writes clone the +/// whole map, apply their change, and atomically publish the new version, so a +/// reader always observes a consistent snapshot and writers never block readers. +/// +/// This is the shared building block behind the session-scoped registries (the +/// optimizer-kernel and aggregate-function registries). Because every write +/// clones the entire map, it is intended for maps that are written rarely +/// (typically only while a session is being configured) and read often. +pub(crate) struct ArcSwapMap { + inner: ArcSwap>, +} + +impl Default for ArcSwapMap { + fn default() -> Self { + Self { + inner: ArcSwap::from_pointee(HashMap::default()), + } + } +} + +impl Debug for ArcSwapMap { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.read(|map| f.debug_tuple("ArcSwapMap").field(map).finish()) + } +} + +impl ArcSwapMap { + /// Read the current snapshot, passing it to `f`. + /// + /// Every lookup inside `f` observes the same snapshot, which matters when a + /// single logical read consults more than one key. + pub(crate) fn read(&self, f: impl FnOnce(&HashMap) -> R) -> R { + f(&self.inner.load()) + } + + /// Replace the map with the result of applying `f` to a private copy. + /// + /// Writes are copy-on-write via [`ArcSwap::rcu`], so `f` may run more than + /// once under contention and must not move out of its captures. + fn modify(&self, f: impl Fn(&mut HashMap)) + where + K: Clone, + V: Clone, + { + self.inner.rcu(|existing| { + let mut map = existing.as_ref().clone(); + f(&mut map); + map + }); + } +} + +impl ArcSwapMap { + /// Return a clone of the value stored under `key`, if present. + pub(crate) fn get(&self, key: &Q) -> Option + where + K: Borrow, + Q: Eq + Hash + ?Sized, + { + self.inner.load().get(key).cloned() + } + + /// Insert `value` under `key`, replacing any existing value. + pub(crate) fn insert(&self, key: K, value: V) + where + K: Clone, + { + self.modify(|map| { + map.insert(key.clone(), value.clone()); + }); + } +} + +impl ArcSwapMap> { + /// Append `values` to the list stored under `key`, creating it if absent. + /// + /// Each key maps to an immutable `Arc<[T]>`; appending rebuilds that slice + /// copy-on-write so existing readers keep their previous snapshot. + pub(crate) fn extend(&self, key: K, values: &[T]) { + self.modify(|map| { + let merged: Arc<[T]> = match map.get(&key) { + Some(existing) => existing.iter().chain(values).cloned().collect(), + None => values.into(), + }; + map.insert(key.clone(), merged); + }); + } + + /// Append a single `value` to the list stored under `key`, creating it if + /// absent. + pub(crate) fn push(&self, key: K, value: T) { + self.extend(key, &[value]); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn get_and_insert() { + let map = ArcSwapMap::::default(); + assert_eq!(map.get(&1), None); + map.insert(1, 10); + map.insert(1, 20); + assert_eq!(map.get(&1), Some(20)); + } + + #[test] + fn extend_appends_per_key() { + let map = ArcSwapMap::>::default(); + map.extend(1, &[1, 2]); + map.extend(1, &[3]); + map.extend(2, &[4]); + assert_eq!(map.get(&1).as_deref(), Some([1, 2, 3].as_slice())); + assert_eq!(map.get(&2).as_deref(), Some([4].as_slice())); + } + + #[test] + fn push_appends_single_values() { + let map = ArcSwapMap::>::default(); + map.push(1, 1); + map.push(1, 2); + assert_eq!(map.get(&1).as_deref(), Some([1, 2].as_slice())); + } + + #[test] + fn read_observes_a_single_snapshot() { + let map = ArcSwapMap::::default(); + map.insert(1, 1); + map.insert(2, 2); + assert_eq!(map.read(|m| m.values().sum::()), 3); + } +} diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs index 9aa83cbbd2e..598a903cfa2 100644 --- a/vortex-array/src/arrow/session.rs +++ b/vortex-array/src/arrow/session.rs @@ -24,7 +24,6 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use arc_swap::ArcSwap; use arrow_array::Array as _; use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::RecordBatch; @@ -44,11 +43,11 @@ use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; use vortex_session::registry::Id; -use vortex_utils::aliases::hash_map::HashMap; use crate::ArrayRef; use crate::ExecutionCtx; use crate::IntoArray; +use crate::arc_swap_map::ArcSwapMap; use crate::arrays::StructArray; use crate::arrow::FromArrowArray; use crate::arrow::convert::nulls; @@ -61,6 +60,7 @@ use crate::dtype::Nullability; use crate::dtype::StructFields; use crate::dtype::arrow::FromArrowType; use crate::dtype::arrow::to_data_type_naive; +use crate::dtype::extension::ExtId; use crate::extension::datetime::AnyTemporal; use crate::extension::uuid::Uuid; use crate::validity::Validity; @@ -154,10 +154,6 @@ pub trait ArrowImportVTable: 'static + Send + Sync + Debug { pub type ArrowExportVTableRef = Arc; pub type ArrowImportVTableRef = Arc; -type ExportMap = HashMap>; -type ImportMap = HashMap>; -type ExportDTypeMap = HashMap>; - /// Session-scoped registry of Arrow extension plugins. /// /// Exporters are stored in two indices: one keyed by Arrow extension Id (used for @@ -169,17 +165,17 @@ type ExportDTypeMap = HashMap>; /// need plugins. #[derive(Debug)] pub struct ArrowSession { - exporters: ArcSwap, - exporters_by_vortex: ArcSwap, - importers: ArcSwap, + exporters: ArcSwapMap>, + exporters_by_vortex: ArcSwapMap>, + importers: ArcSwapMap>, } impl Default for ArrowSession { fn default() -> Self { let session = Self { - exporters: ArcSwap::from_pointee(ExportMap::default()), - exporters_by_vortex: ArcSwap::from_pointee(ExportDTypeMap::default()), - importers: ArcSwap::from_pointee(ImportMap::default()), + exporters: ArcSwapMap::default(), + exporters_by_vortex: ArcSwapMap::default(), + importers: ArcSwapMap::default(), }; session.register_exporter(Arc::new(Uuid)); @@ -193,56 +189,31 @@ impl ArrowSession { /// Register an [`ArrowExportVTable`] under its target Arrow extension Id (for dispatch) /// and its source Vortex extension Id (for schema inference). pub fn register_exporter(&self, exporter: ArrowExportVTableRef) { - Self::insert( - &self.exporters, + self.exporters.push( exporter.arrow_ext_id(), ArrowExportVTableRef::clone(&exporter), ); - Self::insert(&self.exporters_by_vortex, exporter.vortex_id(), exporter); + self.exporters_by_vortex + .push(exporter.vortex_id(), exporter); } /// Register an [`ArrowImportVTable`] under its source Arrow extension name. pub fn register_importer(&self, importer: ArrowImportVTableRef) { - Self::insert(&self.importers, importer.arrow_ext_id(), importer); - } - - fn insert(slot: &ArcSwap>>, key: K, value: T) - where - K: Clone + Eq + std::hash::Hash, - T: Clone, - { - slot.rcu(move |map| { - let mut next = (**map).clone(); - let entry = next.entry(key.clone()).or_insert_with(|| Arc::from([])); - let mut extended: Vec = entry.iter().cloned().collect(); - extended.push(value.clone()); - *entry = Arc::from(extended); - next - }); + self.importers.push(importer.arrow_ext_id(), importer); } fn exporters(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> { - self.exporters - .load() - .get(id) - .cloned() - .unwrap_or_else(|| Arc::from([])) + self.exporters.get(id).unwrap_or_else(|| Arc::from([])) } fn exporters_by_vortex(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> { self.exporters_by_vortex - .load() .get(id) - .cloned() .unwrap_or_else(|| Arc::from([])) } fn importers(&self, id: &Id) -> Arc<[ArrowImportVTableRef]> { - self.importers - .load() - .get(id) - .cloned() - .unwrap_or_else(|| Arc::from([])) + self.importers.get(id).unwrap_or_else(|| Arc::from([])) } /// Build the Arrow [`Field`] for a Vortex [`DType`]. diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 0a9c5969ecc..1f3189eecbb 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -33,6 +33,7 @@ pub mod accessor; pub mod aggregate_fn; #[doc(hidden)] pub mod aliases; +mod arc_swap_map; mod array; pub mod arrays; pub mod arrow; diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index d38bc9402d1..93407a2c42d 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -26,21 +26,19 @@ use std::any::Any; use std::borrow::Borrow; use std::hash::BuildHasher; -use std::hash::Hash; use std::sync::Arc; use std::sync::LazyLock; -use arc_swap::ArcSwap; use vortex_error::VortexResult; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; use vortex_session::registry::Id; use vortex_utils::aliases::DefaultHashBuilder; -use vortex_utils::aliases::hash_map::HashMap; use crate::ArrayRef; use crate::ExecutionCtx; +use crate::arc_swap_map::ArcSwapMap; use crate::array::VTable; use crate::arrays::Struct; use crate::arrays::struct_::compute::cast::struct_cast_execute_parent; @@ -115,8 +113,8 @@ impl Borrow for ExecuteParentFnId { /// functions for an existing key appends them to that key's ordered list. #[derive(Debug)] pub struct ArrayKernels { - reduce_parent: ArcSwap>>, - execute_parent: ArcSwap>>, + reduce_parent: ArcSwapMap>, + execute_parent: ArcSwapMap>, } impl Default for ArrayKernels { @@ -132,8 +130,8 @@ impl ArrayKernels { /// Create an empty [`ArrayKernels`] with no kernels registered. pub fn empty() -> Self { Self { - reduce_parent: ArcSwap::from_pointee(HashMap::default()), - execute_parent: ArcSwap::from_pointee(HashMap::default()), + reduce_parent: ArcSwapMap::default(), + execute_parent: ArcSwapMap::default(), } } @@ -164,9 +162,8 @@ impl ArrayKernels { /// If functions have already been registered for the same pair, these functions are appended /// after them. pub fn register_reduce_parent(&self, parent: Id, child: Id, fns: &[ReduceParentFn]) { - self.reduce_parent.rcu(move |registry| { - update_fns(registry.as_ref().clone(), hash_fn_id(parent, child), fns) - }); + self.reduce_parent + .extend(hash_fn_id(parent, child).into(), fns); } /// Look up the [`ReduceParentFn`]s registered for `(parent, child)`. @@ -174,8 +171,7 @@ impl ArrayKernels { /// Returns an owned [`Arc`] so the session-variable borrow can be dropped before invoking the /// functions. pub fn find_reduce_parent(&self, parent: Id, child: Id) -> Option> { - let id = hash_fn_id(parent, child); - self.reduce_parent.load().get(&id).cloned() + self.reduce_parent.get(&hash_fn_id(parent, child)) } /// Register [`ExecuteParentFn`]s for `(parent, child)`. @@ -187,9 +183,8 @@ impl ArrayKernels { /// If functions have already been registered for the same pair, these functions are appended /// after them. pub fn register_execute_parent(&self, parent: Id, child: Id, fns: &[ExecuteParentFn]) { - self.execute_parent.rcu(move |registry| { - update_fns(registry.as_ref().clone(), hash_fn_id(parent, child), fns) - }); + self.execute_parent + .extend(hash_fn_id(parent, child).into(), fns); } /// Look up the [`ExecuteParentFn`]s registered for `(parent, child)`. @@ -197,8 +192,7 @@ impl ArrayKernels { /// Returns an owned [`Arc`] so the session-variable borrow can be dropped before invoking the /// functions. pub fn find_execute_parent(&self, parent: Id, child: Id) -> Option> { - let id = hash_fn_id(parent, child); - self.execute_parent.load().get(&id).cloned() + self.execute_parent.get(&hash_fn_id(parent, child)) } } @@ -206,22 +200,6 @@ fn hash_fn_id(parent: Id, child: Id) -> u64 { FN_HASHER.hash_one((parent, child)) } -fn update_fns + Eq + Hash + From>( - mut existing: HashMap>, - id: u64, - fns: &[F], -) -> HashMap> { - if let Some(existing_fns) = existing.remove(&id) { - existing.insert( - id.into(), - existing_fns.as_ref().iter().chain(fns).cloned().collect(), - ); - } else { - existing.insert(id.into(), fns.into()); - } - existing -} - impl SessionVar for ArrayKernels { fn as_any(&self) -> &dyn Any { self