diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 8a01c8afdf4ae0..a2c32f020ed48a 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -27,7 +27,6 @@ #include "runtime/memory/chunk_allocator.h" #include "runtime/memory/mem_tracker.h" -#include "runtime/thread_context.h" #include "util/bit_util.h" #include "util/doris_metrics.h" @@ -67,6 +66,8 @@ MemPool::~MemPool() { total_bytes_released += chunk.chunk.size; ChunkAllocator::instance()->free(chunk.chunk); } + THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, + ExecEnv::GetInstance()->process_mem_tracker().get()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -87,12 +88,15 @@ void MemPool::free_all() { total_bytes_released += chunk.chunk.size; ChunkAllocator::instance()->free(chunk.chunk); } + THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, + ExecEnv::GetInstance()->process_mem_tracker().get()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; total_allocated_bytes_ = 0; total_reserved_bytes_ = 0; + peak_allocated_bytes_ = 0; DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -145,6 +149,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { // Allocate a new chunk. Return early if allocate fails. Chunk chunk; RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk)); + THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker().get()); if (_mem_tracker) _mem_tracker->consume(chunk_size); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. @@ -215,7 +220,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { src->total_allocated_bytes_ = 0; } - peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + reset_peak(); if (!keep_current) src->free_all(); DCHECK(src->check_integrity(false)); diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 84a1b79a303f88..6abbae93e3324b 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -32,7 +32,9 @@ #include "common/status.h" #include "gutil/dynamic_annotations.h" #include "olap/olap_define.h" +#include "runtime/exec_env.h" #include "runtime/memory/chunk.h" +#include "runtime/thread_context.h" #include "util/bit_util.h" namespace doris { @@ -209,6 +211,14 @@ class MemPool { /// data. Otherwise the current chunk can be either empty or full. bool check_integrity(bool check_current_chunk_empty); + void reset_peak() { + if (total_allocated_bytes_ - peak_allocated_bytes_ > 1024) { + THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, + ExecEnv::GetInstance()->process_mem_tracker().get()); + peak_allocated_bytes_ = total_allocated_bytes_; + } + } + /// Return offset to unoccupied space in current chunk. int64_t get_free_offset() const { if (current_chunk_idx_ == -1) return 0; @@ -240,7 +250,7 @@ class MemPool { DCHECK_LE(info.allocated_bytes + size, info.chunk.size); info.allocated_bytes += padding + size; total_allocated_bytes_ += padding + size; - peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + reset_peak(); DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); return result; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 4976c6ae058241..07bd13c474bd4a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -61,6 +61,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { // TCMalloc hook will be triggered during destructor memtracker, may cause crash. if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); + consume(_untracked_mem.exchange(0)); if (_parent) { std::lock_guard l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 1a8ba5a1497e29..f7a81d49bb4f47 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -173,6 +173,12 @@ class MemTrackerLimiter final : public MemTracker { WARN_UNUSED_RESULT Status try_consume(int64_t bytes); + // When the accumulated untracked memory value exceeds the upper limit, + // the current value is returned and set to 0. + // Thread safety. + int64_t add_untracked_mem(int64_t bytes); + void consume_cache(int64_t bytes); + // Log consumption of all the trackers provided. Returns the sum of consumption in // 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels // of children to include in the dump. If it is zero, then no children are dumped. @@ -194,6 +200,10 @@ class MemTrackerLimiter final : public MemTracker { // _all_ancestors with valid limits std::vector _limited_ancestors; + // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate + // to avoid frequent calls to consume/release of MemTracker. + std::atomic _untracked_mem = 0; + // Child trackers of this tracker limiter. Used for error reporting and // listing only (i.e. updating the consumption of a parent tracker limiter does not // update that of its children). @@ -222,12 +232,24 @@ class MemTrackerLimiter final : public MemTracker { }; inline void MemTrackerLimiter::consume(int64_t bytes) { - if (bytes == 0) { - return; - } else { - for (auto& tracker : _all_ancestors) { - tracker->_consumption->add(bytes); - } + if (bytes == 0) return; + for (auto& tracker : _all_ancestors) { + tracker->_consumption->add(bytes); + } +} + +inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) { + _untracked_mem += bytes; + if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) { + return _untracked_mem.exchange(0); + } + return 0; +} + +inline void MemTrackerLimiter::consume_cache(int64_t bytes) { + int64_t consume_bytes = add_untracked_mem(bytes); + if (consume_bytes != 0) { + consume(consume_bytes); } } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index b63b25df179154..3413fcb8acfb9e 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -90,7 +90,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is, // the negative number of the current value of consume. it->second->parent()->consumption_revise(-it->second->consumption()); - LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first; + LOG(INFO) << fmt::format( + "Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}", + it->first, it->second->limit(), it->second->peak_consumption()); expired_task_ids.emplace_back(it->first); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 1c9cdfc953b0ac..efb7442fed42ea 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -75,12 +75,13 @@ class ThreadMemTrackerMgr { // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, void consume(int64_t size); + // Will not change the value of process_mem_tracker, even though mem_tracker == process_mem_tracker. void transfer_to(int64_t size, MemTrackerLimiter* mem_tracker) { consume(-size); - mem_tracker->consume(size); + mem_tracker->consume_cache(size); } void transfer_from(int64_t size, MemTrackerLimiter* mem_tracker) { - mem_tracker->release(size); + mem_tracker->consume_cache(-size); consume(size); } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 9379f12a9dfa31..e7bbc3664dd3c8 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -249,6 +249,7 @@ class StopCheckThreadMemTrackerLimit { } }; +// The following macros are used to fix the tracking accuracy of caches etc. #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() #define CONSUME_THREAD_MEM_TRACKER(size) \ diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index b1148e2887184b..f0e16b44502a4c 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -102,6 +102,7 @@ class PODArrayBase : private boost::noncopyable, char* c_start = null; /// Does not include pad_left. char* c_end = null; char* c_end_of_storage = null; /// Does not include pad_right. + char* c_end_peak = null; /// The amount of memory occupied by the num_elements of the elements. static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; } @@ -111,15 +112,26 @@ class PODArrayBase : private boost::noncopyable, return byte_size(num_elements) + pad_right + pad_left; } + inline void reset_peak() { + if (UNLIKELY(c_end - c_end_peak > 1024)) { + THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, + ExecEnv::GetInstance()->process_mem_tracker().get()); + c_end_peak = c_end; + } + } + void alloc_for_num_elements(size_t num_elements) { alloc(round_up_to_power_of_two_or_zero(minimum_memory_for_elements(num_elements))); } template void alloc(size_t bytes, TAllocatorParams&&... allocator_params) { - c_start = c_end = reinterpret_cast(TAllocator::alloc( - bytes, std::forward(allocator_params)...)) + - pad_left; + THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left, + ExecEnv::GetInstance()->process_mem_tracker().get()); + c_start = c_end = c_end_peak = + reinterpret_cast(TAllocator::alloc( + bytes, std::forward(allocator_params)...)) + + pad_left; c_end_of_storage = c_start + bytes - pad_right - pad_left; if (pad_left) memset(c_start - ELEMENT_SIZE, 0, ELEMENT_SIZE); @@ -131,6 +143,8 @@ class PODArrayBase : private boost::noncopyable, unprotect(); TAllocator::free(c_start - pad_left, allocated_bytes()); + THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak, + ExecEnv::GetInstance()->process_mem_tracker().get()); } template @@ -142,6 +156,9 @@ class PODArrayBase : private boost::noncopyable, unprotect(); + THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(), + ExecEnv::GetInstance()->process_mem_tracker().get()); + ptrdiff_t end_diff = c_end - c_start; c_start = reinterpret_cast(TAllocator::realloc( @@ -149,7 +166,7 @@ class PODArrayBase : private boost::noncopyable, std::forward(allocator_params)...)) + pad_left; - c_end = c_start + end_diff; + c_end = c_end_peak = c_start + end_diff; c_end_of_storage = c_start + bytes - pad_right - pad_left; } @@ -220,7 +237,10 @@ class PODArrayBase : private boost::noncopyable, resize_assume_reserved(n); } - void resize_assume_reserved(const size_t n) { c_end = c_start + byte_size(n); } + void resize_assume_reserved(const size_t n) { + c_end = c_start + byte_size(n); + reset_peak(); + } const char* raw_data() const { return c_start; } @@ -231,6 +251,7 @@ class PODArrayBase : private boost::noncopyable, memcpy(c_end, ptr, ELEMENT_SIZE); c_end += byte_size(1); + reset_peak(); } void protect() { @@ -283,6 +304,7 @@ class PODArray : public PODArrayBasealloc_for_num_elements(n); this->c_end += this->byte_size(n); + this->reset_peak(); } PODArray(size_t n, const T& x) { @@ -334,7 +356,10 @@ class PODArray : public PODArrayBasec_end; } - void set_end_ptr(void* ptr) { this->c_end = (char*)ptr; } + void set_end_ptr(void* ptr) { + this->c_end = (char*)ptr; + this->reset_peak(); + } /// Same as resize, but zeroes new elements. void resize_fill(size_t n) { @@ -344,6 +369,7 @@ class PODArray : public PODArrayBasec_end, 0, this->byte_size(n - old_size)); } this->c_end = this->c_start + this->byte_size(n); + this->reset_peak(); } void resize_fill(size_t n, const T& value) { @@ -353,6 +379,7 @@ class PODArray : public PODArrayBasec_end = this->c_start + this->byte_size(n); + this->reset_peak(); } template @@ -362,6 +389,7 @@ class PODArray : public PODArrayBase(x)); this->c_end += this->byte_size(1); + this->reset_peak(); } template @@ -373,6 +401,7 @@ class PODArray : public PODArrayBasec_end = new_end; + this->reset_peak(); } } @@ -381,6 +410,7 @@ class PODArray : public PODArrayBasec_end += sizeof(T) * num; + this->reset_peak(); } /** @@ -391,6 +421,7 @@ class PODArray : public PODArrayBase(x)); this->c_end += this->byte_size(1); + this->reset_peak(); } /** This method doesn't allow to pass parameters for Allocator, @@ -402,6 +433,7 @@ class PODArray : public PODArrayBase(args)...); this->c_end += this->byte_size(1); + this->reset_peak(); } void pop_back() { this->c_end -= this->byte_size(1); } @@ -432,6 +464,7 @@ class PODArray : public PODArrayBasec_end, reinterpret_cast(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; + this->reset_peak(); } template @@ -448,6 +481,7 @@ class PODArray : public PODArrayBasec_end - bytes_to_move, reinterpret_cast(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; + this->reset_peak(); } template @@ -455,6 +489,7 @@ class PODArray : public PODArrayBasebyte_size(from_end - from_begin); memcpy(this->c_end, reinterpret_cast(&*from_begin), bytes_to_copy); this->c_end += bytes_to_copy; + this->reset_peak(); } void swap(PODArray& rhs) { @@ -480,12 +515,13 @@ class PODArray : public PODArrayBasebyte_size(heap_size); + arr1.c_end_peak = arr2.c_end_peak; /// Allocate stack space for arr2. arr2.alloc(stack_allocated); /// Copy the stack content. memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size)); - arr2.c_end = arr2.c_start + this->byte_size(stack_size); + arr2.c_end = arr2.c_end_peak = arr2.c_start + this->byte_size(stack_size); }; auto do_move = [this](PODArray& src, PODArray& dest) { @@ -493,15 +529,17 @@ class PODArray : public PODArrayBasebyte_size(src.size())); - dest.c_end = dest.c_start + (src.c_end - src.c_start); + dest.c_end = dest.c_end_peak = dest.c_start + (src.c_end - src.c_start); src.c_start = Base::null; src.c_end = Base::null; src.c_end_of_storage = Base::null; + src.c_end_peak = Base::null; } else { std::swap(dest.c_start, src.c_start); std::swap(dest.c_end, src.c_end); std::swap(dest.c_end_of_storage, src.c_end_of_storage); + std::swap(dest.c_end_peak, src.c_end_peak); } }; @@ -538,6 +576,8 @@ class PODArray : public PODArrayBasec_end = this->c_start + this->byte_size(rhs_size); rhs.c_end = rhs.c_start + this->byte_size(lhs_size); + this->reset_peak(); + rhs.reset_peak(); } else if (this->is_allocated_from_stack() && !rhs.is_allocated_from_stack()) { swap_stack_heap(*this, rhs); } else if (!this->is_allocated_from_stack() && rhs.is_allocated_from_stack()) { @@ -546,6 +586,7 @@ class PODArray : public PODArrayBasec_start, rhs.c_start); std::swap(this->c_end, rhs.c_end); std::swap(this->c_end_of_storage, rhs.c_end_of_storage); + std::swap(this->c_end_peak, rhs.c_end_peak); } } @@ -563,6 +604,7 @@ class PODArray : public PODArrayBasebyte_size(required_capacity); memcpy(this->c_start, reinterpret_cast(&*from_begin), bytes_to_copy); this->c_end = this->c_start + bytes_to_copy; + this->reset_peak(); } void assign(const PODArray& from) { assign(from.begin(), from.end()); }