Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
CONF_Int32(number_tablet_writer_threads, "16");
CONF_Int32(number_slave_replica_download_threads, "64");

// be brpc interface is classified into two categories: light and heavy
// each category has diffrent thread number
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
CONF_Int32(brpc_heavy_work_pool_threads, "192");
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
CONF_Int32(brpc_light_work_pool_threads, "64");
CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");

// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
Expand Down
1,037 changes: 640 additions & 397 deletions be/src/service/internal_service.cpp

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,13 @@ class PInternalServiceImpl : public PBackendService {

private:
ExecEnv* _exec_env;
PriorityThreadPool _tablet_worker_pool;
PriorityThreadPool _slave_replica_worker_pool;

// every brpc service request should put into thread pool
// the reason see issue #16634
// define the interface for reading and writing data as heavy interface
// otherwise as light interface
PriorityThreadPool _heavy_work_pool;
PriorityThreadPool _light_work_pool;
};

} // namespace doris
12 changes: 12 additions & 0 deletions be/src/util/blocking_priority_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ class BlockingPriorityQueue {
return true;
}

// Return false if queue full or has been shutdown.
bool try_put(const T& val) {
std::unique_lock<std::mutex> unique_lock(_lock);
if (_queue.size() < _max_element && !_shutdown) {
_queue.push(val);
unique_lock.unlock();
_get_cv.notify_one();
return true;
}
return false;
}

// Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
void shutdown() {
{
Expand Down
10 changes: 10 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ class DorisMetrics {
IntCounter* upload_rowset_count;
IntCounter* upload_fail_count;

UIntGauge* light_work_pool_queue_size;
UIntGauge* heavy_work_pool_queue_size;
UIntGauge* heavy_work_active_threads;
UIntGauge* light_work_active_threads;

UIntGauge* heavy_work_pool_max_queue_size;
UIntGauge* light_work_pool_max_queue_size;
UIntGauge* heavy_work_max_threads;
UIntGauge* light_work_max_threads;

static DorisMetrics* instance() {
static DorisMetrics instance;
return &instance;
Expand Down
12 changes: 11 additions & 1 deletion be/src/util/priority_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class PriorityThreadPool {
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
: _work_queue(queue_size), _shutdown(false), _name(name) {
: _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
for (int i = 0; i < num_threads; ++i) {
_threads.create_thread(
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
Expand Down Expand Up @@ -86,6 +86,13 @@ class PriorityThreadPool {
return _work_queue.blocking_put(task);
}

virtual bool try_offer(WorkFunction func) {
PriorityThreadPool::Task task = {0, func, 0};
return _work_queue.try_put(task);
}

virtual uint32_t get_active_threads() const { return _active_threads; }

// Shuts the thread pool down, causing the work queue to cease accepting offered work
// and the worker threads to terminate once they have processed their current work item.
// Returns once the shutdown flag has been set, does not wait for the threads to
Expand Down Expand Up @@ -135,7 +142,9 @@ class PriorityThreadPool {
while (!is_shutdown()) {
Task task;
if (_work_queue.blocking_get(&task)) {
_active_threads++;
task.work_function();
_active_threads--;
}
if (_work_queue.get_size() == 0) {
_empty_cv.notify_all();
Expand All @@ -150,6 +159,7 @@ class PriorityThreadPool {
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
std::string _name;
std::atomic<int> _active_threads;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ curl http://be_host:webserver_port/metrics?type=json
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 |
|`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 |
|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 |
|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |

### 机器监控

Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ enum PCacheStatus {
INVALID_KEY_RANGE = 6;
DATA_OVERDUE = 7;
EMPTY_DATA = 8;
CANCELED = 9;
};

enum CacheType {
Expand Down