Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions arrow-cast/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,17 +1052,20 @@ pub fn cast_with_options(
Int64 => cast_numeric_to_string::<Int64Type, i32>(array),
Float32 => cast_numeric_to_string::<Float32Type, i32>(array),
Float64 => cast_numeric_to_string::<Float64Type, i32>(array),
Timestamp(TimeUnit::Nanosecond, tz) => {
cast_timestamp_to_string::<TimestampNanosecondType, i32>(array, tz)
}
Timestamp(TimeUnit::Microsecond, tz) => {
cast_timestamp_to_string::<TimestampMicrosecondType, i32>(array, tz)
}
Timestamp(TimeUnit::Millisecond, tz) => {
cast_timestamp_to_string::<TimestampMillisecondType, i32>(array, tz)
}
Timestamp(TimeUnit::Nanosecond, tz) => cast_timestamp_to_string::<
TimestampNanosecondType,
i32,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Microsecond, tz) => cast_timestamp_to_string::<
TimestampMicrosecondType,
i32,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Millisecond, tz) => cast_timestamp_to_string::<
TimestampMillisecondType,
i32,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Second, tz) => {
cast_timestamp_to_string::<TimestampSecondType, i32>(array, tz)
cast_timestamp_to_string::<TimestampSecondType, i32>(array, tz.as_ref())
}
Date32 => cast_date32_to_string::<i32>(array),
Date64 => cast_date64_to_string::<i32>(array),
Expand Down Expand Up @@ -1106,17 +1109,20 @@ pub fn cast_with_options(
Int64 => cast_numeric_to_string::<Int64Type, i64>(array),
Float32 => cast_numeric_to_string::<Float32Type, i64>(array),
Float64 => cast_numeric_to_string::<Float64Type, i64>(array),
Timestamp(TimeUnit::Nanosecond, tz) => {
cast_timestamp_to_string::<TimestampNanosecondType, i64>(array, tz)
}
Timestamp(TimeUnit::Microsecond, tz) => {
cast_timestamp_to_string::<TimestampMicrosecondType, i64>(array, tz)
}
Timestamp(TimeUnit::Millisecond, tz) => {
cast_timestamp_to_string::<TimestampMillisecondType, i64>(array, tz)
}
Timestamp(TimeUnit::Nanosecond, tz) => cast_timestamp_to_string::<
TimestampNanosecondType,
i64,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Microsecond, tz) => cast_timestamp_to_string::<
TimestampMicrosecondType,
i64,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Millisecond, tz) => cast_timestamp_to_string::<
TimestampMillisecondType,
i64,
>(array, tz.as_ref()),
Timestamp(TimeUnit::Second, tz) => {
cast_timestamp_to_string::<TimestampSecondType, i64>(array, tz)
cast_timestamp_to_string::<TimestampSecondType, i64>(array, tz.as_ref())
}
Date32 => cast_date32_to_string::<i64>(array),
Date64 => cast_date64_to_string::<i64>(array),
Expand Down Expand Up @@ -2472,7 +2478,7 @@ where
/// Cast timestamp types to Utf8/LargeUtf8
fn cast_timestamp_to_string<T, OffsetSize>(
array: &ArrayRef,
tz: &Option<String>,
tz: Option<&String>,
) -> Result<ArrayRef, ArrowError>
where
T: ArrowTemporalType + ArrowPrimitiveType,
Expand Down
2 changes: 1 addition & 1 deletion arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use arrow_schema::ArrowError;
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Represents compressing a ipc stream using a particular compression algorithm
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
Lz4Frame,
Zstd,
Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl IpcDataGenerator {
offset,
array.len(),
array.null_count(),
&compression_codec,
compression_codec,
write_options,
)?;
}
Expand Down Expand Up @@ -452,7 +452,7 @@ impl IpcDataGenerator {
0,
array_data.len(),
array_data.null_count(),
&compression_codec,
compression_codec,
write_options,
)?;

Expand Down Expand Up @@ -1058,7 +1058,7 @@ fn write_array_data(
offset: i64,
num_rows: usize,
null_count: usize,
compression_codec: &Option<CompressionCodec>,
compression_codec: Option<CompressionCodec>,
write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
let mut offset = offset;
Expand Down Expand Up @@ -1234,7 +1234,7 @@ fn write_buffer(
buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
arrow_data: &mut Vec<u8>, // output stream
offset: i64, // current output stream offset
compression_codec: &Option<CompressionCodec>,
compression_codec: Option<CompressionCodec>,
) -> Result<i64, ArrowError> {
let len: i64 = match compression_codec {
Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
Expand Down
7 changes: 4 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,8 @@ mod tests {

let expected_data = match opts.row_selections {
Some((selections, row_count)) => {
let mut without_skip_data = gen_expected_data::<T>(&def_levels, &values);
let mut without_skip_data =
gen_expected_data::<T>(def_levels.as_ref(), &values);

let mut skip_data: Vec<Option<T::T>> = vec![];
let dequeue: VecDeque<RowSelector> = selections.clone().into();
Expand All @@ -1585,7 +1586,7 @@ mod tests {
}
None => {
//get flatten table data
let expected_data = gen_expected_data::<T>(&def_levels, &values);
let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
expected_data
}
Expand Down Expand Up @@ -1654,7 +1655,7 @@ mod tests {
}

fn gen_expected_data<T: DataType>(
def_levels: &Option<Vec<Vec<i16>>>,
def_levels: Option<&Vec<Vec<i16>>>,
values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
let data: Vec<Option<T::T>> = match def_levels {
Expand Down
6 changes: 2 additions & 4 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,10 +903,8 @@ mod tests {

// Check offset indexes are present for all columns
for rg in metadata_with_index.row_groups() {
let page_locations = rg
.page_offset_index()
.as_ref()
.expect("expected page offset index");
let page_locations =
rg.page_offset_index().expect("expected page offset index");
assert_eq!(page_locations.len(), rg.columns().len())
}

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
// update the column index
let null_page = (self.page_metrics.num_buffered_rows as u64)
== self.page_metrics.num_page_nulls;
Expand Down Expand Up @@ -664,7 +664,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};

// update column and offset index
self.update_column_offset_index(&page_statistics);
self.update_column_offset_index(page_statistics.as_ref());

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ impl RowGroupMetaData {
}

/// Returns reference of page offset index of all column in this row group.
pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically an API change

&self.page_offset_index
pub fn page_offset_index(&self) -> Option<&Vec<Vec<PageLocation>>> {
self.page_offset_index.as_ref()
}

/// Returns reference to a schema descriptor.
Expand Down
2 changes: 1 addition & 1 deletion parquet/tests/arrow_writer_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
for (row_group, row_group_layout) in meta.row_groups().iter().zip(&layout.row_groups)
{
// Check against offset index
let offset_index = row_group.page_offset_index().as_ref().unwrap();
let offset_index = row_group.page_offset_index().unwrap();
assert_eq!(offset_index.len(), row_group_layout.columns.len());

for (column_index, column_layout) in
Expand Down
4 changes: 2 additions & 2 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,8 @@ mod test {
let struct_def: proc_macro2::TokenStream = quote! {
struct StringBorrower<'a> {
optional_str: Option<&'a str>,
optional_string: &Option<String>,
optional_dumb_int: &Option<&i32>,
optional_string: Option<&String>,
optional_dumb_int: Option<&i32>,
}
};

Expand Down