Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn cross_join() {
}

#[tokio::test]
async fn merge_join() {
async fn sort_merge_join_no_spill() {
// Planner chooses MergeJoin only if number of partitions > 1
let config = SessionConfig::new()
.with_target_partitions(2)
Expand All @@ -175,11 +175,32 @@ async fn merge_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Failed to allocate additional",
"Failed to allocate additional",
"SMJStream",
"Disk spilling disabled",
])
.with_memory_limit(1_000)
.with_config(config)
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
}

#[tokio::test]
async fn sort_merge_join_spill() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case can only make sure the query can run, it may or may not be spilling.

We should have some ways to verify the spilling is actually happened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately exactly this test case we cannot access any spilling metrics, but there is another test above sort_merge_join_no_spill which is exactly the same but expectedly fails by mem issue and have the spilling disabled explicitly. This test passes without issues and with spilling enabled so we can conclude the spilling happened.

// Planner chooses MergeJoin only if number of partitions > 1
let config = SessionConfig::new()
.with_target_partitions(2)
.set_bool("datafusion.optimizer.prefer_hash_join", false);

TestCase::new()
.with_query(
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_memory_limit(1_000)
.with_config(config)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_scenario(Scenario::AccessLogStreaming)
.run()
.await
}
Expand Down Expand Up @@ -453,7 +474,7 @@ impl TestCase {
let table = scenario.table();

let rt_config = RuntimeConfig::new()
// do not allow spilling
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

Expand Down
19 changes: 18 additions & 1 deletion datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy]` for
//! help with allocation accounting.

use datafusion_common::Result;
use datafusion_common::{internal_err, Result};
use std::{cmp::Ordering, sync::Arc};

mod pool;
Expand Down Expand Up @@ -220,6 +220,23 @@ impl MemoryReservation {
self.size = new_size
}

/// Attempts to release `capacity` bytes back to the pool.
///
/// On success the reservation shrinks by `capacity` and the new
/// reservation size is returned. If `capacity` exceeds the currently
/// reserved size ([`Self::size`]), an error is returned and the
/// reservation is left untouched.
pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
    // `checked_sub` guards against freeing more than was reserved.
    match self.size.checked_sub(capacity) {
        Some(remaining) => {
            self.registration.pool.shrink(self, capacity);
            self.size = remaining;
            Ok(remaining)
        }
        None => internal_err!(
            "Cannot free the capacity {capacity} out of allocated size {}",
            self.size
        ),
    }
}

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
match capacity.cmp(&self.size) {
Expand Down
Loading