Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
477 changes: 220 additions & 257 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ build = "build.rs"

[dependencies]
# Arrow and DataFusion ecosystem
arrow = "57.1.0"
arrow-array = "57.1.0"
arrow-flight = { version = "57.1.0", features = [
arrow = "58.0.0"
arrow-array = "58.0.0"
arrow-flight = { version = "58.0.0", features = [
"tls-aws-lc",
"tls-native-roots",
] }
arrow-ipc = { version = "57.1.0", features = ["zstd"] }
arrow-json = "57.1.0"
arrow-schema = { version = "57.1.0", features = ["serde"] }
arrow-select = "57.1.0"
datafusion = "51.0.0"
object_store = { version = "0.12.4", features = [
arrow-ipc = { version = "58.0.0", features = ["zstd"] }
arrow-json = "58.0.0"
arrow-schema = { version = "58.0.0", features = ["serde"] }
arrow-select = "58.0.0"
datafusion = "53.0.0"
object_store = { version = "0.13.1", features = [
"cloud",
"aws",
"azure",
"gcp",
] }
parquet = "57.1.0"
parquet = "58.0.0"

# Web server and HTTP-related
actix-cors = "0.7.0"
Expand Down Expand Up @@ -200,7 +200,7 @@ anyhow = "1.0"

[dev-dependencies]
rstest = "0.23.0"
arrow = "57.1.0"
arrow = "58.0.0"
temp-dir = "0.1.14"

[package.metadata.parseable_ui]
Expand Down
2 changes: 1 addition & 1 deletion src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use chrono::NaiveDate;
use clokwerk::{AsyncScheduler, Interval, Job};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use futures_util::TryFutureExt;
use object_store::{ObjectStore, local::LocalFileSystem};
use object_store::{ObjectStoreExt, local::LocalFileSystem};
use once_cell::sync::OnceCell;
use parquet::errors::ParquetError;
use relative_path::RelativePathBuf;
Expand Down
2 changes: 1 addition & 1 deletion src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ impl Stream {
let time_partition_idx = merged_schema.index_of(time_partition_field).unwrap_or(0);

let mut props = WriterProperties::builder()
.set_max_row_group_size(self.options.row_group_size)
.set_max_row_group_row_count(Some(self.options.row_group_size))
.set_compression(self.options.parquet_compression.into())
.set_column_encoding(
ColumnPath::new(vec![time_partition_field.to_string()]),
Expand Down
20 changes: 10 additions & 10 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,21 @@ impl StandardTableProvider {

// parquet file source, default table parquet options
let file_source = if let Some(phyiscal_expr) = filters {
ParquetSource::default().with_predicate(phyiscal_expr)
ParquetSource::new(self.schema.clone()).with_predicate(phyiscal_expr)
} else {
ParquetSource::default()
ParquetSource::new(self.schema.clone())
};

let mut conf_builder =
FileScanConfigBuilder::new(object_store_url, self.schema.clone(), file_source.into())
.with_statistics(statistics)
.with_batch_size(Some(20000))
.with_constraints(Constraints::default())
.with_file_groups(file_groups)
.with_output_ordering(vec![LexOrdering::new([sort_expr]).unwrap()]);
let mut conf_builder = FileScanConfigBuilder::new(object_store_url, file_source.into())
.with_statistics(statistics)
.with_batch_size(Some(20000))
.with_constraints(Constraints::default())
.with_file_groups(file_groups)
.with_output_ordering(vec![LexOrdering::new([sort_expr]).unwrap()]);

// Set projection if provided
if let Some(proj_indices) = projection {
conf_builder = conf_builder.with_projection_indices(Some(proj_indices.clone()));
conf_builder = conf_builder.with_projection_indices(Some(proj_indices.clone()))?;
}

// Set limit if provided
Expand Down Expand Up @@ -423,6 +422,7 @@ impl StandardTableProvider {
min_value: Precision::Exact(min),
distinct_count: Precision::Absent,
sum_value: Precision::Absent,
byte_size: Precision::Absent,
})
.unwrap_or_default()
})
Expand Down
3 changes: 2 additions & 1 deletion src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use datafusion::{
};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use object_store::{
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, ObjectStoreExt, PutPayload,
RetryConfig,
azure::{MicrosoftAzure, MicrosoftAzureBuilder},
buffered::BufReader,
limit::LimitStore,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use datafusion::{
};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use object_store::{
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, ObjectStoreExt, PutPayload,
RetryConfig,
buffered::BufReader,
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
limit::LimitStore,
Expand Down
189 changes: 24 additions & 165 deletions src/storage/metrics_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{Stream, StreamExt, stream::BoxStream};
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
path::Path,
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
Result as ObjectStoreResult, path::Path,
};

use crate::metrics::STORAGE_REQUEST_RESPONSE_TIME;
Expand Down Expand Up @@ -85,31 +85,10 @@ impl<T: ObjectStore> std::fmt::Display for MetricLayer<T> {

#[async_trait]
impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
/// PutPayload.from_bytes(bytes)
async fn put(
&self,
location: &Path,
bytes: PutPayload, /* PutPayload */
) -> ObjectStoreResult<PutResult> {
let time = time::Instant::now();
let put_result = self.inner.put(location, bytes).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &put_result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "PUT", status])
.observe(elapsed);
put_result
}

async fn put_opts(
&self,
location: &Path,
payload: PutPayload, /* PutPayload */
payload: PutPayload,
opts: PutOptions,
) -> ObjectStoreResult<PutResult> {
let time = time::Instant::now();
Expand All @@ -122,27 +101,11 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "PUT_OPTS", status])
.with_label_values(&[&self.provider, "PUT", status])
.observe(elapsed);
put_result
}

// // ! removed in object_store 0.10.0
// async fn abort_multipart(
// &self,
// location: &Path,
// multipart_id: &MultipartId,
// ) -> object_store::Result<()> {
// let time = time::Instant::now();
// let elapsed = time.elapsed().as_secs_f64();
// self.inner.abort_multipart(location, multipart_id).await?;
// STORAGE_REQUEST_RESPONSE_TIME
// .with_label_values(&["PUT_MULTIPART_ABORT", "200"])
// .observe(elapsed);
// Ok(())
// }

/* Keep for easier migration to object_store 0.10.0 */
async fn put_multipart_opts(
&self,
location: &Path,
Expand All @@ -157,46 +120,12 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "PUT_MULTIPART_OPTS", status])
.observe(elapsed);
result
}

