1 change: 1 addition & 0 deletions Cargo.toml
@@ -154,4 +154,5 @@ rpath = false
large_futures = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_imports = "deny"
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
@@ -39,6 +39,7 @@ path = "src/lib.rs"
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -1207,7 +1207,7 @@ impl ConfigField for TableOptions {
/// # Parameters
///
/// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
/// for CSV format.
/// for CSV format.
/// * `value`: The value to set for the specified configuration key.
///
/// # Returns
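The doc comment above describes the key/value setter on `TableOptions`, where format-specific keys carry a `format.` prefix. A minimal hedged sketch of how such a call might look; the `Default` construction, the `set_config_format` call, and the `ConfigFileType` variant are assumptions made for illustration, not taken from this diff:

```rust
use datafusion_common::config::{ConfigFileType, TableOptions};
use datafusion_common::Result;

fn set_csv_delimiter() -> Result<()> {
    let mut options = TableOptions::default();
    // Select a format first so that "format."-prefixed keys know where to go,
    // then set the CSV delimiter through the documented key.
    options.set_config_format(ConfigFileType::CSV);
    options.set("format.delimiter", ";")?;
    Ok(())
}
```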
20 changes: 18 additions & 2 deletions datafusion/common/src/hash_utils.rs
@@ -17,22 +17,27 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::row::Rows;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::IntervalDayTime;
use arrow_buffer::IntervalMonthDayNano;

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
as_string_array, as_struct_array,
};
use crate::error::{Result, _internal_err};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::error::_internal_err;

// Combines two hashes into one hash
#[inline]
@@ -41,6 +46,7 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
hash.wrapping_mul(37).wrapping_add(r)
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
@@ -90,6 +96,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T>(
array: &PrimitiveArray<T>,
random_state: &RandomState,
@@ -135,6 +142,7 @@ fn hash_array_primitive<T>(
/// Hashes one array into the `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array<T>(
array: T,
random_state: &RandomState,
@@ -180,6 +188,7 @@ }
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
@@ -210,6 +219,7 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
random_state: &RandomState,
@@ -270,6 +280,7 @@ fn hash_map_array(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
@@ -303,6 +314,7 @@ where
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
random_state: &RandomState,
@@ -488,7 +500,11 @@ pub fn create_row_hashes_v2<'a>(

#[cfg(test)]
mod tests {
use arrow::{array::*, datatypes::*};
use std::sync::Arc;

use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;

use super::*;

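All of the `#[cfg(not(feature = "force_hash_collisions"))]` attributes added above exist so that a test-only sibling implementation can take over when the feature is enabled. A minimal sketch of what such a collision-forcing variant can look like; the exact body and visibility in DataFusion may differ, this only illustrates the cfg pairing:

```rust
use ahash::RandomState;
use arrow::array::ArrayRef;
use datafusion_common::Result;

/// Test-only stand-in: with `force_hash_collisions` enabled, every row hashes
/// to the same value, so hash-collision handling paths are exercised.
#[cfg(feature = "force_hash_collisions")]
pub fn create_hashes<'a>(
    _arrays: &[ArrayRef],
    _random_state: &RandomState,
    hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
    for hash in hashes_buffer.iter_mut() {
        *hash = 0;
    }
    Ok(hashes_buffer)
}
```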
8 changes: 4 additions & 4 deletions datafusion/common/src/tree_node.rs
@@ -43,14 +43,14 @@ macro_rules! handle_transform_recursion {
/// There are three categories of TreeNode APIs:
///
/// 1. "Inspecting" APIs to traverse a tree of `&TreeNodes`:
/// [`apply`], [`visit`], [`exists`].
/// [`apply`], [`visit`], [`exists`].
///
/// 2. "Transforming" APIs that traverse and consume a tree of `TreeNode`s
/// producing possibly changed `TreeNode`s: [`transform`], [`transform_up`],
/// [`transform_down`], [`transform_down_up`], and [`rewrite`].
/// producing possibly changed `TreeNode`s: [`transform`], [`transform_up`],
/// [`transform_down`], [`transform_down_up`], and [`rewrite`].
///
/// 3. Internal APIs used to implement the `TreeNode` API: [`apply_children`],
/// and [`map_children`].
/// and [`map_children`].
///
/// | Traversal Order | Inspecting | Transforming |
/// | --- | --- | --- |
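As a concrete illustration of the "transforming" category listed above, here is a hedged sketch of rewriting an expression tree bottom-up with `transform_up`; the `Expr` usage, the `Transformed::yes`/`Transformed::no` helpers, and the `data` field are assumed from the general `TreeNode` API rather than shown in this diff:

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{lit, Expr};

/// Replace every literal in the expression with the literal 42, bottom-up.
fn rewrite_literals(expr: Expr) -> Result<Expr> {
    let transformed = expr.transform_up(|e| {
        Ok(match e {
            Expr::Literal(_) => Transformed::yes(lit(42i64)),
            other => Transformed::no(other),
        })
    })?;
    // `transformed.data` carries the possibly rewritten tree.
    Ok(transformed.data)
}
```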
2 changes: 1 addition & 1 deletion datafusion/common/src/utils/memory.rs
@@ -24,7 +24,7 @@ use crate::{DataFusionError, Result};
/// # Parameters
/// - `num_elements`: The number of elements expected in the hash table.
/// - `fixed_size`: A fixed overhead size associated with the collection
/// (e.g., HashSet or HashTable).
/// (e.g., HashSet or HashTable).
/// - `T`: The type of elements stored in the hash table.
///
/// # Details
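The parameters above feed a hashbrown-style capacity estimate. A rough, self-contained sketch of that kind of calculation; the 7/8 load factor and the one control byte per bucket are assumptions about hashbrown's layout, and DataFusion's actual helper may use a different formula:

```rust
use std::mem::size_of;

/// Rough upper bound on the memory used by a hash table holding
/// `num_elements` values of type `T`, plus a fixed per-collection overhead.
fn rough_hash_table_estimate<T>(num_elements: usize, fixed_size: usize) -> usize {
    // hashbrown resizes at roughly 7/8 occupancy, so round the bucket count
    // up to the next power of two that keeps the load factor under that.
    let estimated_buckets = ((num_elements * 8) / 7).next_power_of_two();
    // Each bucket stores one `T` plus one control byte.
    fixed_size + estimated_buckets * (size_of::<T>() + 1)
}

fn main() {
    // E.g. a HashSet<u64>-like table with ~1000 entries and 48 bytes of
    // fixed overhead.
    println!("{} bytes", rough_hash_table_estimate::<u64>(1000, 48));
}
```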
2 changes: 1 addition & 1 deletion datafusion/common/src/utils/mod.rs
@@ -335,7 +335,7 @@ pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
/// This function finds the longest prefix of the form 0, 1, 2, ... within the
/// collection `sequence`. Examples:
/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest satisfying
/// prefix.
/// prefix.
/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
sequence: impl IntoIterator<Item = T>,
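A minimal sketch that matches the documented behavior of this helper (it mirrors the examples above; the real implementation may be written differently):

```rust
use std::borrow::Borrow;

/// Length of the longest prefix of `sequence` equal to 0, 1, 2, ...
fn longest_consecutive_prefix<T: Borrow<usize>>(
    sequence: impl IntoIterator<Item = T>,
) -> usize {
    let mut count = 0;
    for item in sequence {
        if *item.borrow() != count {
            break;
        }
        count += 1;
    }
    count
}

fn main() {
    assert_eq!(longest_consecutive_prefix([0usize, 1, 2, 4, 5]), 3);
    assert_eq!(longest_consecutive_prefix([1usize, 2, 3, 4]), 0);
}
```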
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -62,7 +62,7 @@ default = [
]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
math_expressions = ["datafusion-functions/math_expressions"]
parquet = ["datafusion-common/parquet", "dep:parquet"]
pyarrow = ["datafusion-common/pyarrow", "parquet"]
2 changes: 1 addition & 1 deletion datafusion/core/benches/sort.rs
@@ -21,7 +21,7 @@
//! 1. Creates a list of tuples (sorted if necessary)
//!
//! 2. Divides those tuples across some number of streams of [`RecordBatch`]
//! preserving any ordering
//! preserving any ordering
//!
//! 3. Times how long it takes for a given sort plan to process the input
//!
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/mod.rs
@@ -141,12 +141,12 @@ pub trait CatalogList: CatalogProviderList {}
/// Here are some examples of how to implement custom catalogs:
///
/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
/// that treats files and directories on a filesystem as tables.
/// that treats files and directories on a filesystem as tables.
///
/// * The [`catalog.rs`]: a simple directory based catalog.
///
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
/// read from Delta Lake tables
/// read from Delta Lake tables
///
/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -114,15 +114,15 @@ impl Default for DataFrameWriteOptions {
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
/// and [`read_parquet`].
/// and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
/// [`select`], [`aggregate`], and [`limit`]
/// [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
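The three workflow steps above map directly onto code. A hedged sketch; the file path and column names are placeholders, and the tokio runtime is assumed as in DataFusion's own examples:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // 1. Create a DataFrame from a data source via the SessionContext.
    let ctx = SessionContext::new();
    let df = ctx.read_csv("example.csv", CsvReadOptions::new()).await?;

    // 2. Lazily build up the desired calculation.
    let df = df
        .filter(col("a").gt(lit(10)))?
        .select(vec![col("a"), col("b")])?
        .limit(0, Some(100))?;

    // 3. Execute the plan and collect the resulting RecordBatches.
    let batches = df.collect().await?;
    println!("{} batches", batches.len());
    Ok(())
}
```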
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -49,7 +49,7 @@ use object_store::{ObjectMeta, ObjectStore};
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
@@ -287,17 +287,17 @@ impl ListingOptions {
///# Notes
///
/// - If only one level (e.g. `year` in the example above) is
/// specified, the other levels are ignored but the files are
/// still read.
/// specified, the other levels are ignored but the files are
/// still read.
///
/// - Files that don't follow this partitioning scheme will be
/// ignored.
/// ignored.
///
/// - Since the columns have the same value for all rows read from
/// each individual file (such as dates), they are typically
/// dictionary encoded for efficiency. You may use
/// [`wrap_partition_type_in_dict`] to request a
/// dictionary-encoded type.
/// each individual file (such as dates), they are typically
/// dictionary encoded for efficiency. You may use
/// [`wrap_partition_type_in_dict`] to request a
/// dictionary-encoded type.
///
/// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself.
///
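To make the partition-column notes above concrete, here is a hedged sketch of declaring hive-style partition columns on a listing table. The `year`/`month` names and the `.parquet` extension are placeholders, and plain `DataType::Utf8` is used for brevity even though the doc above recommends a dictionary-encoded type via `wrap_partition_type_in_dict`:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

fn partitioned_listing_options() -> ListingOptions {
    // Files laid out as /table/year=2024/month=06/data.parquet are exposed
    // with extra `year` and `month` columns taken from the file path.
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Utf8),
            ("month".to_string(), DataType::Utf8),
        ])
}
```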
40 changes: 20 additions & 20 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -119,32 +119,32 @@ pub use writer::plan_to_parquet;
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more details.
///
/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages
/// based on external information. See "Implementing External Indexes" below
/// based on external information. See "Implementing External Indexes" below
///
/// # Implementing External Indexes
///
@@ -191,22 +191,22 @@ pub use writer::plan_to_parquet;
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
/// the file.
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
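The optimizations listed above are applied when a parquet scan is planned, without any special configuration from the caller. A hedged, high-level sketch of triggering such a scan; the file path and filter are placeholders, and this goes through `SessionContext` rather than constructing `ParquetExec` directly:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Planning this query produces a ParquetExec under the hood: the filter
    // can be used to prune row groups and pages, and only the projected
    // column is decoded.
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?
        .filter(col("id").gt(lit(100)))?
        .select(vec![col("id")])?;

    let batches = df.collect().await?;
    println!("{} batches", batches.len());
    Ok(())
}
```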
@@ -1358,14 +1358,14 @@ impl<'a> StatisticsConverter<'a> {
/// # Parameters:
///
/// * `column_page_index`: The parquet column page indices, read from
/// `ParquetMetaData` column_index
/// `ParquetMetaData` column_index
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
/// `ParquetMetaData` offset_index
///
/// * `row_group_indices`: The indices of the row groups, that are used to
/// extract the column page index and offset index on a per row group
/// per column basis.
/// extract the column page index and offset index on a per row group
/// per column basis.
///
/// # Return Value
///
@@ -1486,13 +1486,13 @@ impl<'a> StatisticsConverter<'a> {
/// # Parameters:
///
/// * `column_offset_index`: The parquet column offset indices, read from
/// `ParquetMetaData` offset_index
/// `ParquetMetaData` offset_index
///
/// * `row_group_metadatas`: The metadata slice of the row groups, read
/// from `ParquetMetaData` row_groups
/// from `ParquetMetaData` row_groups
///
/// * `row_group_indices`: The indices of the row groups, that are used to
/// extract the column offset index on a per row group per column basis.
/// extract the column offset index on a per row group per column basis.
///
/// See docs on [`Self::data_page_mins`] for details.
pub fn data_page_row_counts<I>(