diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0b107d9a139545..3a77a3cc29a9de 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -54,7 +54,7 @@ bool ScanOperator::can_read() { return true; } else { if (_node->_scanner_ctx->get_num_running_scanners() == 0 && - _node->_scanner_ctx->has_enough_space_in_blocks_queue()) { + _node->_scanner_ctx->should_be_scheduled()) { _node->_scanner_ctx->reschedule_scanner_ctx(); } return _node->ready_to_read(); // there are some blocks to process diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 9d94aaf9517db5..66d87579551946 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -98,8 +98,7 @@ struct DataReadyDependency : public Dependency { void* shared_state() override { return nullptr; } [[nodiscard]] Dependency* read_blocked_by() override { - if (_scanner_ctx->get_num_running_scanners() == 0 && - _scanner_ctx->has_enough_space_in_blocks_queue()) { + if (_scanner_ctx->get_num_running_scanners() == 0 && _scanner_ctx->should_be_scheduled()) { _scanner_ctx->reschedule_scanner_ctx(); } return _ready_for_read ? nullptr : this; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 2f445438a0d9e0..7d795343290316 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -272,6 +272,9 @@ Status NewOlapScanner::open(RuntimeState* state) { return Status::InternalError(ss.str()); } + // Do not hold rs_splits any more to release memory. + _tablet_reader_params.rs_splits.clear(); + return Status::OK(); } @@ -530,7 +533,8 @@ Status NewOlapScanner::close(RuntimeState* state) { // so that it will core _tablet_reader_params.rs_splits.clear(); _tablet_reader.reset(); - + auto tablet_id = _scan_range.tablet_id; + LOG(INFO) << "close_tablet_id" << tablet_id; RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 09ec0e3a5531f3..2022ecb29e6606 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -185,10 +185,6 @@ class PipScannerContext : public vectorized::ScannerContext { _free_blocks_memory_usage->add(free_blocks_memory_usage); } - bool has_enough_space_in_blocks_queue() const override { - return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances; - } - void _dispose_coloate_blocks_not_in_queue() override { if (_need_colocate_distribute) { for (int i = 0; i < _num_parallel_instances; ++i) { @@ -257,8 +253,7 @@ class PipScannerContext : public vectorized::ScannerContext { if (_data_dependency) { _data_dependency->set_ready_for_read(); } - bool get_block_not_empty = true; - _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty); + _colocate_blocks[loc] = get_free_block(); _colocate_mutable_blocks[loc]->set_muatable_columns( _colocate_blocks[loc]->mutate_columns()); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 80c5ef1a09ba70..77b1a599053116 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -55,7 +55,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V _process_status(Status::OK()), _batch_size(state_->batch_size()), limit(limit_), - _max_bytes_in_queue(max_bytes_in_blocks_queue_), + _max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances), _scanner_scheduler(state_->exec_env()->scanner_scheduler()), _scanners(scanners_), _num_parallel_instances(num_parallel_instances) { @@ -66,27 +66,21 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V if (limit < 0) { limit = -1; } -} - -// After init function call, should not access _parent -Status ScannerContext::init() { - // 1. Calculate max concurrency - // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 - // should find a more reasonable value. _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4; - if (_parent && _parent->_shared_scan_opt) { - DCHECK(_num_parallel_instances > 0); - _max_thread_num *= _num_parallel_instances; - } + _max_thread_num *= num_parallel_instances; _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; DCHECK(_max_thread_num > 0); _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); + // 1. Calculate max concurrency // For select * from table limit 10; should just use one thread. if ((_parent && _parent->should_run_serial()) || (_local_state && _local_state->should_run_serial())) { _max_thread_num = 1; } +} +// After init function call, should not access _parent +Status ScannerContext::init() { if (_parent) { _scanner_profile = _parent->_scanner_profile; _scanner_sched_counter = _parent->_scanner_sched_counter; @@ -120,6 +114,9 @@ Status ScannerContext::init() { limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; _free_blocks_capacity = _max_thread_num * _block_per_scanner; + auto block = get_free_block(); + _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); + return_free_block(std::move(block)); #ifndef BE_TEST // 3. get thread token @@ -151,27 +148,33 @@ std::string ScannerContext::parent_name() { return _parent ? _parent->get_name() : _local_state->get_name(); } -vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, - bool get_block_not_empty) { +vectorized::BlockUPtr ScannerContext::get_free_block() { vectorized::BlockUPtr block; if (_free_blocks.try_dequeue(block)) { - if (!get_block_not_empty || block->mem_reuse()) { - _free_blocks_capacity--; - _free_blocks_memory_usage->add(-block->allocated_bytes()); - return block; - } + DCHECK(block->mem_reuse()); + _free_blocks_memory_usage->add(-block->allocated_bytes()); + _serving_blocks_num++; + return block; } + block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); COUNTER_UPDATE(_newly_create_free_blocks_num, 1); - return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, - true /*ignore invalid slots*/); + + _serving_blocks_num++; + return block; } void ScannerContext::return_free_block(std::unique_ptr block) { - block->clear_column_data(); - _free_blocks_memory_usage->add(block->allocated_bytes()); - _free_blocks.enqueue(std::move(block)); - ++_free_blocks_capacity; + _serving_blocks_num--; + if (block->mem_reuse()) { + // Only put blocks with schema to free blocks, because colocate blocks + // need schema. + _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16); + block->clear_column_data(); + _free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks.enqueue(std::move(block)); + } } void ScannerContext::append_blocks_to_queue(std::vector& blocks) { @@ -200,7 +203,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo // (if the scheduler continues to schedule, it will cause a lot of busy running). // At this point, consumers are required to trigger new scheduling to ensure that // data can be continuously fetched. - if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) { + if (should_be_scheduled() && _num_running_scanners == 0) { auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; @@ -208,6 +211,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo set_status_on_error(state, false); } } + // Wait for block from queue if (wait) { // scanner batch wait time @@ -231,6 +235,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo _blocks_queue.pop_front(); auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes; + _queued_blocks_memory_usage->add(-block_bytes); return Status::OK(); } else { @@ -381,7 +386,13 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { _scanners.push_front(scanner); } std::lock_guard l(_transfer_lock); - if (has_enough_space_in_blocks_queue()) { + + // In pipeline engine, doris will close scanners when `no_schedule`. + // We have to decrease _num_running_scanners before schedule, otherwise + // schedule does not woring due to _num_running_scanners. + _num_running_scanners--; + + if (should_be_scheduled()) { auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; @@ -404,8 +415,6 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { } _blocks_queue_added_cv.notify_one(); } - // In pipeline engine, doris will close scanners when `no_schedule`. - _num_running_scanners--; _ctx_finish_cv.notify_one(); } @@ -415,7 +424,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current { // If there are enough space in blocks queue, // the scanner number depends on the _free_blocks numbers - thread_slot_num = cal_thread_slot_num_by_free_block_num(); + thread_slot_num = get_available_thread_slot_num(); } // 2. get #thread_slot_num scanners from ctx->scanners diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index b4d89dcb62a528..a27833abe67528 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -72,13 +72,13 @@ class ScannerContext { ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* output_tuple_desc, const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0, + int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1, pipeline::ScanLocalStateBase* local_state = nullptr); virtual ~ScannerContext() = default; virtual Status init(); - vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false); + vectorized::BlockUPtr get_free_block(); void return_free_block(std::unique_ptr block); // Append blocks from scanners to the blocks queue. @@ -146,19 +146,25 @@ class ScannerContext { virtual bool empty_in_queue(int id); // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan - virtual inline bool has_enough_space_in_blocks_queue() const { - return _cur_bytes_in_queue < _max_bytes_in_queue / 2; + inline bool should_be_scheduled() const { + return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) && + (_serving_blocks_num < allowed_blocks_num()); } - int cal_thread_slot_num_by_free_block_num() { + int get_available_thread_slot_num() { int thread_slot_num = 0; - thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner; + thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner; thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); - if (thread_slot_num <= 0) { - thread_slot_num = 1; - } return thread_slot_num; } + + int32_t allowed_blocks_num() const { + int32_t blocks_num = std::min(_free_blocks_capacity, + int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) / + _estimated_block_bytes)); + return blocks_num; + } + taskgroup::TaskGroup* get_task_group() const; void reschedule_scanner_ctx(); @@ -216,10 +222,12 @@ class ScannerContext { // Lazy-allocated blocks for all scanners to share, for memory reuse. moodycamel::ConcurrentQueue _free_blocks; + std::atomic _serving_blocks_num = 0; // The current number of free blocks available to the scanners. // Used to limit the memory usage of the scanner. // NOTE: this is NOT the size of `_free_blocks`. - std::atomic_int32_t _free_blocks_capacity = 0; + int32_t _free_blocks_capacity = 0; + int64_t _estimated_block_bytes = 0; int _batch_size; // The limit from SQL's limit clause @@ -244,6 +252,7 @@ class ScannerContext { int64_t _cur_bytes_in_queue = 0; // The max limit bytes of blocks in blocks queue const int64_t _max_bytes_in_queue; + std::atomic _bytes_allocated = 0; doris::vectorized::ScannerScheduler* _scanner_scheduler; // List "scanners" saves all "unfinished" scanners. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index ac9c13c0020178..3f2d591cef27c9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -353,7 +353,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; - bool has_free_block = true; int num_rows_in_block = 0; // Only set to true when ctx->done() return true. @@ -363,9 +362,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool should_stop = false; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. - while (!eos && raw_bytes_read < raw_bytes_threshold && - ((raw_rows_read < raw_rows_threshold && has_free_block) || - num_rows_in_block < state->batch_size())) { + while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold && + num_rows_in_block < state->batch_size()) { // TODO llj task group should should_yield? if (UNLIKELY(ctx->done())) { // No need to set status on error here. @@ -374,7 +372,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext break; } - BlockUPtr block = ctx->get_free_block(&has_free_block); + BlockUPtr block = ctx->get_free_block(); + status = scanner->get_block(state, block.get(), &eos); VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos; // The VFileScanner for external table may try to open not exist files, @@ -390,12 +389,11 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (status.is()) { // The only case in this "if" branch is external table file delete and fe cache has not been updated yet. // Set status to OK. - LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name(); status = Status::OK(); eos = true; } - raw_bytes_read += block->bytes(); + raw_bytes_read += block->allocated_bytes(); num_rows_in_block += block->rows(); if (UNLIKELY(block->rows() == 0)) { ctx->return_free_block(std::move(block)); @@ -430,7 +428,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (eos || should_stop) { scanner->mark_to_need_to_close(); } - ctx->push_back_scanner_and_reschedule(scanner); }