Skip to content
Open

DNM: rss #10865

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/BackgroundTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void CollectProcInfoBackgroundTask::memCheckJob()
// Update the memory usage of the current process. Defined in Common/MemoryTracker.cpp
auto res = get_process_mem_usage();
real_rss = res.resident_bytes;
rss_file = res.rss_file_bytes;
proc_num_threads = res.cur_proc_num_threads;
proc_virt_size = res.cur_virt_bytes;
baseline_of_query_mem_tracker = root_of_query_mem_trackers->get();
Expand Down
79 changes: 56 additions & 23 deletions dbms/src/Common/MemoryAllocTrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <common/config_common.h> // Included for `USE_JEMALLOC`

#include <fstream>
#include <sstream>
#include <string_view>

#if USE_JEMALLOC
#include <jemalloc/jemalloc.h>
Expand All @@ -38,45 +40,76 @@ std::tuple<uint64_t *, uint64_t *> getAllocDeallocPtr()
#endif
}

bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size)
static bool parseStatusFieldKb(const std::string & line, std::string_view field_name, UInt64 & out_kb)
{
resident_set = 0.0;
if (line.rfind(field_name, 0) != 0)
return false;

const auto colon_pos = line.find(':');
if (colon_pos == std::string::npos)
return false;

std::istringstream iss(line.substr(colon_pos + 1));
iss >> out_kb;
return !iss.fail();
}

static bool parseStatusFieldInt(const std::string & line, std::string_view field_name, Int64 & out_value)
{
if (line.rfind(field_name, 0) != 0)
return false;

// 'file' stat seems to give the most reliable results
std::ifstream stat_stream("/proc/self/stat", std::ios_base::in);
// if "/proc/self/stat" is not supported
if (!stat_stream.is_open())
const auto colon_pos = line.find(':');
if (colon_pos == std::string::npos)
return false;

// dummy vars for leading entries in stat that we don't care about
std::string pid, comm, state, ppid, pgrp, session, tty_nr;
std::string tpgid, flags, minflt, cminflt, majflt, cmajflt;
std::string utime, stime, cutime, cstime, priority, nice;
std::string itrealvalue, starttime;
std::istringstream iss(line.substr(colon_pos + 1));
iss >> out_value;
return !iss.fail();
}

// the field we want
Int64 rss;
bool process_mem_usage(
UInt64 & resident_bytes,
UInt64 & rss_file_bytes,
Int64 & cur_proc_num_threads,
UInt64 & cur_virt_size)
{
std::ifstream status_stream("/proc/self/status", std::ios_base::in);
if (!status_stream.is_open())
return false;

stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt
>> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads
>> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest
UInt64 vm_rss_kb = 0;
UInt64 rss_file_kb = 0;
UInt64 vm_size_kb = 0;
Int64 threads = 1;

stat_stream.close();
std::string line;
while (std::getline(status_stream, line))
{
if (parseStatusFieldKb(line, "VmRSS", vm_rss_kb))
resident_bytes = vm_rss_kb * 1024;
if (parseStatusFieldKb(line, "RssFile", rss_file_kb))
rss_file_bytes = rss_file_kb * 1024;
if (parseStatusFieldKb(line, "VmSize", vm_size_kb))
cur_virt_size = vm_size_kb * 1024;
if (parseStatusFieldInt(line, "Threads", threads))
cur_proc_num_threads = threads;
}

Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages
resident_set = rss * page_size_kb;
return true;
}

ProcessMemoryUsage get_process_mem_usage()
{
double resident_set;
UInt64 raw_rss = 0;
UInt64 rss_file = 0;
Int64 cur_proc_num_threads = 1;
UInt64 cur_virt_size = 0;
process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size);
resident_set *= 1024; // transfrom from KB to bytes
process_mem_usage(raw_rss, rss_file, cur_proc_num_threads, cur_virt_size);

