From ddfe23e6052e980644d3e6e7681870da5b0b4cfd Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 16:41:48 -0500 Subject: [PATCH 1/5] add example --- .../examples/dataframe_output.rs | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 datafusion-examples/examples/dataframe_output.rs diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs new file mode 100644 index 0000000000000..fa478b981c29f --- /dev/null +++ b/datafusion-examples/examples/dataframe_output.rs @@ -0,0 +1,58 @@ +use datafusion::{dataframe::DataFrameWriteOptions, prelude::*}; +use datafusion_common::DataFusionError; +use object_store::local::LocalFileSystem; +use std::sync::Arc; +use url::Url; + +/// This example demonstrates the various methods to write out a DataFrame +#[tokio::main] +async fn main() -> Result<(), DataFusionError> { + let ctx = SessionContext::new(); + let local = Arc::new(LocalFileSystem::new_with_prefix("./").unwrap()); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); + + let mut df = ctx.sql( + "values ('a'), ('b'), ('c')" + ).await + .unwrap(); + + // Ensure the column names and types match the target table + df = df.with_column_renamed("column1", "tablecol1").unwrap(); + + ctx.sql( + "create external table + test(tablecol1 varchar) + stored as parquet + location './datafusion-examples/test_table/'", + ) + .await? + .collect() + .await?; + + df.clone() + .write_table("test", DataFrameWriteOptions::new()) + .await?; + + df.clone() + .write_parquet("file://local/datafusion-examples/test_parquet/", + DataFrameWriteOptions::new(), + None, + ) + .await?; + + df.clone() + .write_csv("file://local/datafusion-examples/test_csv/", + DataFrameWriteOptions::new(), + None, + ) + .await?; + + df.clone() + .write_json("file://local/datafusion-examples/test_json/", + DataFrameWriteOptions::new(), + ) + .await?; + + Ok(()) +} From 53d5ec3d65080fcc1f5531cb8144bd01320e0a99 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 16:52:15 -0500 Subject: [PATCH 2/5] add license --- .../examples/dataframe_output.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs index fa478b981c29f..ee38a1df81a96 100644 --- a/datafusion-examples/examples/dataframe_output.rs +++ b/datafusion-examples/examples/dataframe_output.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use datafusion::{dataframe::DataFrameWriteOptions, prelude::*}; use datafusion_common::DataFusionError; use object_store::local::LocalFileSystem; From 354f576909ee84d3161f9b2d22bbeed2b299fa52 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 20 Dec 2023 16:53:53 -0500 Subject: [PATCH 3/5] fmt --- .../examples/dataframe_output.rs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs index ee38a1df81a96..6a4371f69971b 100644 --- a/datafusion-examples/examples/dataframe_output.rs +++ b/datafusion-examples/examples/dataframe_output.rs @@ -29,10 +29,7 @@ async fn main() -> Result<(), DataFusionError> { let local_url = Url::parse("file://local").unwrap(); ctx.runtime_env().register_object_store(&local_url, local); - let mut df = ctx.sql( - "values ('a'), ('b'), ('c')" - ).await - .unwrap(); + let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap(); // Ensure the column names and types match the target table df = df.with_column_renamed("column1", "tablecol1").unwrap(); @@ -52,23 +49,26 @@ async fn main() -> Result<(), DataFusionError> { .await?; df.clone() - .write_parquet("file://local/datafusion-examples/test_parquet/", - DataFrameWriteOptions::new(), - None, - ) + .write_parquet( + "file://local/datafusion-examples/test_parquet/", + DataFrameWriteOptions::new(), + None, + ) .await?; df.clone() - .write_csv("file://local/datafusion-examples/test_csv/", - DataFrameWriteOptions::new(), - None, - ) + .write_csv( + "file://local/datafusion-examples/test_csv/", + DataFrameWriteOptions::new(), + None, + ) .await?; df.clone() - .write_json("file://local/datafusion-examples/test_json/", - DataFrameWriteOptions::new(), - ) + .write_json( + "file://local/datafusion-examples/test_json/", + DataFrameWriteOptions::new(), + ) .await?; Ok(()) From 6c5a546942873f719bb03fe11b03af59c1b76731 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 21 Dec 2023 08:14:33 -0500 Subject: [PATCH 4/5] simplify dataframe_output examples --- .../examples/dataframe_output.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs index 6a4371f69971b..8c46c22616c13 100644 --- a/datafusion-examples/examples/dataframe_output.rs +++ b/datafusion-examples/examples/dataframe_output.rs @@ -17,17 +17,13 @@ use datafusion::{dataframe::DataFrameWriteOptions, prelude::*}; use datafusion_common::DataFusionError; -use object_store::local::LocalFileSystem; -use std::sync::Arc; -use url::Url; -/// This example demonstrates the various methods to write out a DataFrame +/// This example demonstrates the various methods to write out a DataFrame to local storage. +/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example +/// using a remote object store. #[tokio::main] async fn main() -> Result<(), DataFusionError> { let ctx = SessionContext::new(); - let local = Arc::new(LocalFileSystem::new_with_prefix("./").unwrap()); - let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap(); @@ -44,13 +40,16 @@ async fn main() -> Result<(), DataFusionError> { .collect() .await?; + // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c'). + // The behavior of write_table depends on the TableProvider's implementation + // of the insert_into method. df.clone() .write_table("test", DataFrameWriteOptions::new()) .await?; df.clone() .write_parquet( - "file://local/datafusion-examples/test_parquet/", + "./datafusion-examples/test_parquet/", DataFrameWriteOptions::new(), None, ) @@ -58,7 +57,7 @@ async fn main() -> Result<(), DataFusionError> { df.clone() .write_csv( - "file://local/datafusion-examples/test_csv/", + "./datafusion-examples/test_csv/", DataFrameWriteOptions::new(), None, ) @@ -66,7 +65,7 @@ async fn main() -> Result<(), DataFusionError> { df.clone() .write_json( - "file://local/datafusion-examples/test_json/", + "./datafusion-examples/test_json/", DataFrameWriteOptions::new(), ) .await?; From b1d12b1e0ceba44f51ca5dd8de0758befc30407f Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 21 Dec 2023 17:35:27 -0500 Subject: [PATCH 5/5] review comments --- datafusion-examples/README.md | 3 ++- datafusion-examples/examples/dataframe_output.rs | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 305422ccd0be0..057cdd4752739 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -47,7 +47,8 @@ cargo run --example csv_sql - [`catalog.rs`](examples/external_dependency/catalog.rs): Register the table into a custom catalog - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame against a local parquet file -- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 +- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 +- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame - [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and anaylze `Expr`s diff --git a/datafusion-examples/examples/dataframe_output.rs b/datafusion-examples/examples/dataframe_output.rs index 8c46c22616c13..c773384dfcd50 100644 --- a/datafusion-examples/examples/dataframe_output.rs +++ b/datafusion-examples/examples/dataframe_output.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::{dataframe::DataFrameWriteOptions, prelude::*}; -use datafusion_common::DataFusionError; +use datafusion_common::{parsers::CompressionTypeVariant, DataFusionError}; /// This example demonstrates the various methods to write out a DataFrame to local storage. /// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example @@ -58,7 +58,9 @@ async fn main() -> Result<(), DataFusionError> { df.clone() .write_csv( "./datafusion-examples/test_csv/", - DataFrameWriteOptions::new(), + // DataFrameWriteOptions contains options which control how data is written + // such as compression codec + DataFrameWriteOptions::new().with_compression(CompressionTypeVariant::GZIP), None, ) .await?;