Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore(query): update
  • Loading branch information
sundy-li committed Jan 16, 2023
commit 372955196845f87d3b40b933764382f68d9a9585
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub use read::SegmentInfoReader;
pub use read::SnapshotHistoryReader;
pub use read::TableSnapshotReader;
pub use read::UncompressedBuffer;
pub use read::NativeReaderExt;

pub use segments::try_join_futures;
pub use segments::try_join_futures_with_vec;
pub use segments::SegmentsIO;
Expand Down
23 changes: 13 additions & 10 deletions src/query/storages/fuse/src/io/read/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataField;
use common_expression::DataSchema;
use common_expression::TableField;
use common_expression::TableSchemaRef;
use common_expression::types::DataType;
use common_storage::ColumnLeaf;
use common_storage::ColumnLeaves;
use futures::future::try_join_all;
Expand All @@ -48,6 +50,7 @@ pub struct BlockReader {
pub(crate) projection: Projection,
pub(crate) projected_schema: TableSchemaRef,
pub(crate) column_leaves: ColumnLeaves,
pub(crate) project_indices: BTreeMap<usize, (Field, DataType)>,
pub(crate) parquet_schema_descriptor: SchemaDescriptor,
}

Expand Down Expand Up @@ -117,7 +120,6 @@ impl BlockReader {
schema: TableSchemaRef,
projection: Projection,
) -> Result<Arc<BlockReader>> {
println!("projection -> {:?}", projection);
let projected_schema = match projection {
Projection::Columns(ref indices) => TableSchemaRef::new(schema.project(indices)),
Projection::InnerColumns(ref path_indices) => {
Expand All @@ -128,13 +130,16 @@ impl BlockReader {
let arrow_schema = schema.to_arrow();
let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?;
let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema);

let project_column_leaves: Vec<ColumnLeaf> = projection.project_column_leaves(&column_leaves)?.iter().map(|c| (*c).clone()).collect();
let project_indices = Self::build_projection_indices(&project_column_leaves);

Ok(Arc::new(BlockReader {
operator,
projection,
projected_schema,
parquet_schema_descriptor,
column_leaves,
project_indices,
}))
}

Expand Down Expand Up @@ -279,11 +284,9 @@ impl BlockReader {
metrics_inc_remote_io_read_parts(1);
}

let columns = self.projection.project_column_leaves(&self.column_leaves)?;
let indices = Self::build_projection_indices(&columns);

let mut ranges = vec![];
for index in indices.keys() {
for index in self.project_indices.keys() {
let column_meta = &columns_meta[index];
let (offset, len) = column_meta.offset_length();
ranges.push((*index, offset..(offset + len)));
Expand All @@ -306,11 +309,9 @@ impl BlockReader {
part: PartInfoPtr,
) -> Result<MergeIOReadResult> {
let part = FusePartInfo::from_part(&part)?;
let columns = self.projection.project_column_leaves(&self.column_leaves)?;
let indices = Self::build_projection_indices(&columns);

let mut ranges = vec![];
for index in indices.keys() {
for index in self.project_indices.keys() {
let column_meta = &part.columns_meta[index];
let (offset, len) = column_meta.offset_length();
ranges.push((*index, offset..(offset + len)));
Expand All @@ -321,11 +322,13 @@ impl BlockReader {
}

// Build non duplicate leaf_ids to avoid repeated read column from parquet
pub(crate) fn build_projection_indices(columns: &Vec<&ColumnLeaf>) -> BTreeMap<usize, Field> {
pub(crate) fn build_projection_indices(columns: &[ColumnLeaf]) -> BTreeMap<usize, (Field, DataType)> {
let mut indices = BTreeMap::new();
for column in columns {
for index in &column.leaf_ids {
indices.insert(*index, column.field.clone());
let f: TableField = (&column.field).into();
let data_type: DataType = f.data_type().into();
indices.insert(*index, (column.field.clone(), data_type));
}
}
indices
Expand Down
67 changes: 34 additions & 33 deletions src/query/storages/fuse/src/io/read/block_reader_native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use std::io::BufReader;
use std::io::Seek;
use std::time::Instant;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::chunk::Chunk;

use common_arrow::native::read::reader::NativeReader;
use common_arrow::native::read::NativeReadBuf;
use common_catalog::plan::PartInfoPtr;
use common_exception::Result;
use common_expression::BlockEntry;
use common_expression::Column;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::TableField;

use common_expression::Value;
use opendal::Object;
use storages_common_table_meta::meta::ColumnMeta;

Expand All @@ -37,7 +40,10 @@ use crate::metrics::metrics_inc_remote_io_seeks;

// Native storage format

pub type Reader = Box<dyn NativeReadBuf + Send + Sync>;
pub trait NativeReaderExt: NativeReadBuf + std::io::Seek + Send + Sync {}
impl<T: NativeReadBuf + std::io::Seek + Send + Sync> NativeReaderExt for T {}

pub type Reader = Box<dyn NativeReaderExt>;

impl BlockReader {
pub async fn async_read_native_columns_data(
Expand All @@ -50,15 +56,13 @@ impl BlockReader {
}

let part = FusePartInfo::from_part(&part)?;
let columns = self.projection.project_column_leaves(&self.column_leaves)?;
let indices = Self::build_projection_indices(&columns);
let mut join_handlers = Vec::with_capacity(indices.len());
let mut join_handlers = Vec::with_capacity(self.project_indices.len());

for (index, field) in indices {
for (index, (field, _)) in self.project_indices.iter() {
let column_meta = &part.columns_meta[&index];
join_handlers.push(Self::read_native_columns_data(
self.operator.object(&part.location),
index,
*index,
column_meta,
field.data_type().clone(),
));
Expand Down Expand Up @@ -109,19 +113,17 @@ impl BlockReader {
) -> Result<Vec<(usize, NativeReader<Reader>)>> {
let part = FusePartInfo::from_part(&part)?;

let columns = self.projection.project_column_leaves(&self.column_leaves)?;
let indices = Self::build_projection_indices(&columns);
let mut results = Vec::with_capacity(indices.len());
let mut results = Vec::with_capacity(self.project_indices.len());

for (index, field) in indices {
for (index, (field, _)) in self.project_indices.iter() {
let column_meta = &part.columns_meta[&index];

let op = self.operator.clone();

let location = part.location.clone();
let result = Self::sync_read_native_column(
op.object(&location),
index,
*index,
column_meta,
field.data_type().clone(),
);
Expand All @@ -138,8 +140,13 @@ impl BlockReader {
meta: &ColumnMeta,
data_type: common_arrow::arrow::datatypes::DataType,
) -> Result<(usize, NativeReader<Reader>)> {
let (offset, length) = meta.offset_length();
let reader = o.blocking_range_reader(offset..offset + length)?;
let (offset, _) = meta.offset_length();

// let reader = o.blocking_range_reader(offset..offset + length)?;
let path = format!("/home/sundy/work/databend/_data/{}", o.path());
let mut reader = std::fs::File::open(&path).unwrap();
reader.seek(std::io::SeekFrom::Start(offset)).unwrap();

let reader: Reader = Box::new(BufReader::new(reader));

let page_metas = meta.as_native().unwrap().pages.clone();
Expand All @@ -148,25 +155,19 @@ impl BlockReader {
}

pub fn build_block(&self, chunks: Vec<(usize, Box<dyn Array>)>) -> Result<DataBlock> {
let mut results = Vec::with_capacity(chunks.len());
let mut chunk_map: HashMap<usize, Box<dyn Array>> = chunks.into_iter().collect();
let columns = self.projection.project_column_leaves(&self.column_leaves)?;
let mut data_types = Vec::with_capacity(chunk_map.len());

let mut entries = Vec::with_capacity(chunks.len());
// they are already the leaf columns without inner
// TODO support tuple in native storage
for column in &columns {
let indices = &column.leaf_ids;
for index in indices {
if let Some(array) = chunk_map.remove(index) {
let data_field: TableField = (&column.field).into();
results.push(array);
data_types.push(data_field.data_type().into());
break;
}
let mut rows = 0;
for (id, (_, f)) in self.project_indices.iter() {
if let Some(array) = chunks.iter().find(|c| c.0 == *id).map(|c| c.1.clone()) {
entries.push(BlockEntry {
data_type: f.clone(),
value: Value::Column(Column::from_arrow(array.as_ref(), f)),
});
rows = array.len();
}
}
let chunk = Chunk::new(results);
DataBlock::from_arrow_chunk_with_types(&chunk, &data_types)
Ok(DataBlock::new(entries, rows))
}
}
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod read_settings;
mod snapshot_history_reader;
mod versioned_reader;

pub use block_reader_native::NativeReaderExt;
pub use block_reader::BlockReader;
pub use block_reader::MergeIOReadResult;
pub use bloom_index_reader::load_bloom_filter_by_columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::fmt::Debug;
use std::fmt::Formatter;

use common_arrow::native::read::reader::NativeReader;
use common_arrow::native::read::NativeReadBuf;
use common_catalog::plan::PartInfoPtr;
use common_expression::BlockMetaInfo;
use common_expression::BlockMetaInfoPtr;
use serde::Deserializer;
use serde::Serializer;

pub type DataChunks = Vec<(usize, NativeReader<Box<dyn NativeReadBuf + Send + Sync>>)>;
use crate::io::NativeReaderExt;

pub type DataChunks = Vec<(usize, NativeReader<Box<dyn NativeReaderExt>>)>;

pub struct NativeDataSourceMeta {
pub part: Vec<PartInfoPtr>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use common_catalog::plan::PushDownInfo;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ConstantFolder;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::Evaluator;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_expression::Value;
use common_functions::scalars::BUILTIN_FUNCTIONS;
use common_pipeline_core::processors::port::InputPort;
Expand All @@ -41,7 +43,7 @@ use crate::operations::read::native_data_source::DataChunks;
use crate::operations::read::native_data_source::NativeDataSourceMeta;

pub struct NativeDeserializeDataTransform {
ctx: Arc<dyn TableContext>,
func_ctx: FunctionContext,
scan_progress: Arc<Progress>,
block_reader: Arc<BlockReader>,

Expand All @@ -57,6 +59,8 @@ pub struct NativeDeserializeDataTransform {
src_schema: DataSchema,
output_schema: DataSchema,
prewhere_filter: Arc<Option<Expr>>,

prewhere_skipped: usize,
}

impl NativeDeserializeDataTransform {
Expand Down Expand Up @@ -99,13 +103,14 @@ impl NativeDeserializeDataTransform {
(&projected).into()
}
};


let func_ctx = ctx.try_get_function_context()?;
let prewhere_schema = src_schema.project(&prewhere_columns);
let prewhere_filter = Self::build_prewhere_filter_expr(plan, &prewhere_schema)?;

let prewhere_filter = Self::build_prewhere_filter_expr(plan, func_ctx.clone(), &prewhere_schema)?;
Ok(ProcessorPtr::create(Box::new(
NativeDeserializeDataTransform {
ctx,
func_ctx,
scan_progress,
block_reader,
input,
Expand All @@ -119,19 +124,23 @@ impl NativeDeserializeDataTransform {
src_schema,
output_schema,
prewhere_filter,
prewhere_skipped:0,
},
)))
}

fn build_prewhere_filter_expr(
plan: &DataSourcePlan,
ctx: FunctionContext,
schema: &DataSchema,
) -> Result<Arc<Option<Expr>>> {
Ok(
match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) {
None => Arc::new(None),
Some(v) => Arc::new(v.filter.as_expr(&BUILTIN_FUNCTIONS).map(|expr| {
expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0)
let expr = expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0);
let (expr, _ ) = ConstantFolder::fold(&expr, ctx, &BUILTIN_FUNCTIONS);
expr
})),
},
)
Expand Down Expand Up @@ -188,6 +197,7 @@ impl Processor for NativeDeserializeDataTransform {
}

if self.input.is_finished() {
metrics_inc_pruning_prewhere_nums(self.prewhere_skipped as u64);
self.output.finish();
return Ok(Event::Finished);
}
Expand Down Expand Up @@ -215,7 +225,7 @@ impl Processor for NativeDeserializeDataTransform {
let prewhere_block = self.block_reader.build_block(arrays.clone())?;
let evaluator = Evaluator::new(
&prewhere_block,
self.ctx.try_get_function_context()?,
self.func_ctx.clone(),
&BUILTIN_FUNCTIONS,
);
let result = evaluator.run(filter).map_err(|(_, e)| {
Expand All @@ -229,7 +239,7 @@ impl Processor for NativeDeserializeDataTransform {
};

if all_filtered {
metrics_inc_pruning_prewhere_nums(1);
self.prewhere_skipped += 1;
for index in self.remain_columns.iter() {
let chunk = chunks.get_mut(*index).unwrap();
chunk.1.skip_page()?;
Expand Down