return ProcessMemoryUsage{
static_cast<size_t>(resident_set),
raw_rss,
rss_file,
cur_virt_size,
cur_proc_num_threads,
};
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryAllocTrace.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ std::tuple<uint64_t *, uint64_t *> getAllocDeallocPtr();
struct ProcessMemoryUsage
{
UInt64 resident_bytes;
UInt64 rss_file_bytes;
UInt64 cur_virt_bytes;
Int64 cur_proc_num_threads;
};
Expand Down
57 changes: 48 additions & 9 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,40 @@ extern const Metric MemoryTrackingSharedColumnData;
extern const Metric MemoryTrackingKVStore;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<Int64> real_rss{0}, rss_file{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<UInt64> proc_virt_size{0};
std::atomic<bool> exclude_rss_file_from_memory_control{false};

void setExcludeRssFileFromMemoryControl(bool value)
{
exclude_rss_file_from_memory_control.store(value, std::memory_order_relaxed);
}

bool getExcludeRssFileFromMemoryControl()
{
return exclude_rss_file_from_memory_control.load(std::memory_order_relaxed);
}

struct MemoryControlRssInfo
{
Int64 real_rss;
Int64 rss_file;
Int64 memory_control_rss;
};

static MemoryControlRssInfo getMemoryControlRss()
{
const Int64 current_real_rss = real_rss.load(std::memory_order_relaxed);
const Int64 current_rss_file = rss_file.load(std::memory_order_relaxed);
if (!getExcludeRssFileFromMemoryControl())
return {current_real_rss, current_rss_file, current_real_rss};
return {
current_real_rss,
current_rss_file,
current_real_rss > current_rss_file ? current_real_rss - current_rss_file : 0,
};
}

MemoryTracker::~MemoryTracker()
{
// Destruction of global root mem tracker means the process is shutting down, log and metrics models may have been released!
Expand Down Expand Up @@ -133,9 +165,10 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
{
Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_accuracy_diff_for_test = accuracy_diff_for_test.load(std::memory_order_relaxed);
const auto rss_info = getMemoryControlRss();
if (unlikely(
!next.load(std::memory_order_relaxed) && current_accuracy_diff_for_test && current_limit
&& real_rss > current_accuracy_diff_for_test + current_limit))
&& rss_info.memory_control_rss > current_accuracy_diff_for_test + current_limit))
{
DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker accuracy ");
Expand All @@ -144,10 +177,13 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
fmt_buf.fmtAppend(" {}", tmp_decr);

fmt_buf.fmtAppend(
": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, "
": fault injected. memory_control_rss ({}) is much larger than limit ({}). Debug info, "
"real_rss: {}, rss_file: {}, threads of process: {}, "
"memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(rss_info.memory_control_rss),
formatReadableSizeWithBinarySuffix(current_limit),
formatReadableSizeWithBinarySuffix(rss_info.real_rss),
formatReadableSizeWithBinarySuffix(rss_info.rss_file),
proc_num_threads.load(),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak)
: "0"),
Expand Down Expand Up @@ -181,7 +217,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
bool is_rss_too_large
= (!next.load(std::memory_order_relaxed) && current_limit
&& real_rss > current_limit + current_bytes_rss_larger_than_limit
&& rss_info.memory_control_rss > current_limit + current_bytes_rss_larger_than_limit
&& will_be > baseline_of_query_mem_tracker);
if (is_rss_too_large || unlikely(current_limit && will_be > current_limit))
{
Expand All @@ -207,11 +243,14 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
else
{ // RSS too large
fmt_buf.fmtAppend(
" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would "
"be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.",
formatReadableSizeWithBinarySuffix(real_rss),
" exceeded caused by 'memory_control_rss much larger than limit' : memory_control_rss would "
"be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}. "
"real_rss={}, rss_file={}.",
formatReadableSizeWithBinarySuffix(rss_info.memory_control_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
formatReadableSizeWithBinarySuffix(current_limit),
formatReadableSizeWithBinarySuffix(rss_info.real_rss),
formatReadableSizeWithBinarySuffix(rss_info.rss_file));
}

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
#include <atomic>
#include <boost/noncopyable.hpp>

extern std::atomic<Int64> real_rss, proc_num_threads, baseline_of_query_mem_tracker;
extern std::atomic<Int64> real_rss, rss_file, proc_num_threads, baseline_of_query_mem_tracker;
extern std::atomic<UInt64> proc_virt_size;

void setExcludeRssFileFromMemoryControl(bool value);
bool getExcludeRssFileFromMemoryControl();
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/MemoryAllocTrace.h>
#include <Common/MemoryTracker.h>
#include <Common/RedactHelpers.h>
#include <Common/SpillLimiter.h>
#include <Common/StringUtils/StringUtils.h>
Expand Down Expand Up @@ -1274,6 +1275,22 @@ try
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

// TiCI reader uses mmap heavily; file-backed RSS (RssFile) should not trigger memory control.
// Effective value is true only when TiCI reader is enabled AND
// `tici.exclude-rss-file-from-memory-control` is true (default: true).
// If reader is disabled, this stays false regardless of the config item.
const auto tici_reader_addr = config().getString("tici.reader-node.addr", "");
const auto tici_reader_port = config().getInt("tici.reader-node.port", 0);
const bool tici_reader_enabled = !tici_reader_addr.empty() || tici_reader_port > 0;
const bool exclude_rss_file_from_memory_control
= tici_reader_enabled && config().getBool("tici.exclude-rss-file-from-memory-control", true);
setExcludeRssFileFromMemoryControl(exclude_rss_file_from_memory_control);
LOG_INFO(
log,
"TiCI memory control config: reader_enabled={} exclude_rss_file_from_memory_control={}",
tici_reader_enabled,
exclude_rss_file_from_memory_control);

/// startup flash service for handling coprocessor and MPP requests.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

Expand All @@ -1289,9 +1306,7 @@ try

proxy_machine.runKVStore(tmt_context);

auto tici_reader_addr = config().getString("tici.reader-node.addr", "");
auto tici_reader_port = config().getInt("tici.reader-node.port", 0);
if (!tici_reader_addr.empty() || tici_reader_port > 0)
if (tici_reader_enabled)
{
Stopwatch watch;
auto service_addr = config().getString("flash.service_addr");
Expand Down