@@ -26,6 +26,7 @@ use common_pipeline_core::Pipeline;
2626use common_pipeline_core:: SourcePipeBuilder ;
2727use tracing:: info;
2828
29+ use crate :: fuse_part:: FusePartInfo ;
2930use crate :: io:: BlockReader ;
3031use crate :: operations:: read:: native_data_source_deserializer:: NativeDeserializeDataTransform ;
3132use crate :: operations:: read:: native_data_source_reader:: ReadNativeDataSource ;
@@ -41,7 +42,8 @@ pub fn build_fuse_native_source_pipeline(
4142 topk : Option < TopK > ,
4243 mut max_io_requests : usize ,
4344) -> Result < ( ) > {
44- ( max_threads, max_io_requests) = adjust_threads_and_request ( max_threads, max_io_requests, plan) ;
45+ ( max_threads, max_io_requests) =
46+ adjust_threads_and_request ( true , max_threads, max_io_requests, plan) ;
4547
4648 if topk. is_some ( ) {
4749 max_threads = max_threads. min ( 16 ) ;
@@ -122,7 +124,8 @@ pub fn build_fuse_parquet_source_pipeline(
122124 mut max_threads : usize ,
123125 mut max_io_requests : usize ,
124126) -> Result < ( ) > {
125- ( max_threads, max_io_requests) = adjust_threads_and_request ( max_threads, max_io_requests, plan) ;
127+ ( max_threads, max_io_requests) =
128+ adjust_threads_and_request ( false , max_threads, max_io_requests, plan) ;
126129
127130 let mut source_builder = SourcePipeBuilder :: create ( ) ;
128131
@@ -220,12 +223,38 @@ pub fn dispatch_partitions(
220223}
221224
222225pub fn adjust_threads_and_request (
226+ is_native : bool ,
223227 mut max_threads : usize ,
224228 mut max_io_requests : usize ,
225229 plan : & DataSourcePlan ,
226230) -> ( usize , usize ) {
227231 if !plan. parts . is_lazy {
228- let block_nums = plan. parts . partitions . len ( ) . max ( 1 ) ;
232+ let mut block_nums = plan. parts . partitions . len ( ) ;
233+
234+ // If a partition will read fewer than 16k rows, it is small enough that
235+ // we do not count it when sizing the pool of heavy processing threads.
236+ // For now this only applies to the native reader.
237+ static MIN_ROWS_READ_PER_THREAD : u64 = 16 * 1024 ;
238+ if is_native {
239+ plan. parts . partitions . iter ( ) . for_each ( |part| {
240+ if let Some ( part) = part. as_any ( ) . downcast_ref :: < FusePartInfo > ( ) {
241+ let to_read_rows = part
242+ . columns_meta
243+ . values ( )
244+ . map ( |meta| meta. read_rows ( & part. range ) )
245+ . find ( |rows| * rows > 0 )
246+ . unwrap_or ( part. nums_rows as u64 ) ;
247+
248+ if to_read_rows < MIN_ROWS_READ_PER_THREAD {
249+ block_nums -= 1 ;
250+ }
251+ }
252+ } ) ;
253+ }
254+
255+ // Keep at least max(1/8 of the original partitions, 1), so that many small partitions do not leave us with just one IO thread.
256+ block_nums = std:: cmp:: max ( block_nums, plan. parts . partitions . len ( ) / 8 ) ;
257+ block_nums = std:: cmp:: max ( block_nums, 1 ) ;
229258
230259 max_threads = std:: cmp:: min ( max_threads, block_nums) ;
231260 max_io_requests = std:: cmp:: min ( max_io_requests, block_nums) ;
0 commit comments