diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6c74a9539ad5f..770e705dddc90 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -471,7 +471,7 @@ jobs: export RUST_MIN_STACK=20971520 export TPCH_DATA=`realpath datafusion/sqllogictest/test_files/tpch/data` cargo test plan_q --package datafusion-benchmarks --profile ci --features=ci -- --test-threads=1 - INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait,memory-accounting --profile ci --package datafusion-sqllogictest --test sqllogictests -- --default-pool-size-mb 16384 + INCLUDE_TPCH=true cargo test --features backtrace,parquet_encryption,substrait --profile ci --package datafusion-sqllogictest --test sqllogictests - name: Verify Working Directory Clean run: git diff --exit-code diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index e50f72632b3f2..2b36ee7f40add 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -18,7 +18,7 @@ //! [`MemoryPool`] for memory management during query execution, [`proxy`] for //! help with allocation accounting. -use datafusion_common::{Result, internal_datafusion_err, not_impl_err}; +use datafusion_common::{Result, internal_datafusion_err}; use std::any::Any; use std::fmt::Display; use std::hash::{Hash, Hasher}; @@ -223,16 +223,6 @@ pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display { fn memory_limit(&self) -> MemoryLimit { MemoryLimit::Unknown } - - /// Attempt to update this pool's limit in place to `new_limit` bytes. - /// - /// Default impl returns `Err`. Callers that route through - /// [`crate::runtime_env::RuntimeEnvBuilder::with_memory_limit`] fall - /// back to replacing the pool wholesale on `Err`, preserving historical - /// behavior for pools that can't be resized in place. - fn try_resize(&self, _new_limit: usize) -> Result<()> { - not_impl_err!("{} does not support resize", self.name()) - } } impl dyn MemoryPool { diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index ecbc2bd5c6f82..52b601d5cd78b 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -73,15 +73,9 @@ impl Display for UnboundedMemoryPool { /// This pool works well for queries that do not need to spill or have /// a single spillable operator. See [`FairSpillPool`] if there are /// multiple spillable operators that all will spill. -/// -/// Supports [`MemoryPool::try_resize`] for in-place limit adjustment, so -/// callers routing through -/// [`RuntimeEnvBuilder::with_memory_limit`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit) -/// can keep the existing pool (and any wrappers around it) rather than -/// replacing it on every change. #[derive(Debug)] pub struct GreedyMemoryPool { - pool_size: AtomicUsize, + pool_size: usize, used: AtomicUsize, } @@ -90,7 +84,7 @@ impl GreedyMemoryPool { pub fn new(pool_size: usize) -> Self { debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); Self { - pool_size: AtomicUsize::new(pool_size), + pool_size, used: AtomicUsize::new(0), } } @@ -110,17 +104,16 @@ impl MemoryPool for GreedyMemoryPool { } fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { - let pool_size = self.pool_size.load(Ordering::Relaxed); self.used .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| { let new_used = used + additional; - (new_used <= pool_size).then_some(new_used) + (new_used <= self.pool_size).then_some(new_used) }) .map_err(|used| { insufficient_capacity_err( reservation, additional, - pool_size.saturating_sub(used), + self.pool_size.saturating_sub(used), self, ) })?; @@ -132,25 +125,19 @@ impl MemoryPool for GreedyMemoryPool { } fn memory_limit(&self) -> MemoryLimit { - MemoryLimit::Finite(self.pool_size.load(Ordering::Relaxed)) - } - - fn try_resize(&self, new_limit: usize) -> Result<()> { - self.pool_size.store(new_limit, Ordering::Relaxed); - Ok(()) + MemoryLimit::Finite(self.pool_size) } } impl Display for GreedyMemoryPool { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let used = self.used.load(Ordering::Relaxed); - let pool_size = self.pool_size.load(Ordering::Relaxed); write!( f, "{}(used: {}, pool_size: {})", &self.name(), human_readable_size(used), - human_readable_size(pool_size) + human_readable_size(self.pool_size) ) } } @@ -613,10 +600,6 @@ impl MemoryPool for TrackConsumersPool { fn memory_limit(&self) -> MemoryLimit { self.inner.memory_limit() } - - fn try_resize(&self, new_limit: usize) -> Result<()> { - self.inner.try_resize(new_limit) - } } fn provide_top_memory_consumers_to_error_msg( @@ -1063,43 +1046,4 @@ mod tests { "TrackConsumersPool Display" ); } - - #[test] - fn test_greedy_try_resize_in_place() { - let pool: Arc = Arc::new(GreedyMemoryPool::new(100)); - let r = MemoryConsumer::new("r").register(&pool); - - // Fill the pool, then verify it rejects further growth. - r.try_grow(100).unwrap(); - r.try_grow(1).unwrap_err(); - - // Resize *up*: previously-rejected growth now succeeds. - pool.try_resize(200).unwrap(); - assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(200))); - r.try_grow(50).unwrap(); - assert_eq!(pool.reserved(), 150); - - // Resize *down* below current usage: subsequent grows fail because - // reserved (150) already exceeds the new limit (120). Already-issued - // reservations are not retroactively shrunk. - pool.try_resize(120).unwrap(); - assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(120))); - r.try_grow(1).unwrap_err(); - } - - #[test] - fn test_track_consumers_try_resize_forwards() { - let pool: Arc = Arc::new(TrackConsumersPool::new( - GreedyMemoryPool::new(100), - NonZeroUsize::new(3).unwrap(), - )); - pool.try_resize(500).unwrap(); - assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(500))); - } - - #[test] - fn test_unbounded_try_resize_returns_err() { - let pool: Arc = Arc::new(UnboundedMemoryPool::default()); - assert!(pool.try_resize(100).is_err()); - } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 31f663e19557b..5b90f28a141ef 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -409,23 +409,12 @@ impl RuntimeEnvBuilder { /// Specify the total memory to use while running the DataFusion /// plan to `max_memory * memory_fraction` in bytes. /// - /// If a memory pool is already configured on this builder, this first - /// attempts to resize it in place via [`MemoryPool::try_resize`]. Pools - /// that support resize (e.g. [`GreedyMemoryPool`]) keep their identity - /// — useful for any wrapper that needs to observe limit changes (e.g. - /// to retune external accounting). Pools whose [`MemoryPool::try_resize`] - /// returns `Err` (the default) fall back to wholesale replacement - /// with a [`TrackConsumersPool`]-wrapped [`GreedyMemoryPool`] (top 5 - /// consumers), preserving the historical behavior. + /// This defaults to using [`GreedyMemoryPool`] wrapped in the + /// [`TrackConsumersPool`] with a maximum of 5 consumers. /// /// Note DataFusion does not yet respect this limit in all cases. pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { let pool_size = (max_memory as f64 * memory_fraction) as usize; - if let Some(existing) = &self.memory_pool - && existing.try_resize(pool_size).is_ok() - { - return self; - } self.with_memory_pool(Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(pool_size), NonZeroUsize::new(5).unwrap(), @@ -573,48 +562,3 @@ impl RuntimeEnvBuilder { docs } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::memory_pool::{GreedyMemoryPool, MemoryLimit, UnboundedMemoryPool}; - - #[test] - fn with_memory_limit_resizes_in_place_when_pool_supports_it() { - let pool: Arc = Arc::new(GreedyMemoryPool::new(100)); - let pool_ptr = Arc::as_ptr(&pool); - - let env = RuntimeEnvBuilder::new() - .with_memory_pool(Arc::clone(&pool)) - .with_memory_limit(500, 1.0) - .build() - .unwrap(); - - // Same Arc as before — wrapper-or-other-resize-capable pools survive. - assert!(std::ptr::eq(Arc::as_ptr(&env.memory_pool), pool_ptr)); - assert!(matches!( - env.memory_pool.memory_limit(), - MemoryLimit::Finite(500) - )); - } - - #[test] - fn with_memory_limit_falls_back_to_replace_when_resize_unsupported() { - let pool: Arc = Arc::new(UnboundedMemoryPool::default()); - let pool_ptr = Arc::as_ptr(&pool); - - let env = RuntimeEnvBuilder::new() - .with_memory_pool(Arc::clone(&pool)) - .with_memory_limit(500, 1.0) - .build() - .unwrap(); - - // Different Arc — wholesale replacement happened because Unbounded's - // default `try_resize` returns Err. - assert!(!std::ptr::eq(Arc::as_ptr(&env.memory_pool), pool_ptr)); - assert!(matches!( - env.memory_pool.memory_limit(), - MemoryLimit::Finite(500) - )); - } -} diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index cda73ba4e8766..a642fbe22a6e3 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -70,10 +70,6 @@ tokio-postgres = { version = "0.7.17", optional = true } [features] avro = ["datafusion/avro"] backtrace = ["datafusion/backtrace"] -# Enable the `AccountingAllocator` `GlobalAlloc` wrapper and its thread-local -# byte counter. The binary still has to declare `#[global_allocator]` for it -# to actually take effect — building with this feature on alone is harmless. -memory-accounting = [] postgres = [ "bytes", "chrono", diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 57aabca361553..f0a54cf978fbf 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -360,35 +360,6 @@ For focusing on one specific failing test, a file:line filter can be used: cargo test --test sqllogictests -- --substrait-round-trip binary.slt:23 ``` -## Running tests: allocator-level memory accounting - -Build with `--features memory-accounting` to install a global allocator -wrapper that tracks actual bytes allocated per SLT file and reconciles them -against DataFusion's voluntary `MemoryPool` tracking. The point isn't to -enforce a process-wide budget — it's to catch DataFusion lying about how -much memory it's using. If `MemoryPool` reports 1 MB while the allocator -sees 100 MB go by, _that gap is the bug_. - -```shell -cargo test --features memory-accounting --test sqllogictests -- \ - --default-pool-size-mb 16384 -``` - -`--default-pool-size-mb` seeds each per-file SLT context's MemoryPool with -the given size in MB and arms the bank as a no-op until a test opts in. - -**Opting an individual test in.** Add `SET datafusion.runtime.memory_limit = 'N'` at the top of the `.slt`. The wrapping `AccountingMemoryPool` then -tightens its allocator-level bank to `N * 1.10` (10% headroom). If the test -allocates more than that — including bytes DataFusion's tracker didn't see -— the test panics with an `OverdraftPanic` reporting the actual balance at -panic time. SLTs without a `SET` of `memory_limit` see no change in -behavior; the bank stays loose and `SHOW ALL` continues to render the limit -as `unlimited`. - -Inside the runner each file gets its own multi-thread Tokio runtime so -context-ids stamped onto worker threads stay stable for the allocator -hook, and per-file accounts in the bank are isolated from each other. - ## `.slt` file format [`sqllogictest`] was originally written for SQLite to verify the diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 2b08769bf5208..e43f03fcf46a7 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -#[cfg(feature = "memory-accounting")] -#[global_allocator] -static GLOBAL: datafusion_sqllogictest::AccountingAllocator = - datafusion_sqllogictest::AccountingAllocator::system(); - use clap::{ColorChoice, Parser}; use datafusion::common::instant::Instant; use datafusion::common::utils::get_available_parallelism; @@ -143,19 +138,6 @@ async fn run_tests() -> Result<()> { options.warn_on_ignored(); - #[cfg(feature = "memory-accounting")] - if let Some(pool_mb) = options.default_pool_size_mb { - let pool_bytes = pool_mb.saturating_mul(1024 * 1024); - // Same value drives the inner MemoryPool's size and the bank's - // default budget. The wrapper renders this value as `unlimited` in - // `SHOW ALL` (sentinel for "no SET has happened"); once a test - // calls `SET datafusion.runtime.memory_limit`, the wrapper retunes - // the bank to that limit + 10% headroom. - datafusion_sqllogictest::set_memory_tracker_limit(pool_bytes); - datafusion_sqllogictest::set_default_budget(pool_bytes as isize); - log::info!("memory-accounting on: default pool size = {pool_mb} MB"); - } - // Print parallelism info for debugging CI performance eprintln!( "Running with {} test threads (available parallelism: {})", @@ -228,7 +210,7 @@ async fn run_tests() -> Result<()> { let currently_running_sql_tracker_clone = currently_running_sql_tracker.clone(); let file_start = Instant::now(); - let body = async move { + SpawnedTask::spawn(async move { let result = match ( options.postgres_runner, options.complete, @@ -301,41 +283,9 @@ async fn run_tests() -> Result<()> { } (result, elapsed) - }; - // Each file gets its own multi-thread runtime so a stable per-file - // context-id (stamped via `on_thread_start`) is readable from the - // global allocator hook. Bank accounting and SET-driven limit - // retuning will key off this id in later steps. The outer - // orchestration runtime hosts this via `spawn_blocking` so its - // worker threads aren't blocked by the per-file `block_on`. - // - // Worker count matches `SLT_TARGET_PARTITIONS` so a query's - // partition streams each get a worker rather than contending. - #[cfg(feature = "memory-accounting")] - let spawned = { - let context_id = datafusion_sqllogictest::next_context_id(); - SpawnedTask::spawn_blocking(move || { - // Stamp this thread too — `block_on` polls `body` here, so - // statements that don't suspend (e.g. `SET memory_limit`, - // pool construction) run on this thread, not a worker. - datafusion_sqllogictest::set_thread_context_id(context_id); - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(datafusion_sqllogictest::SLT_TARGET_PARTITIONS) - .thread_name(format!("slt-file-{context_id}")) - .on_thread_start(move || { - datafusion_sqllogictest::set_thread_context_id(context_id); - }) - .build() - .expect("build per-file Tokio runtime"); - let out = runtime.block_on(body); - runtime.shutdown_background(); - out - }) - }; - #[cfg(not(feature = "memory-accounting"))] - let spawned = SpawnedTask::spawn(body); - spawned.join().map(move |result| { + }) + .join() + .map(move |result| { let elapsed = match &result { Ok((_, elapsed)) => *elapsed, Err(_) => Duration::ZERO, @@ -965,19 +915,6 @@ struct Options { default_value_t = ColorChoice::Auto )] color: ColorChoice, - - #[clap( - long, - help = "Default MemoryPool size in MB for each per-file SLT context. \ - The pool is wrapped in AccountingMemoryPool, which doubles \ - this value as the 'no SET has happened yet' sentinel — until \ - an SLT calls `SET datafusion.runtime.memory_limit`, SHOW ALL \ - renders the limit as 'unlimited' and the allocator bank \ - stays loose. Once a test SETs a limit, the bank tightens to \ - that limit + 10% headroom. Requires the memory-accounting \ - feature; ignored without it." - )] - default_pool_size_mb: Option, } impl Options { diff --git a/datafusion/sqllogictest/src/accounting.rs b/datafusion/sqllogictest/src/accounting.rs deleted file mode 100644 index 46b6120c24d28..0000000000000 --- a/datafusion/sqllogictest/src/accounting.rs +++ /dev/null @@ -1,434 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Allocator-driven memory accounting with per-context budgets. -//! -//! The bank ([`ACCOUNTS`]) holds one [`AtomicIsize`] account per stamped -//! `CONTEXT_ID`, each tracking its own remaining budget. Allocations debit -//! the current thread's account, deallocations credit it; below zero is an -//! overdraft. Threads with `CONTEXT_ID == 0` (main, the outer orchestration -//! runtime, blocking-pool hosts) are untracked and skip the hot path. -//! -//! Per-alloc bookkeeping accumulates in a thread-local `LOCAL_BALANCE` -//! drift counter; it settles into the account once `|drift|` crosses -//! [`SETTLE_THRESHOLD`] (64 KB), amortizing the `RwLock` read + atomic -//! op across thousands of allocations. -//! -//! [`account_balance`] reads the current thread's account; it lags reality -//! by up to one threshold's worth of un-settled drift per thread. -//! -//! # Enforcement -//! -//! An allocation that drives the bank negative on a stamped thread -//! (`CONTEXT_ID != 0`) panics with [`OverdraftPanic`] on the polling thread. -//! Drop-chain credits during unwind never re-panic — `track` only fires on -//! debits (`delta < 0`). Unstamped threads are silently skipped. -//! -//! Compiled in only when the `memory-accounting` feature is on. - -use std::alloc::{GlobalAlloc, Layout, System}; -use std::cell::Cell; -use std::collections::HashMap; -use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; -use std::sync::{OnceLock, RwLock}; - -/// Net byte change at which a thread flushes its local count into the bank. -/// 64 KB chosen to keep per-thread drift tight (≤1 MB on a 16-core box) while -/// still settling rarely enough to make the bank's atomic op amortized-free. -const SETTLE_THRESHOLD: isize = 64 * 1024; - -/// The bank: every account, keyed by context-id, valued by remaining budget. -/// Debits on alloc, credits on free, negative = overdraft. ctx-id 0 never -/// gets an entry — that's the "untracked thread" marker. -static ACCOUNTS: OnceLock>> = OnceLock::new(); - -/// Starting budget for any new account, set by [`set_default_budget`] and -/// inherited by per-file SLT contexts spawned after. -static DEFAULT_BUDGET: AtomicIsize = AtomicIsize::new(0); - -fn accounts() -> &'static RwLock> { - ACCOUNTS.get_or_init(|| RwLock::new(HashMap::new())) -} - -/// Run `f` against the current thread's account balance, or return `None` -/// if there isn't one — silently skipping the update is fine on the alloc -/// hot path. -fn with_current_balance(op: impl FnOnce(&AtomicIsize) -> R) -> Option { - let ctx_id = CONTEXT_ID.with(|ctx| ctx.get()); - if ctx_id == 0 { - return None; - } - // PERF: acquires an `RwLock` read on every settle. If it ever shows up - // hot, stash a `&'static AtomicIsize` in a thread-local (set in - // `set_thread_context_id`, backed by `Box::leak`) and skip the lookup. - let accounts_lock = ACCOUNTS.get()?; - let accounts = accounts_lock.read().ok()?; - accounts.get(&ctx_id).map(op) -} - -thread_local! { - static LOCAL_BALANCE: Cell = const { Cell::new(0) }; - - /// Account-id stamped onto worker threads via [`set_thread_context_id`]. - /// Zero = untracked thread; nothing to track, nothing to enforce. - static CONTEXT_ID: Cell = const { Cell::new(0) }; -} - -/// Monotonic source of fresh context-ids. Starts at 1; the zero value is -/// reserved for "no per-file runtime" so callers can distinguish. -static CONTEXT_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); - -/// Returns a fresh, never-before-used context-id. Call once per file in the -/// SLT binary and pass the result into the per-file runtime's -/// `on_thread_start` callback so every worker thread of that runtime shares -/// the same id. -pub fn next_context_id() -> usize { - CONTEXT_ID_COUNTER.fetch_add(1, Ordering::Relaxed) + 1 -} - -/// Stamp the current thread with `id`. Intended for `on_thread_start`. -/// Creates the account if it doesn't already exist. -pub fn set_thread_context_id(id: usize) { - if id == 0 { - CONTEXT_ID.with(|ctx| ctx.set(0)); - return; - } - // Insert under the write lock *before* stamping the thread. A HashMap - // resize allocates → recurses through `track` → `with_current_account`, - // which sees `CONTEXT_ID == 0` and bails out instead of trying to - // read-lock the map we're holding for writing on the same thread. - { - let accounts_lock = accounts(); - let mut accounts = accounts_lock - .write() - .unwrap_or_else(|poison| poison.into_inner()); - accounts - .entry(id) - .or_insert_with(|| AtomicIsize::new(DEFAULT_BUDGET.load(Ordering::Relaxed))); - } - CONTEXT_ID.with(|ctx| ctx.set(id)); -} - -/// Current thread's context-id, or 0 if none has been set. -pub fn current_context_id() -> usize { - CONTEXT_ID.with(|ctx| ctx.get()) -} - -/// Payload attached to allocator-induced panics. Catch with: -/// -/// ```ignore -/// match std::panic::catch_unwind(|| { /* ... */ }) { -/// Err(e) if e.is::() => { /* it was an overdraft */ } -/// ... -/// } -/// ``` -#[derive(Debug, Clone)] -pub struct OverdraftPanic { - /// Account balance at the moment the panic fired (negative — that's the point). - pub account_balance: isize, -} - -/// Set the default budget new accounts will be created with. Existing -/// accounts are untouched. -pub fn set_default_budget(value: isize) { - DEFAULT_BUDGET.store(value, Ordering::Relaxed); -} - -/// Current default budget — what a fresh account starts at and what -/// [`reset_account_to_default`] restores to. -pub fn default_budget() -> isize { - DEFAULT_BUDGET.load(Ordering::Relaxed) -} - -/// Restore the current thread's account to [`default_budget`]. Used by the -/// SLT runner after catching an [`OverdraftPanic`] so the next statement -/// starts clean — otherwise the bank stays negative and every subsequent -/// allocation refires, which is unsafe (allocator hooks must not panic -/// repeatedly within a single thread). -pub fn reset_account_to_default() { - set_account_balance(default_budget()); -} - -/// Set the current thread's account balance to `value`. No-op on untracked -/// threads (`CONTEXT_ID == 0`). -pub fn set_account_balance(value: isize) { - let _ = with_current_balance(|bal| bal.store(value, Ordering::Relaxed)); -} - -/// Cross-module config for DataFusion's voluntary `MemoryPool` limit, set -/// from the SLT binary's CLI and read by test_context when building each -/// per-file `RuntimeEnv`. Zero means "use the default `UnboundedMemoryPool`". -static MEMORY_TRACKER_LIMIT: AtomicUsize = AtomicUsize::new(0); - -/// Set the size (in bytes) the per-file `MemoryPool` should be built with. -/// Zero (the default) leaves the existing `UnboundedMemoryPool` behavior. -pub fn set_memory_tracker_limit(bytes: usize) { - MEMORY_TRACKER_LIMIT.store(bytes, Ordering::Relaxed); -} - -/// Current `MemoryPool` limit configured via [`set_memory_tracker_limit`]. -pub fn memory_tracker_limit() -> usize { - MEMORY_TRACKER_LIMIT.load(Ordering::Relaxed) -} - -/// Current account balance. Negative = overdraft. `0` if untracked. -pub fn account_balance() -> isize { - with_current_balance(|bal| bal.load(Ordering::Relaxed)).unwrap_or(0) -} - -/// Current thread's local balance — not yet reflected in the global bank. -/// Always in `(-SETTLE_THRESHOLD, +SETTLE_THRESHOLD)`. Sign matches the bank: -/// negative on a thread that's net-allocated, positive on one that's net-freed. -pub fn local_balance() -> isize { - LOCAL_BALANCE.with(|loc_bal| loc_bal.get()) -} - -/// Force the current thread to flush its local count into its context bank. -/// No-op on untracked threads (`CONTEXT_ID == 0`). -pub fn settle_thread_local() { - if CONTEXT_ID.with(|ctx| ctx.get()) == 0 { - return; - } - let _ = LOCAL_BALANCE.try_with(|loc_bal| { - let drift = loc_bal.replace(0); - if drift != 0 { - let _ = with_current_balance(|bal| bal.fetch_add(drift, Ordering::Relaxed)); - } - }); -} - -/// Record a delta into the current thread's account: settle local drift into -/// the bank when it crosses `±SETTLE_THRESHOLD`, fire the kill panic on a -/// debit that leaves the account negative. -#[inline(always)] -fn track(delta: isize) { - if CONTEXT_ID.with(|ctx| ctx.get()) == 0 { - return; - } - let _ = LOCAL_BALANCE.try_with(|loc_bal| { - let drift = loc_bal.get() + delta; - // 99% case: drift fits — accumulate locally and bail. - if -SETTLE_THRESHOLD < drift && drift < SETTLE_THRESHOLD { - loc_bal.set(drift); - return; - } - // Drop the read lock *before* maybe_kill — the panic allocates, - // recurses through track, and would self-deadlock on std::sync::RwLock. - let new_bal = with_current_balance(|bal| { - bal.fetch_add(drift, Ordering::Relaxed).wrapping_add(drift) - }); - loc_bal.set(0); - // Only debits fire the kill — credits run inside Drop chains during - // unwinding, where a panic would double-fault and abort the process. - if delta >= 0 { - return; - } - let Some(new_bal) = new_bal else { return }; - if new_bal >= 0 { - return; - } - // Skip if we're already unwinding — `panic_any` boxes the payload, - // which allocates, which re-enters `track`; without this gate the - // second debit would fire a nested panic and abort the process. - if std::thread::panicking() { - return; - } - std::panic::panic_any(OverdraftPanic { - account_balance: new_bal, - }); - }); -} - -/// `GlobalAlloc` wrapper that counts bytes against a thread-local + global bank. -/// -/// Forwards every operation unchanged to the inner allocator; the bookkeeping -/// is a thread-local update on the fast path plus an amortized atomic settle. -pub struct AccountingAllocator { - inner: A, -} - -impl AccountingAllocator { - pub const fn new(inner: A) -> Self { - Self { inner } - } -} - -impl AccountingAllocator { - /// Convenience constructor for the typical `System`-backed case. - pub const fn system() -> Self { - Self { inner: System } - } -} - -unsafe impl GlobalAlloc for AccountingAllocator { - unsafe fn alloc(&self, layout: Layout) -> *mut u8 { - // Account BEFORE the inner alloc. If we panicked AFTER `inner.alloc` - // succeeded, the bytes are physically allocated but no caller ever - // sees the pointer → unwind leaks the very bytes that pushed us - // over the budget — the opposite of what the kill panic is for. - let delta = -(layout.size() as isize); - track(delta); - // SAFETY: layout is forwarded unchanged. - let ptr = unsafe { self.inner.alloc(layout) }; - if ptr.is_null() { - // Allocator refused — refund so the bank matches reality. - track(-delta); - } - ptr - } - - unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { - // SAFETY: caller upholds GlobalAlloc invariants; we forward unchanged. - unsafe { self.inner.dealloc(ptr, layout) }; - // Credit only; `track()` short-circuits on `delta >= 0` and never - // panics, so ordering relative to `inner.dealloc` doesn't matter. - track(layout.size() as isize); - } - - unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { - // Same panic-then-leak hazard as `alloc`; account first. - let delta = -(layout.size() as isize); - track(delta); - // SAFETY: layout is forwarded unchanged. - let ptr = unsafe { self.inner.alloc_zeroed(layout) }; - if ptr.is_null() { - track(-delta); - } - ptr - } - - unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { - // Account BEFORE the inner realloc so a kill panic doesn't strand the - // caller with a freed `ptr`. `inner.realloc` frees `ptr` on success; - // if we panicked after that, the caller's `Vec`-or-similar would - // still hold the old pointer and double-free on unwind (glibc - // "double free or corruption (out)" + SIGABRT). - let delta = layout.size() as isize - new_size as isize; - track(delta); - // SAFETY: caller upholds GlobalAlloc invariants; we forward unchanged. - let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) }; - if new_ptr.is_null() { - // Allocator refused — refund so the bank matches reality. - track(-delta); - } - new_ptr - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[global_allocator] - static GLOBAL: AccountingAllocator = AccountingAllocator::system(); - - /// Each test runs on its own thread (cargo-test parallelism) and stamps a - /// fresh context-id, so per-context isolation makes them naturally - /// independent — no shared mutex required. - fn enter_fresh_context() { - set_thread_context_id(next_context_id()); - } - - #[test] - fn alloc_debits_and_free_credits_account() { - enter_fresh_context(); - // Bump budget well above the alloc + this thread's own background - // drift so the test's own activity can't accidentally overdraw. - set_account_balance(10_000_000); - settle_thread_local(); - let before = account_balance(); - - let buf: Vec = vec![0u8; 8192]; - settle_thread_local(); - let mid = account_balance(); - // Alloc debited the account → mid should be at least 8192 below before. - assert!( - before - mid >= 8192, - "alloc didn't debit: before={before} mid={mid}" - ); - - drop(buf); - settle_thread_local(); - let after = account_balance(); - // Free credited the account → after should be at least 8192 above mid. - assert!( - after - mid >= 8192, - "free didn't credit: mid={mid} after={after}" - ); - } - - #[test] - fn set_account_balance_sticks() { - enter_fresh_context(); - set_account_balance(1_000_000); - // Balance drifts a little from this thread's own allocator activity - // between the set and the read, so we expect at-or-below the set value. - let bal = account_balance(); - assert!( - (900_000..=1_000_000).contains(&bal), - "set_account_balance didn't stick: bal={bal}" - ); - } - - #[test] - fn overdraft_on_stamped_thread_panics() { - use std::panic::{AssertUnwindSafe, catch_unwind}; - enter_fresh_context(); - set_account_balance(1024); - - let result = catch_unwind(AssertUnwindSafe(|| { - // Alloc large enough to cross SETTLE_THRESHOLD in one shot — the - // settle drives the bank negative on a stamped thread, which now - // unconditionally panics. - let _buf: Vec = vec![0u8; SETTLE_THRESHOLD as usize + 4096]; - unreachable!("alloc should have panicked"); - })); - - let payload = result.expect_err("alloc should have panicked"); - let overdraft = payload - .downcast_ref::() - .expect("panic payload should be OverdraftPanic"); - assert!( - overdraft.account_balance < 0, - "payload should report negative balance; got {}", - overdraft.account_balance - ); - } - - #[test] - fn threshold_settlement_flushes_to_account() { - enter_fresh_context(); - // Bump budget — the settle on threshold crossing now panics on - // a stamped thread if it goes negative. We just want to observe the - // flush mechanism here, not the kill. - set_account_balance(10_000_000); - settle_thread_local(); - let before = account_balance(); - - let buf: Vec = vec![0u8; SETTLE_THRESHOLD as usize + 1024]; - // Crossing the threshold auto-settles; account balance should have - // dropped by at least SETTLE_THRESHOLD without us calling - // settle_thread_local. - let after_alloc = account_balance(); - assert!( - before - after_alloc >= SETTLE_THRESHOLD, - "balance didn't auto-settle on threshold crossing: \ - before={before} after_alloc={after_alloc}" - ); - drop(buf); - } -} diff --git a/datafusion/sqllogictest/src/accounting_pool.rs b/datafusion/sqllogictest/src/accounting_pool.rs deleted file mode 100644 index a9d2db9f12261..0000000000000 --- a/datafusion/sqllogictest/src/accounting_pool.rs +++ /dev/null @@ -1,174 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`AccountingMemoryPool`] bridges DataFusion's voluntary memory tracking -//! to the allocator-level bank in [`crate::accounting`]. -//! -//! It wraps any [`MemoryPool`] and re-tunes the current thread's bank -//! account whenever the pool's limit changes (via [`MemoryPool::try_resize`], -//! which `RuntimeEnvBuilder::with_memory_limit` triggers on `SET -//! datafusion.runtime.memory_limit = '…'`). -//! -//! Each retune sets the bank to `new_limit * HEADROOM_FACTOR`. A query -//! that allocates past that envelope panics with an `OverdraftPanic` — -//! the gap between DF's voluntary tracker and the allocator's reality -//! is the bug we're hunting. - -use crate::set_account_balance; -use datafusion::common::Result; -use datafusion::execution::memory_pool::{ - MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, -}; -use std::fmt::{self, Display, Formatter}; -use std::sync::Arc; - -/// Headroom over the pool's declared limit. Anything past this is an -/// untracked allocation — by definition, since DF's pool didn't see it. -/// -/// 800% high, but that's what it takes to pass the SLT suite right now. Goal should be ~10% -const HEADROOM_FACTOR: f64 = 8.0; - -pub struct AccountingMemoryPool { - inner: Arc, - /// The operator-configured default pool size, used as a "no SET has - /// happened yet" sentinel by [`Self::memory_limit`]. - default_size: usize, -} - -impl AccountingMemoryPool { - pub fn new(inner: Arc, default_size: usize) -> Self { - Self { - inner, - default_size, - } - } -} - -impl fmt::Debug for AccountingMemoryPool { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("AccountingMemoryPool") - .field("inner", &self.inner) - .field("default_size", &self.default_size) - .finish() - } -} - -impl Display for AccountingMemoryPool { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "accounting({})", self.inner) - } -} - -impl MemoryPool for AccountingMemoryPool { - fn name(&self) -> &str { - "accounting" - } - - fn register(&self, consumer: &MemoryConsumer) { - self.inner.register(consumer) - } - - fn unregister(&self, consumer: &MemoryConsumer) { - self.inner.unregister(consumer) - } - - fn grow(&self, reservation: &MemoryReservation, additional: usize) { - self.inner.grow(reservation, additional) - } - - fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { - self.inner.shrink(reservation, shrink) - } - - fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { - self.inner.try_grow(reservation, additional) - } - - fn reserved(&self) -> usize { - self.inner.reserved() - } - - fn memory_limit(&self) -> MemoryLimit { - // HACK: When the inner pool still reports the operator-configured - // default, no `SET datafusion.runtime.memory_limit` has happened — - // render as `Infinite` so `information_schema.slt`'s `SHOW ALL` - // expectation of `unlimited` for an un-SET context stays satisfied. - // Once a SET fires, `try_resize` mutates the inner pool to some - // other value and we report the real limit. - match self.inner.memory_limit() { - MemoryLimit::Finite(n) if n == self.default_size => MemoryLimit::Infinite, - other => other, - } - } - - fn try_resize(&self, new_limit: usize) -> Result<()> { - self.inner.try_resize(new_limit)?; - set_account_balance((new_limit as f64 * HEADROOM_FACTOR) as isize); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{account_balance, next_context_id, set_thread_context_id}; - use datafusion::execution::memory_pool::GreedyMemoryPool; - - #[test] - fn memory_limit_returns_infinite_for_sentinel() { - let default_size = 1_000_000; - let pool = AccountingMemoryPool::new( - Arc::new(GreedyMemoryPool::new(default_size)), - default_size, - ); - assert!(matches!(pool.memory_limit(), MemoryLimit::Infinite)); - } - - #[test] - fn memory_limit_returns_finite_after_resize() { - let default_size = 1_000_000; - let pool = AccountingMemoryPool::new( - Arc::new(GreedyMemoryPool::new(default_size)), - default_size, - ); - pool.try_resize(50_000).unwrap(); - assert!(matches!(pool.memory_limit(), MemoryLimit::Finite(50_000))); - } - - #[test] - fn try_resize_retunes_current_account_balance() { - // Stamp a fresh context so set_account_balance lands somewhere - // visible. Otherwise CONTEXT_ID == 0 means the call is a no-op. - set_thread_context_id(next_context_id()); - - let default_size = 1_000_000; - let pool = AccountingMemoryPool::new( - Arc::new(GreedyMemoryPool::new(default_size)), - default_size, - ); - pool.try_resize(50_000).unwrap(); - - // Balance is reset to limit * HEADROOM_FACTOR, minus a small - // drift from this test thread's own allocs between set and read. - let expected = (50_000.0 * HEADROOM_FACTOR) as isize; - let bal = account_balance(); - assert!( - (50_000..=expected).contains(&bal), - "balance not in expected range: got {bal}, expected ≤ {expected}" - ); - } -} diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 0c038fb00fa08..08facc48005dc 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -83,50 +83,6 @@ impl DataFusion { self } - /// Run a single query through the engine. Under the `memory-accounting` - /// feature, allocator-detected overdrafts panic with `OverdraftPanic`; - /// catch them here and translate to a clean `Err`. - async fn run_one(&self, sql: &str) -> Result { - #[cfg(feature = "memory-accounting")] - { - use crate::OverdraftPanic; - use futures::FutureExt; - - let fut = run_query(&self.ctx, is_spark_path(&self.relative_path), sql); - - return match std::panic::AssertUnwindSafe(fut).catch_unwind().await { - Ok(r) => r, - Err(payload) => { - if let Some(od) = payload.downcast_ref::() { - let df_reserved_mb = - (self.ctx.runtime_env().memory_pool.reserved() as u64) - / (1024 * 1024); - warn!( - "[{}] killed by allocator overdraft: \ - account balance = {} bytes, df-pool reserved = {df_reserved_mb} MB; \ - sql = {sql:?}", - self.relative_path.display(), - od.account_balance, - ); - // Restore the bank so the next statement starts clean - crate::reset_account_to_default(); - Err(DFSqlLogicTestError::Other(format!( - "allocator overdraft: account balance at panic = {} bytes", - od.account_balance, - ))) - } else { - // Not our panic — re-raise so test runner sees it. - std::panic::resume_unwind(payload); - } - } - }; - } - #[cfg(not(feature = "memory-accounting"))] - { - run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await - } - } - fn update_slow_count(&self) { let msg = self.pb.message(); let split: Vec<&str> = msg.split(" ").collect(); @@ -198,7 +154,7 @@ impl sqllogictest::AsyncDB for DataFusion { let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); let start = Instant::now(); - let result = self.run_one(sql).await; + let result = run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await; let duration = start.elapsed(); self.currently_executing_sql_tracker.remove_sql(tracked_sql); diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index 54f460958c0ab..6b6c40365f855 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -26,23 +26,9 @@ //! DataFusion sqllogictest driver -#[cfg(feature = "memory-accounting")] -mod accounting; -#[cfg(feature = "memory-accounting")] -mod accounting_pool; mod engines; mod test_file; -#[cfg(feature = "memory-accounting")] -pub use accounting::{ - AccountingAllocator, OverdraftPanic, account_balance, current_context_id, - default_budget, local_balance, memory_tracker_limit, next_context_id, - reset_account_to_default, set_account_balance, set_default_budget, - set_memory_tracker_limit, set_thread_context_id, settle_thread_local, -}; -#[cfg(feature = "memory-accounting")] -pub use accounting_pool::AccountingMemoryPool; - pub use engines::CurrentlyExecutingSqlTracker; pub use engines::DFColumnType; pub use engines::DFOutput; @@ -61,6 +47,6 @@ mod test_context; mod util; pub use filters::*; -pub use test_context::{SLT_TARGET_PARTITIONS, TestContext}; +pub use test_context::TestContext; pub use test_file::TestFile; pub use util::*; diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 8d437271fee86..e0aaa91ef6369 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -60,20 +60,12 @@ use async_trait::async_trait; use datafusion::common::cast::as_float64_array; use datafusion::execution::SessionStateBuilder; use datafusion::execution::runtime_env::RuntimeEnv; -#[cfg(feature = "memory-accounting")] -use datafusion::execution::runtime_env::RuntimeEnvBuilder; use log::info; use sqlparser::ast; use tempfile::TempDir; mod range_partitioning; -/// Target partition count used for every SLT file's `SessionConfig`. Hardcoded -/// so query plans are deterministic across machines. The SLT binary also -/// sizes each file's per-file Tokio runtime to this value so partition streams -/// each get a worker rather than contending. -pub const SLT_TARGET_PARTITIONS: usize = 4; - /// Context for running tests pub struct TestContext { /// Context for running queries @@ -99,33 +91,6 @@ impl TypePlanner for SqlLogicTestTypePlanner { } } -/// Construct the per-file `RuntimeEnv`. With the `memory-accounting` feature -/// on and a non-zero `memory_tracker_limit()` configured, this wraps the -/// usual `TrackConsumersPool(GreedyMemoryPool)` in an `AccountingMemoryPool` -/// so the allocator-level bank retunes on every `SET datafusion.runtime. -/// memory_limit`. Otherwise falls back to the historical default. -fn build_runtime_env() -> RuntimeEnv { - #[cfg(feature = "memory-accounting")] - { - use datafusion::execution::memory_pool::{GreedyMemoryPool, TrackConsumersPool}; - use std::num::NonZeroUsize; - - let limit = crate::memory_tracker_limit(); - if limit > 0 { - let tracked = TrackConsumersPool::new( - GreedyMemoryPool::new(limit), - NonZeroUsize::new(5).unwrap(), - ); - let wrapped = crate::AccountingMemoryPool::new(Arc::new(tracked), limit); - return RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(wrapped)) - .build() - .expect("RuntimeEnvBuilder::build with accounting pool"); - } - } - RuntimeEnv::default() -} - impl TestContext { pub fn new(ctx: SessionContext) -> Self { Self { @@ -142,8 +107,8 @@ impl TestContext { pub async fn try_new_for_test_file(relative_path: &Path) -> Option { let config = SessionConfig::new() // hardcode target partitions so plans are deterministic - .with_target_partitions(SLT_TARGET_PARTITIONS); - let runtime = Arc::new(build_runtime_env()); + .with_target_partitions(4); + let runtime = Arc::new(RuntimeEnv::default()); let mut state_builder = SessionStateBuilder::new() .with_config(config) diff --git a/docs/source/contributor-guide/testing.md b/docs/source/contributor-guide/testing.md index 3e44e3aabaeef..3b644f610b90e 100644 --- a/docs/source/contributor-guide/testing.md +++ b/docs/source/contributor-guide/testing.md @@ -113,18 +113,6 @@ Like similar systems such as [DuckDB](https://duckdb.org/dev/testing), DataFusio DataFusion has integrated [sqlite's test suite](https://sqlite.org/sqllogictest/doc/trunk/about.wiki) as a supplemental test suite that is run whenever a PR is merged into DataFusion. To run it manually please refer to the [README](https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/README.md#running-tests-sqlite) file for instructions. -### Allocator-level memory accounting (`--features memory-accounting`) - -For tests that need to verify DataFusion's voluntary memory tracking -matches actual heap usage, the `sqllogictest` runner ships an optional -`memory-accounting` feature that installs a global allocator wrapper. -Adding `SET datafusion.runtime.memory_limit = 'N'` at the top of an -`.slt` file opts that file into allocator-vs-`MemoryPool` reconciliation -with 10% headroom — any divergence panics the test with an -`OverdraftPanic` reporting the actual allocator balance. See -[the sqllogictest README](https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/README.md#running-tests-allocator-level-memory-accounting) -for the runner flag and the full mechanism. - ## Snapshot testing (`cargo insta`) [Insta](https://github.com/mitsuhiko/insta) is used for snapshot testing. Snapshots are generated