// todo completly tracking multipart upload
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> /* ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> */
{
let time = time::Instant::now();
let result = self.inner.put_multipart(location).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "PUT_MULTIPART", status])
.observe(elapsed);
result
}

async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
let time = time::Instant::now();
let get_result = self.inner.get(location).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &get_result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "GET", status])
.observe(elapsed);
get_result
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
let time = time::Instant::now();
let result = self.inner.get_opts(location, options).await;
Expand All @@ -208,23 +137,7 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "GET_OPTS", status])
.observe(elapsed);
result
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
let time = time::Instant::now();
let result = self.inner.get_range(location, range).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "GET_RANGE", status])
.with_label_values(&[&self.provider, "GET", status])
.observe(elapsed);
result
}
Expand All @@ -249,42 +162,10 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
result
}

async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
let time = time::Instant::now();
let result = self.inner.head(location).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "HEAD", status])
.observe(elapsed);
result
}

async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
let time = time::Instant::now();
let result = self.inner.delete(location).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "DELETE", status])
.observe(elapsed);
result
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, ObjectStoreResult<Path>>,
) -> BoxStream<'a, ObjectStoreResult<Path>> {
fn delete_stream(
&self,
locations: BoxStream<'static, ObjectStoreResult<Path>>,
) -> BoxStream<'static, ObjectStoreResult<Path>> {
self.inner.delete_stream(locations)
}

Expand Down Expand Up @@ -335,9 +216,14 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
result
}

async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> ObjectStoreResult<()> {
let time = time::Instant::now();
let result = self.inner.copy(from, to).await;
let result = self.inner.copy_opts(from, to, options).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Expand All @@ -351,9 +237,14 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
result
}

async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn rename_opts(
&self,
from: &Path,
to: &Path,
options: RenameOptions,
) -> ObjectStoreResult<()> {
let time = time::Instant::now();
let result = self.inner.rename(from, to).await;
let result = self.inner.rename_opts(from, to, options).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Expand All @@ -366,38 +257,6 @@ impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
.observe(elapsed);
result
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
let time = time::Instant::now();
let result = self.inner.copy_if_not_exists(from, to).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "COPY_IF", status])
.observe(elapsed);
result
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
let time = time::Instant::now();
let result = self.inner.rename_if_not_exists(from, to).await;
let elapsed = time.elapsed().as_secs_f64();

let status = match &result {
Ok(_) => "200",
Err(err) => error_to_status_code(err),
};

STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&[&self.provider, "RENAME_IF", status])
.observe(elapsed);
result
}
}

struct StreamMetricWrapper<'a, T> {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use datafusion::{
};
use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use object_store::{
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
BackoffConfig, ClientOptions, ListResult, ObjectMeta, ObjectStore, ObjectStoreExt, PutPayload,
RetryConfig,
aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
buffered::BufReader,
limit::LimitStore,
Expand Down
Loading