From 0cb2ed941946026424aaa3a9601bd3070b6d3e9d Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 12:20:54 +0100 Subject: [PATCH 01/12] Upgrade DataFusion to 13.0.0, Arrow to 25.0.0 The actual 13.0.0 DF release uses Arrow 24.0.0, but we need to pick up 25.0.0, since it brings back the Arrow Schema/Field-to-JSON serialization code (albeit in a different crate for integration tests). https://github.com/apache/arrow-rs/pull/2868 https://github.com/apache/arrow-rs/pull/2724 --- Cargo.lock | 594 ++++++++++++++++++++++++++----------------- Cargo.toml | 25 +- src/context.rs | 6 +- src/frontend/http.rs | 3 +- src/schema.rs | 5 +- 5 files changed, 386 insertions(+), 247 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71b72253..e5c11107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.7", + "getrandom 0.2.8", "once_cell", "version_check", ] @@ -36,7 +36,7 @@ checksum = "57e6e951cfbb2db8de1828d49073a113a29fd7117b1596caa781a258c7e38d72" dependencies = [ "cfg-if", "const-random", - "getrandom 0.2.7", + "getrandom 0.2.8", "once_cell", "version_check", ] @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "arrayref" @@ -94,11 +94,15 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "22.0.0" +version = "25.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5936b4185aa57cb9790d8742aab22859045ce5cc6a3023796240cd101c19335" +checksum = "76312eb67808c67341f4234861c4fcd2f9868f55e88fa2186ab3b357a6c5830b" dependencies = [ "ahash 0.8.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "bitflags", "chrono", "comfy-table", @@ -113,10 +117,67 @@ dependencies = [ "num", "regex", "regex-syntax", + "serde_json", +] + +[[package]] +name = "arrow-array" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69dd2c257fa76de0bcc63cabe8c81d34c46ef6fa7651e3e497922c3c9878bd67" +dependencies = [ + "ahash 0.8.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af963e71bdbbf928231d521083ddc8e8068cf5c8d45d4edcfeaf7eb5cdd779a9" +dependencies = [ + "half", + "num", +] + +[[package]] +name = "arrow-data" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52554ffff560c366d7210c2621a3cf1dc408f9969a0c7688a3ba0a62248a945d" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-integration-test" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d621824c10a3ccc55e1365363074e0b455e907ef38f7d55b516d8dc1c036e18" +dependencies = [ + "arrow", + "arrow-buffer", + "hex", + "num", "serde", "serde_json", ] +[[package]] +name = "arrow-schema" +version = "25.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5518f2bd7775057391f88257627cbb760ba3e1c2f2444a005ba79158624654" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -138,6 +199,21 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-io" version = "1.9.0" @@ -160,18 +236,19 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" dependencies = [ "event-listener", + "futures-lite", ] [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" dependencies = [ "proc-macro2", "quote", @@ -206,9 +283,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "bincode" @@ -311,9 +388,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.0" +version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" [[package]] name = "bytecount" @@ -333,6 +410,27 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +[[package]] +name = "bzip2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6afcd980b5f3a45017c57e57a2fcccbb351cc43a356ce117ef760ef8052b89b0" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cache-padded" version = "1.2.0" @@ -403,9 +501,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.2.22" +version = "3.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" +checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", "bitflags", @@ -452,9 +550,9 @@ dependencies = [ [[package]] name = "comfy-table" -version = "6.1.0" +version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85914173c2f558d61613bfbbf1911f14e630895087a7ed2fafc0f5319e1536e7" +checksum = "7b3d16bb3da60be2f7c7acfc438f2ae6f3496897ce68c291d0509bb67b4e248e" dependencies = [ "strum", "strum_macros", @@ -491,9 +589,9 @@ dependencies = [ [[package]] name = "const-random" -version = "0.1.13" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f590d95d011aa80b063ffe3253422ed5aa462af4e9867d43ce8337562bac77c4" +checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" dependencies = [ "const-random-macro", "proc-macro-hack", @@ -501,12 +599,12 @@ dependencies = [ [[package]] name = "const-random-macro" -version = "0.1.13" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "615f6e27d000a2bffbc7f2f6a8669179378fa27ee4d0a509e985dfc0a7defb40" +checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" dependencies = [ - "getrandom 0.2.7", - "lazy_static", + "getrandom 0.2.8", + "once_cell", "proc-macro-hack", "tiny-keccak", ] @@ -519,8 +617,8 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "convergence" -version = "0.6.0" -source = "git+https://github.com/splitgraph/convergence?branch=datafusion-12-upgrade#b4882ad7a1b019cbfb19ba41e90147c1012d1a36" +version = "0.7.0" +source = "git+https://github.com/splitgraph/convergence?branch=datafusion-13-upgrade#3104b813d0cb001477876c019aea42cbbaa294d3" dependencies = [ "async-trait", "bytes", @@ -534,8 +632,8 @@ dependencies = [ [[package]] name = "convergence-arrow" -version = "0.6.0" -source = "git+https://github.com/splitgraph/convergence?branch=datafusion-12-upgrade#b4882ad7a1b019cbfb19ba41e90147c1012d1a36" +version = "0.7.0" +source = "git+https://github.com/splitgraph/convergence?branch=datafusion-13-upgrade#3104b813d0cb001477876c019aea42cbbaa294d3" dependencies = [ "async-trait", "chrono", @@ -792,9 +890,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.78" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19f39818dcfc97d45b03953c1292efc4e80954e1583c4aa770bac1383e2310a4" +checksum = "6b7d4e43b25d3c994662706a1d4fcfc32aaa6afd287502c111b237093bb23f3a" dependencies = [ "cc", "cxxbridge-flags", @@ -804,9 +902,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.78" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e580d70777c116df50c390d1211993f62d40302881e54d4b79727acb83d0199" +checksum = "84f8829ddc213e2c1368e51a2564c552b65a8cb6a28f31e576270ac81d5e5827" dependencies = [ "cc", "codespan-reporting", @@ -819,15 +917,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.78" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56a46460b88d1cec95112c8c363f0e2c39afdb237f60583b0b36343bf627ea9c" +checksum = "e72537424b474af1460806647c41d4b6d35d09ef7fe031c5c2fa5766047cc56a" [[package]] name = "cxxbridge-macro" -version = "1.0.78" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747b608fecf06b0d72d440f27acc99288207324b793be2c17991839f3d4995ea" +checksum = "309e4fb93eed90e1e14bea0da16b209f81813ba9fc7830c20ed151dd7bc0a4d7" dependencies = [ "proc-macro2", "quote", @@ -836,14 +934,15 @@ dependencies = [ [[package]] name = "datafusion" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aca80caa2b0f7fdf267799b8895ac8b6341ea879db6b1e2d361ec49b47bc676" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "ahash 0.8.0", "arrow", + "async-compression", "async-trait", "bytes", + "bzip2", "chrono", "datafusion-common", "datafusion-expr", @@ -851,6 +950,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-row", "datafusion-sql", + "flate2", "futures", "glob", "hashbrown", @@ -859,10 +959,11 @@ dependencies = [ "log", "num_cpus", "object_store", - "ordered-float 3.2.0", - "parking_lot 0.12.1", + "ordered-float 3.3.0", + "parking_lot", "parquet", "paste", + "percent-encoding", "pin-project-lite", "rand 0.8.5", "smallvec", @@ -870,41 +971,39 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", + "tokio-util", "url", "uuid", ] [[package]] name = "datafusion-common" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7721fd550f6a28ad7235b62462aa51e9a43b08f8346d5cbe4d61f1e83f5df511" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "arrow", "object_store", - "ordered-float 3.2.0", + "ordered-float 3.3.0", "parquet", - "serde_json", "sqlparser", ] [[package]] name = "datafusion-expr" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d81255d043dc594c0ded6240e8a9be6ce8d7c22777a5093357cdb97af3d29ce" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "ahash 0.8.0", "arrow", "datafusion-common", + "log", "sqlparser", ] [[package]] name = "datafusion-optimizer" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71b39f8c75163691fff72b4a71816ad5a912e7c6963ee55f29ed1910b5a6993f" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "arrow", "async-trait", @@ -918,9 +1017,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109c4138220a109feafb63bf05418b86b17a42ece4bf047c38e4fd417572a9f7" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "ahash 0.8.0", "arrow", @@ -933,7 +1031,7 @@ dependencies = [ "hashbrown", "lazy_static", "md-5", - "ordered-float 3.2.0", + "ordered-float 3.3.0", "paste", "rand 0.8.5", "regex", @@ -943,11 +1041,11 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23ff1bd4294e862a1bd6d6c4278fee8d149f5125c138b9a3fb795540d8d9222e" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "arrow", + "async-trait", "datafusion", "datafusion-common", "datafusion-expr", @@ -957,9 +1055,8 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87a178fc0fd7693d9c9f608f7b605823eb982c6731ede0cccd99e2319cacabbc" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ "arrow", "datafusion-common", @@ -969,17 +1066,13 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "148cb56e7635faff3b16019393c49b988188c3fdadd1ca90eadb322a80aa1128" +version = "13.0.0" +source = "git+https://github.com/apache/arrow-datafusion?rev=784f10bb57f86a4db2e01a6cb51da742af0dd9d9#784f10bb57f86a4db2e01a6cb51da742af0dd9d9" dependencies = [ - "ahash 0.8.0", "arrow", "datafusion-common", "datafusion-expr", - "hashbrown", "sqlparser", - "tokio", ] [[package]] @@ -1082,12 +1175,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "dotenvy" -version = "0.15.5" +version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9155c8f4dc55c7470ae9da3f63c6785245093b3f6aeb0f5bf2e968efbba314" -dependencies = [ - "dirs", -] +checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" [[package]] name = "downcast" @@ -1297,15 +1387,15 @@ dependencies = [ [[package]] name = "fragile" -version = "1.2.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85dcb89d2b10c5f6133de2efd8c11959ce9dbb46a2f7a4cab208c4eeda6ce1ab" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "futures" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" dependencies = [ "futures-channel", "futures-core", @@ -1318,9 +1408,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", "futures-sink", @@ -1328,15 +1418,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] name = "futures-executor" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" dependencies = [ "futures-core", "futures-task", @@ -1345,20 +1435,20 @@ dependencies = [ [[package]] name = "futures-intrusive" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e" +checksum = "1b6bdbb8c5a42b2bb5ee8dd9dc2c7d73ce3e15d26dfe100fb347ffa3f58c672b" dependencies = [ "futures-core", "lock_api", - "parking_lot 0.11.2", + "parking_lot", ] [[package]] name = "futures-io" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" [[package]] name = "futures-lite" @@ -1377,9 +1467,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", @@ -1388,15 +1478,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-timer" @@ -1406,9 +1496,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-channel", "futures-core", @@ -1454,9 +1544,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "libc", @@ -1507,9 +1597,9 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "h2" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" dependencies = [ "bytes", "fnv", @@ -1531,6 +1621,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad6a9459c9c30b177b925162351f97e7d967c7ea8bab3b8352805327daf45554" dependencies = [ "crunchy", + "num-traits", ] [[package]] @@ -1754,9 +1845,9 @@ dependencies = [ [[package]] name = "iana-time-zone-haiku" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde6edd6cef363e9359ed3c98ba64590ba9eecba2293eb5a723ab32aee8926aa" +checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" dependencies = [ "cxx", "cxx-build", @@ -1800,15 +1891,15 @@ dependencies = [ [[package]] name = "integer-encoding" -version = "1.1.7" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "io-lifetimes" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ea37f355c05dde75b84bba2d767906ad522e97cd9e2eef2be7a4ab7fb442c06" +checksum = "e6e481ccbe3dea62107216d0d1138bb8ad8e5e5c43009a098bd1990272c497b0" [[package]] name = "ipnet" @@ -1964,9 +2055,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.135" +version = "0.2.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c" +checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" [[package]] name = "libgit2-sys" @@ -1980,6 +2071,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libm" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" + [[package]] name = "libsqlite3-sys" version = "0.24.2" @@ -2138,21 +2235,21 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] name = "mockall" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2be9a9090bc1cac2930688fa9478092a64c6a92ddc6ae0692d46b37d9cab709" +checksum = "50e4a1c770583dac7ab5e2f6c139153b783a53a1bbee9729613f193e59828326" dependencies = [ "cfg-if", "downcast", @@ -2165,9 +2262,9 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d702a0530a0141cf4ed147cf5ec7be6f2c187d4e37fcbefc39cf34116bfe8f" +checksum = "832663583d5fa284ca8810bf7015e46c9fff9622d3cf34bd1eea5003fec06dd0" dependencies = [ "cfg-if", "proc-macro2", @@ -2189,7 +2286,7 @@ dependencies = [ "futures-util", "num_cpus", "once_cell", - "parking_lot 0.12.1", + "parking_lot", "quanta", "scheduled-thread-pool", "skeptic", @@ -2361,6 +2458,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2396,8 +2494,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.0" -source = "git+https://github.com/apache/arrow-rs?rev=5f441eedff2b7621c46aded8b1caf3b665b8e8a9#5f441eedff2b7621c46aded8b1caf3b665b8e8a9" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ce10a205d9f610ae3532943039c34c145930065ce0c4284134c897fe6073b1" dependencies = [ "async-trait", "base64", @@ -2405,7 +2504,7 @@ dependencies = [ "chrono", "futures", "itertools", - "parking_lot 0.12.1", + "parking_lot", "percent-encoding", "quick-xml", "rand 0.8.5", @@ -2466,9 +2565,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.76" +version = "0.9.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce" +checksum = "b03b84c3b2d099b81f0953422b4d4ad58761589d0229b5506356afca05a3670a" dependencies = [ "autocfg", "cc", @@ -2488,9 +2587,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "129d36517b53c461acc6e1580aeb919c8ae6708a4b1eae61c4463a615d4f0411" +checksum = "1f74e330193f90ec45e2b257fa3ef6df087784157ac1ad2c1e71c62837b03aa7" dependencies = [ "num-traits", ] @@ -2517,17 +2616,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.5", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -2535,41 +2623,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.3", + "parking_lot_core", ] [[package]] name = "parking_lot_core" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] name = "parquet" -version = "22.0.0" +version = "25.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "474c423be6f10921adab3b94b42ec7fe87c1b87e1360dee150976caee444224f" +checksum = "f7758803135c32b243e52832473fc8f7c768a0a170b0851fb1bb37904c6b3550" dependencies = [ "ahash 0.8.0", "arrow", @@ -2583,7 +2657,6 @@ dependencies = [ "lz4", "num", "num-bigint", - "parquet-format", "rand 0.8.5", "seq-macro", "snap", @@ -2592,15 +2665,6 @@ dependencies = [ "zstd", ] -[[package]] -name = "parquet-format" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f0c06cdcd5460967c485f9c40a821746f5955ad81990533c7fae95dbd9bc0b5" -dependencies = [ - "thrift", -] - [[package]] name = "paste" version = "1.0.9" @@ -2707,15 +2771,15 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" [[package]] name = "polling" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "899b00b9c8ab553c743b3e11e87c5c7d423b2a2de229ba95b24a756344748011" +checksum = "ab4609a838d88b73d8238967b60dd115cc08d38e2bbaf51ee1e4b695f89122e2" dependencies = [ "autocfg", "cfg-if", @@ -2803,9 +2867,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.46" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ "unicode-ident", ] @@ -2983,7 +3047,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.7", + "getrandom 0.2.8", ] [[package]] @@ -3043,7 +3107,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ - "getrandom 0.2.7", + "getrandom 0.2.8", "redox_syscall", "thiserror", ] @@ -3194,23 +3258,23 @@ dependencies = [ [[package]] name = "rustix" -version = "0.35.11" +version = "0.35.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb2fda4666def1433b1b05431ab402e42a1084285477222b72d6c564c417cef" +checksum = "985947f9b6423159c4726323f373be0a21bdb514c5af06a849cb3d2dce2d01e8" dependencies = [ "bitflags", "errno", "io-lifetimes", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] name = "rustls" -version = "0.20.6" +version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" dependencies = [ "log", "ring", @@ -3270,7 +3334,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -3279,7 +3343,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" dependencies = [ - "parking_lot 0.12.1", + "parking_lot", ] [[package]] @@ -3316,6 +3380,7 @@ version = "0.2.4" dependencies = [ "anyhow", "arrow", + "arrow-integration-test", "async-trait", "base64", "bytes", @@ -3328,14 +3393,13 @@ dependencies = [ "datafusion-expr", "datafusion-proto", "futures", - "hashbrown", "hex", "itertools", "log", "mockall", "moka", "object_store", - "parking_lot 0.12.1", + "parking_lot", "percent-encoding", "pretty_env_logger", "prost", @@ -3398,18 +3462,18 @@ checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898" [[package]] name = "serde" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" +checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" +checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" dependencies = [ "proc-macro2", "quote", @@ -3418,9 +3482,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" +checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" dependencies = [ "itoa 1.0.4", "ryu", @@ -3543,9 +3607,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "snafu" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd726aec4ebad65756394ff89a9b9598793d4e30121cd71690244c1e497b3aee" +checksum = "a152ba99b054b22972ee794cf04e5ef572da1229e33b65f3c57abbff0525a454" dependencies = [ "doc-comment", "snafu-derive", @@ -3553,9 +3617,9 @@ dependencies = [ [[package]] name = "snafu-derive" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712529e9b0b014eabaa345b38e06032767e3dc393e8b017e853b1d7247094e74" +checksum = "d5e79cdebbabaebb06a9bdbaedc7f159b410461f63611d4d0e3fb0fab8fed850" dependencies = [ "heck", "proc-macro2", @@ -3607,9 +3671,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.23.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beb13adabbdda01b63d595f38c8bfd19a361e697fd94ce0098a634077bc5b25" +checksum = "0781f2b6bd03e5adf065c8e772b49eaea9f640d06a1b9130330fe8bd2563f4fd" dependencies = [ "log", ] @@ -3765,9 +3829,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" +checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" dependencies = [ "proc-macro2", "quote", @@ -3776,9 +3840,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.26.4" +version = "0.26.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7890fff842b8db56f2033ebee8f6efe1921475c3830c115995552914fb967580" +checksum = "c6d0dedf2e65d25b365c588382be9dc3a3ee4b0ed792366cf722d174c359d948" dependencies = [ "cfg-if", "core-foundation-sys", @@ -3853,9 +3917,9 @@ dependencies = [ [[package]] name = "textwrap" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" @@ -3877,26 +3941,15 @@ dependencies = [ "syn", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "thrift" -version = "0.13.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" dependencies = [ "byteorder", "integer-encoding", - "log", "ordered-float 1.1.1", - "threadpool", ] [[package]] @@ -3911,13 +3964,31 @@ dependencies = [ [[package]] name = "time" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c" +checksum = "0fab5c8b9980850e06d92ddbe3ab839c062c801f3927c0fb8abd6fc8e918fbca" dependencies = [ "itoa 1.0.4", "libc", "num_threads", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bb801831d812c562ae7d2bfb531f26e66e4e1f6b17307ba4149c5064710e5b" +dependencies = [ + "time-core", ] [[package]] @@ -3956,7 +4027,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -4211,7 +4282,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" dependencies = [ - "getrandom 0.2.7", + "getrandom 0.2.8", ] [[package]] @@ -4235,7 +4306,7 @@ dependencies = [ "rustversion", "sysinfo", "thiserror", - "time 0.3.15", + "time 0.3.16", ] [[package]] @@ -4388,9 +4459,9 @@ checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" [[package]] name = "wasm-encoder" -version = "0.18.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64ac98d5d61192cc45c701b7e4bd0b9aff91e2edfc7a088406cfe2288581e2c" +checksum = "c5816e88e8ea7335016aa62eb0485747f786136d505a9b3890f8c400211d9b5f" dependencies = [ "leb128", ] @@ -4432,7 +4503,7 @@ dependencies = [ "wasmtime-jit", "wasmtime-runtime", "wat", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -4460,7 +4531,7 @@ dependencies = [ "serde", "sha2 0.9.9", "toml", - "windows-sys", + "windows-sys 0.36.1", "zstd", ] @@ -4514,7 +4585,7 @@ dependencies = [ "cfg-if", "rustix", "wasmtime-asm-macros", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -4540,7 +4611,7 @@ dependencies = [ "wasmtime-environ", "wasmtime-jit-debug", "wasmtime-runtime", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -4577,7 +4648,7 @@ dependencies = [ "wasmtime-environ", "wasmtime-fiber", "wasmtime-jit-debug", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -4594,9 +4665,9 @@ dependencies = [ [[package]] name = "wast" -version = "47.0.1" +version = "48.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b98502f3978adea49551e801a6687678e6015317d7d9470a67fe813393f2a8" +checksum = "84825b5ac7164df8260c9e2b2e814075334edbe7ac426f2469b93a5eeac23cce" dependencies = [ "leb128", "memchr", @@ -4606,9 +4677,9 @@ dependencies = [ [[package]] name = "wat" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aab4e20c60429fbba9670a6cae0fff9520046ba0aa3e6d0b1cd2653bea14898" +checksum = "129da4a03ec6d2a815f42c88f641824e789d5be0d86d2f90aa8a218c7068e0be" dependencies = [ "wast", ] @@ -4710,43 +4781,100 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", +] + +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + [[package]] name = "winreg" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index db6b84c6..83eb9d98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,11 @@ frontend-postgres = ["convergence", "convergence-arrow"] object-store-s3 = ["object_store/aws"] [dependencies] -arrow = { version = "22.0.0", features = ["prettyprint"] } +arrow = "25.0.0" +# For the JSON format support +# https://github.com/apache/arrow-rs/pull/2868 +# https://github.com/apache/arrow-rs/pull/2724 +arrow-integration-test = "25.0.0" async-trait = "0.1.41" base64 = "0.13.0" @@ -35,11 +39,14 @@ clap = { version = "3.2.19", features = [ "derive" ] } config = "0.13.1" # PG wire protocol support -convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-12-upgrade", optional = true } -convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-12-upgrade", package = "convergence-arrow", optional = true } -datafusion = "12" -datafusion-expr = "12" -datafusion-proto = "12" +convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-13-upgrade", optional = true } +convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-13-upgrade", package = "convergence-arrow", optional = true } + +# DataFusion post-13 update that picks up Arrow 25.0.0 +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "784f10bb57f86a4db2e01a6cb51da742af0dd9d9" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "784f10bb57f86a4db2e01a6cb51da742af0dd9d9" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "784f10bb57f86a4db2e01a6cb51da742af0dd9d9" } + futures = "0.3" hashbrown = { version = "0.12", features = ["raw"] } hex = ">=0.4.0" @@ -59,7 +66,7 @@ reqwest = { version = "0.11.11", features = [ "stream" ] } serde = "1.0.138" serde_json = "1.0.81" sha2 = ">=0.10.1" -sqlparser = "0.23" +sqlparser = "0.25" sqlx = { version = "0.6.2", features = [ "runtime-tokio-rustls", "sqlite" ] } strum = ">=0.24" strum_macros = ">=0.24" @@ -72,8 +79,8 @@ warp = "0.3" wasmtime = "0.40.0" [patch.crates-io] -# Pick up https://github.com/apache/arrow-rs/pull/2731 -object_store = { git = "https://github.com/apache/arrow-rs", rev = "5f441eedff2b7621c46aded8b1caf3b665b8e8a9", package = "object_store" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "784f10bb57f86a4db2e01a6cb51da742af0dd9d9" } + [dev-dependencies] mockall = "0.11.1" diff --git a/src/context.rs b/src/context.rs index d4f2e954..90def088 100644 --- a/src/context.rs +++ b/src/context.rs @@ -42,6 +42,7 @@ use sqlparser::ast::{ AlterTableOperation, ObjectType, Statement, TableFactor, TableWithJoins, }; +use arrow_integration_test::field_to_json; use std::iter::zip; use std::sync::Arc; @@ -67,6 +68,7 @@ use datafusion::{ prelude::SessionContext, sql::{planner::SqlToRel, TableReference}, }; + use datafusion_expr::logical_plan::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable, Extension, LogicalPlan, Projection, @@ -204,7 +206,7 @@ fn build_partition_columns( PartitionColumn { name: Arc::from(column.name().to_string()), - r#type: Arc::from(column.to_json().to_string()), + r#type: Arc::from(field_to_json(column).to_string()), min_value: Arc::new(min_value), max_value: Arc::new(max_value), null_count: stats.null_count.map(|nc| nc as i32), @@ -216,7 +218,7 @@ fn build_partition_columns( .iter() .map(|column| PartitionColumn { name: Arc::from(column.name().to_string()), - r#type: Arc::from(column.to_json().to_string()), + r#type: Arc::from(field_to_json(column).to_string()), min_value: Arc::new(None), max_value: Arc::new(None), null_count: None, diff --git a/src/frontend/http.rs b/src/frontend/http.rs index 5ba1d83c..d0a8cdae 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -10,6 +10,7 @@ use warp::Rejection; use arrow::json::LineDelimitedWriter; use arrow::record_batch::{RecordBatch, RecordBatchReader}; +use arrow_integration_test::schema_from_json; use bytes::{BufMut, Bytes}; use datafusion::datasource::DefaultTableSource; @@ -297,7 +298,7 @@ pub async fn upload( load_part(p).await.map_err(ApiError::UploadBodyLoadError)?; csv_schema = Some( - Schema::from( + schema_from_json( &serde_json::from_slice::(value_bytes.as_slice()) .map_err(ApiError::UploadSchemaDeserializationError)?, ) diff --git a/src/schema.rs b/src/schema.rs index b97bd138..3a5b8fbe 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,3 +1,4 @@ +use arrow_integration_test::{field_from_json, field_to_json}; use std::sync::Arc; use datafusion::arrow::datatypes::{ @@ -13,7 +14,7 @@ pub struct Schema { impl Schema { fn field_from_json(json: Value) -> Result { - ArrowField::from(&json) + field_from_json(&json) } pub fn from_column_names_types<'a, I>(columns: I) -> Self @@ -37,7 +38,7 @@ impl Schema { self.arrow_schema .fields() .iter() - .map(|f| (f.name().clone(), f.to_json().to_string())) + .map(|f| (f.name().clone(), field_to_json(f).to_string())) .collect() } } From b2d78ef5e5e51ef815d5f58f6a537979c53c6b09 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 12:22:10 +0100 Subject: [PATCH 02/12] Remove `hashbrown` It's now the default HashMap implementation and DF's planner uses it as well, so we can use std::HashMap everywhere. --- Cargo.toml | 1 - src/context.rs | 3 +-- src/nodes.rs | 2 +- src/provider.rs | 3 +-- src/version.rs | 2 +- tests/end_to_end.rs | 2 +- 6 files changed, 5 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 83eb9d98..4bfb4a4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "7 datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "784f10bb57f86a4db2e01a6cb51da742af0dd9d9" } futures = "0.3" -hashbrown = { version = "0.12", features = ["raw"] } hex = ">=0.4.0" itertools = ">=0.10.0" log = "0.4" diff --git a/src/context.rs b/src/context.rs index 90def088..47904d3e 100644 --- a/src/context.rs +++ b/src/context.rs @@ -8,7 +8,7 @@ use datafusion::datasource::TableProvider; use datafusion::sql::ResolvedTableReference; use itertools::Itertools; use object_store::local::LocalFileSystem; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use tokio::fs::File as AsyncFile; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -32,7 +32,6 @@ use crate::object_store::wrapped::InternalObjectStore; use crate::utils::{gc_partitions, group_partitions, hash_file}; use crate::wasm_udf::wasm::create_udf_from_wasm; use futures::{StreamExt, TryStreamExt}; -use hashbrown::HashMap; #[cfg(test)] use mockall::automock; diff --git a/src/nodes.rs b/src/nodes.rs index c70ca2d7..5b0bb9e8 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -1,7 +1,7 @@ +use std::collections::HashMap; use std::{any::Any, fmt, sync::Arc, vec}; use datafusion::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; -use hashbrown::HashMap; use crate::data_types::TableId; use crate::{provider::SeafowlTable, wasm_udf::data_types::CreateFunctionDetails}; diff --git a/src/provider.rs b/src/provider.rs index 7f841642..306b2ce3 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -39,7 +39,6 @@ use datafusion::{ use datafusion_proto::protobuf; use futures::future; -use hashbrown::HashMap as HashBrownMap; use log::warn; use object_store::ObjectStore; use parking_lot::RwLock; @@ -505,7 +504,7 @@ impl PruningStatistics for SeafowlPruningStatistics { // scope down the rows to which the assignment is applied pub fn project_expressions( schema: &ArrowSchema, - assignments: &HashBrownMap, + assignments: &HashMap, selection_expr: Option>, ) -> Result, String)>> { schema diff --git a/src/version.rs b/src/version.rs index 03af1e18..9c98cc8f 100644 --- a/src/version.rs +++ b/src/version.rs @@ -3,11 +3,11 @@ use crate::data_types::{TableVersionId, Timestamp}; use chrono::DateTime; use datafusion::error::{DataFusionError, Result}; use datafusion::sql::TableReference; -use hashbrown::HashMap; use itertools::Itertools; use sqlparser::ast::{ Expr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, Value, }; +use std::collections::HashMap; use std::sync::Arc; use crate::datafusion::visit::{visit_table_table_factor, VisitorMut}; diff --git a/tests/end_to_end.rs b/tests/end_to_end.rs index 430dfc23..77713b07 100644 --- a/tests/end_to_end.rs +++ b/tests/end_to_end.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::sync::Arc; use std::time::Duration; @@ -6,7 +7,6 @@ use arrow::record_batch::RecordBatch; use chrono::{TimeZone, Utc}; use datafusion::{assert_batches_eq, assert_contains}; use futures::TryStreamExt; -use hashbrown::HashMap; use itertools::{sorted, Itertools}; use object_store::path::Path; use seafowl::catalog::{DEFAULT_DB, DEFAULT_SCHEMA}; From 94dc4d472c34a58f804a383a798e3f1d5e758d58 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 12:30:46 +0100 Subject: [PATCH 03/12] Port DataFusion changes for file compression types --- src/context.rs | 65 +++++++++++++++++++++++----------------- src/datafusion/parser.rs | 23 ++++++++++++++ 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/context.rs b/src/context.rs index 47904d3e..d763a62f 100644 --- a/src/context.rs +++ b/src/context.rs @@ -14,10 +14,10 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use std::fs::File; -use datafusion::datasource::file_format::avro::{AvroFormat, DEFAULT_AVRO_EXTENSION}; -use datafusion::datasource::file_format::csv::{CsvFormat, DEFAULT_CSV_EXTENSION}; -use datafusion::datasource::file_format::json::{JsonFormat, DEFAULT_JSON_EXTENSION}; -use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; +use datafusion::datasource::file_format::avro::{AvroFormat}; +use datafusion::datasource::file_format::csv::{CsvFormat}; +use datafusion::datasource::file_format::json::{JsonFormat}; + use datafusion::datasource::listing::ListingOptions; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::execution::context::SessionState; @@ -43,9 +43,11 @@ use sqlparser::ast::{ use arrow_integration_test::field_to_json; use std::iter::zip; +use std::str::FromStr; use std::sync::Arc; use datafusion::common::{Column, DFField, DFSchema, ToDFSchema}; +use datafusion::datasource::file_format::file_type::{FileCompressionType, FileType}; pub use datafusion::error::{DataFusionError as Error, Result}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_expr::execution_props::ExecutionProps; @@ -764,36 +766,43 @@ impl DefaultSeafowlContext { }) } - // Copied from DataFUsion's source code (private functions) + // Copied from DataFusion's source code (private functions) async fn create_listing_table( &self, cmd: &CreateExternalTable, ) -> Result> { - let (file_format, file_extension) = match cmd.file_type.as_str() { - "CSV" => ( - Arc::new( - CsvFormat::default() - .with_has_header(cmd.has_header) - .with_delimiter(cmd.delimiter as u8), - ) as Arc, - DEFAULT_CSV_EXTENSION, - ), - "PARQUET" => ( - Arc::new(ParquetFormat::default()) as Arc, - DEFAULT_PARQUET_EXTENSION, - ), - "AVRO" => ( - Arc::new(AvroFormat::default()) as Arc, - DEFAULT_AVRO_EXTENSION, - ), - "JSON" => ( - Arc::new(JsonFormat::default()) as Arc, - DEFAULT_JSON_EXTENSION, - ), - _ => Err(DataFusionError::Execution( + let file_compression_type = + match FileCompressionType::from_str(cmd.file_compression_type.as_str()) { + Ok(t) => t, + Err(_) => Err(DataFusionError::Execution( + "Only known FileCompressionTypes can be ListingTables!".to_string(), + ))?, + }; + + let file_type = match FileType::from_str(cmd.file_type.as_str()) { + Ok(t) => t, + Err(_) => Err(DataFusionError::Execution( "Only known FileTypes can be ListingTables!".to_string(), ))?, }; + + let file_extension = + file_type.get_ext_with_compression(file_compression_type.to_owned())?; + + let file_format: Arc = match file_type { + FileType::CSV => Arc::new( + CsvFormat::default() + .with_has_header(cmd.has_header) + .with_delimiter(cmd.delimiter as u8) + .with_file_compression_type(file_compression_type), + ), + FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::AVRO => Arc::new(AvroFormat::default()), + FileType::JSON => Arc::new( + JsonFormat::default().with_file_compression_type(file_compression_type), + ), + }; + let table = self.inner.table(cmd.name.as_str()); match (cmd.if_not_exists, table) { (true, Ok(_)) => Ok(make_dummy_exec()), @@ -806,7 +815,7 @@ impl DefaultSeafowlContext { }; let options = ListingOptions { format: file_format, - collect_stat: false, + collect_stat: self.inner.copied_config().collect_statistics, file_extension: file_extension.to_owned(), target_partitions: self.inner.copied_config().target_partitions, table_partition_cols: cmd.table_partition_cols.clone(), diff --git a/src/datafusion/parser.rs b/src/datafusion/parser.rs index fbe764d7..a447349d 100644 --- a/src/datafusion/parser.rs +++ b/src/datafusion/parser.rs @@ -50,6 +50,10 @@ fn parse_file_type(s: &str) -> Result { Ok(s.to_uppercase()) } +fn parse_file_compression_type(s: &str) -> Result { + Ok(s.to_uppercase()) +} + // XXX SEAFOWL: removed the struct definitions here because we want to use // the original datafusion::sql::parser structs in order to pass them back // to its logical planner @@ -347,6 +351,12 @@ impl<'a> DFParser<'a> { false => ',', }; + let file_compression_type = if self.parse_has_file_compression_type() { + self.parse_file_compression_type()? + } else { + "".to_string() + }; + let table_partition_cols = if self.parse_has_partition() { self.parse_partitions()? } else { @@ -365,6 +375,7 @@ impl<'a> DFParser<'a> { location, table_partition_cols, if_not_exists, + file_compression_type, }; Ok(Statement::CreateExternalTable(create)) } @@ -377,6 +388,14 @@ impl<'a> DFParser<'a> { } } + /// Parses the set of + fn parse_file_compression_type(&mut self) -> Result { + match self.parser.next_token() { + Token::Word(w) => parse_file_compression_type(&w.value), + unexpected => self.expected("one of GZIP, BZIP2", unexpected), + } + } + fn consume_token(&mut self, expected: &Token) -> bool { let token = self.parser.peek_token().to_string().to_uppercase(); let token = Token::make_keyword(&token); @@ -387,6 +406,10 @@ impl<'a> DFParser<'a> { false } } + fn parse_has_file_compression_type(&mut self) -> bool { + self.consume_token(&Token::make_keyword("COMPRESSION")) + & self.consume_token(&Token::make_keyword("TYPE")) + } fn parse_csv_has_header(&mut self) -> bool { self.consume_token(&Token::make_keyword("WITH")) From efb61873e93374421fd683d0b5adc5b29dc48dbf Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 12:33:53 +0100 Subject: [PATCH 04/12] Fix DataFusion deprecations and symbol moves --- src/context.rs | 16 ++++++++-------- src/nodes.rs | 3 ++- src/provider.rs | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/context.rs b/src/context.rs index d763a62f..7341182d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -14,9 +14,9 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use std::fs::File; -use datafusion::datasource::file_format::avro::{AvroFormat}; -use datafusion::datasource::file_format::csv::{CsvFormat}; -use datafusion::datasource::file_format::json::{JsonFormat}; +use datafusion::datasource::file_format::avro::AvroFormat; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::file_format::json::JsonFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -74,7 +74,7 @@ use datafusion_expr::logical_plan::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable, Extension, LogicalPlan, Projection, }; -use datafusion_expr::Expr; +use datafusion_expr::{cast, Expr}; use log::{debug, info, warn}; use prost::Message; use tempfile::TempPath; @@ -1024,10 +1024,10 @@ impl SeafowlContext for DefaultSeafowlContext { expr: target_schema.fields().iter().zip(plan.schema().field_names()).map(|(table_field, query_field_name)| { // Generate CAST (source_col AS table_col_type) AS table_col // If the type is the same, this will be optimized out. - Expr::Cast{ - expr: Box::new(Expr::Column(Column::from_name(query_field_name))), - data_type: table_field.data_type().clone() - }.alias(table_field.name()) + cast( + Expr::Column(Column::from_name(query_field_name)), + table_field.data_type().clone()).alias(table_field.name() + ) }).collect(), input: Arc::new(plan), schema: Arc::new(target_schema), diff --git a/src/nodes.rs b/src/nodes.rs index 5b0bb9e8..a64f2bff 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -1,7 +1,8 @@ +use datafusion::common::DFSchemaRef; use std::collections::HashMap; use std::{any::Any, fmt, sync::Arc, vec}; -use datafusion::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use crate::data_types::TableId; use crate::{provider::SeafowlTable, wasm_udf::data_types::CreateFunctionDetails}; diff --git a/src/provider.rs b/src/provider.rs index 306b2ce3..fbde0023 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -29,13 +29,13 @@ use datafusion::{ }, execution::context::{SessionState, TaskContext}, logical_expr::TableType, - logical_plan::Expr, physical_expr::PhysicalSortExpr, physical_plan::{ file_format::FileScanConfig, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, }; +use datafusion_expr::Expr; use datafusion_proto::protobuf; use futures::future; @@ -220,6 +220,7 @@ impl SeafowlTable { projection: projection.clone(), limit, table_partition_cols: vec![], + config_options: Arc::new(Default::default()), }; let format = ParquetFormat::default(); @@ -533,7 +534,6 @@ pub fn project_expressions( None, vec![(sel_expr.clone(), expr)], Some(col(f.name(), schema)?), - schema, )?; } From 2635b0560f59d88b95f857fd5c006bd26446c4ba Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 13:19:33 +0100 Subject: [PATCH 05/12] Fix some expected output change tests Arrow file hash changes and minor changes in the query plan output --- src/context.rs | 14 +++++++------- src/frontend/http.rs | 4 ++-- tests/end_to_end.rs | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/context.rs b/src/context.rs index 7341182d..9aaa43eb 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1946,12 +1946,12 @@ mod tests { use super::test_utils::mock_context; const PARTITION_1_FILE_NAME: &str = - "f48028e0f51f9447a90c407e9b0caa0f2af13f421db4939dc9b60825e0a26079.parquet"; + "4c643a98a232ba10452165d3673af89c09999b8f747efb2f4fec163fbcd325df.parquet"; const PARTITION_2_FILE_NAME: &str = - "6a4d8c5721ab70411ad52b807cdde40ade15641b6168513ebd3e1e8e4eb4505b.parquet"; + "7b1aaeaed9cf57509b2ecb31e9c298880e26cd269c93cc2fdb4973f2a6649f90.parquet"; const EXPECTED_INSERT_FILE_NAME: &str = - "3d552f85de97027297b42f6ffa644f5f1555b6b18131a4537b208b51ee4ef39f.parquet"; + "bacf07bd78884b01c3d6d80c6799e6b9bd9281fa0224a2c20b6474745376b208.parquet"; fn to_min_max_value(value: ScalarValue) -> Arc>> { Arc::from(scalar_value_to_bytes(&value)) @@ -2230,7 +2230,7 @@ mod tests { assert_eq!( format!("{:?}", plan), "Insert: some_table\ - \n Projection: CAST(#column1 AS Date64) AS date, CAST(#column2 AS Float64) AS value\ + \n Projection: CAST(column1 AS Date64) AS date, CAST(column2 AS Float64) AS value\ \n Values: (Utf8(\"2022-01-01T12:00:00\"), Int64(42))" ); } @@ -2249,8 +2249,8 @@ mod tests { .unwrap(); assert_eq!(format!("{:?}", plan), "Insert: some_table\ - \n Projection: CAST(#my_date AS Date64) AS date, CAST(#my_value AS Float64) AS value\ - \n Projection: #testdb.testcol.some_table.date AS my_date, #testdb.testcol.some_table.value AS my_value\ + \n Projection: CAST(my_date AS Date64) AS date, CAST(my_value AS Float64) AS value\ + \n Projection: testdb.testcol.some_table.date AS my_date, testdb.testcol.some_table.value AS my_value\ \n TableScan: testdb.testcol.some_table"); } @@ -2268,7 +2268,7 @@ mod tests { assert_eq!( format!("{:?}", plan), "Insert: some_table\ - \n Projection: CAST(#column1 AS Date64) AS date, CAST(#column2 AS Float64) AS value\ + \n Projection: CAST(column1 AS Date64) AS date, CAST(column2 AS Float64) AS value\ \n Values: (Utf8(\"2022-01-01T12:00:00\"), Int64(42))" ); } diff --git a/src/frontend/http.rs b/src/frontend/http.rs index d0a8cdae..418cd3d5 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -898,9 +898,9 @@ mod tests { let context = in_memory_context_with_single_table().await; let handler = filters(context, http_config_from_access_policy(free_for_all())); - let resp = query_uncached_endpoint(&handler, "SELECT 1/0").await; + let resp = query_uncached_endpoint(&handler, "SELECT 'notanint'::int").await; assert_eq!(resp.status(), StatusCode::BAD_REQUEST); - assert_eq!(resp.body(), "Arrow error: Divide by zero error"); + assert_eq!(resp.body(), "Arrow error: Cast error: Cannot cast string 'notanint' to value of Int32 type"); } #[tokio::test] diff --git a/tests/end_to_end.rs b/tests/end_to_end.rs index 77713b07..cf3cd470 100644 --- a/tests/end_to_end.rs +++ b/tests/end_to_end.rs @@ -27,11 +27,11 @@ mod http_testutils; // Object store IDs for frequently-used test data const FILENAME_1: &str = - "26e39f1717046023c2a53f69ac4d3fa2f8f790489ddf93a267766407817ad4f0.parquet"; + "d5ebca57fc79b2f43b91595ae5dde26e67a2d20e62b44f210100229e980614ad.parquet"; const FILENAME_2: &str = - "b7ffd2743b5fd11ea026065c9aaefcd827771f9cbf4631989786969b3457ded7.parquet"; + "3dcad71b37e901f358b3f9e835b9819af965cebbd31dfd11d1094776794eeabb.parquet"; const FILENAME_RECHUNKED: &str = - "370e90c844091607f5711565c638bc8b9acbc19b50f242121a88b93ec1892e6d.parquet"; + "edd2f33fc6429ff98f9f4c1047acb69db9216a803ce7aa00ef825af13da2b02f.parquet"; /// Make a SeafowlContext that's connected to a real PostgreSQL database /// (but uses an in-memory object store) @@ -1340,9 +1340,9 @@ async fn test_vacuum_command() { // NB: we have duplicates here which is expected, see: https://github.com/splitgraph/seafowl/issues/5 let orphans = vec![ FILENAME_1, - "a02146c8f6164a0f59526381549a3a8c752a4aa7de5f073e44904bf95833961e.parquet", + "b3651688b4030de7ba4afa294c367792dd01916f12b2c7df620f4b8772427650.parquet", FILENAME_1, - "824f53285c216022db3ae5e07f032cec9f77d9598b0321cec7c03a23f6d36e87.parquet", + "411951cd34bc5465269db28635d45172f0062b21def0e6980271072b0d25100d.parquet", ]; assert_orphan_partitions(context.clone(), orphans.clone()).await; From 38c0568e58e7c0f27d46d2d1cb0c35d92f4e2cfd Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 13:19:56 +0100 Subject: [PATCH 06/12] Add new `df_settings` table to expected output --- tests/end_to_end.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/end_to_end.rs b/tests/end_to_end.rs index cf3cd470..da619e41 100644 --- a/tests/end_to_end.rs +++ b/tests/end_to_end.rs @@ -304,6 +304,7 @@ async fn test_information_schema() { "| table_catalog | table_schema | table_name | table_type |", "+---------------+--------------------+----------------+------------+", "| default | information_schema | columns | VIEW |", + "| default | information_schema | df_settings | VIEW |", "| default | system | table_versions | VIEW |", "| default | information_schema | tables | VIEW |", "| default | information_schema | views | VIEW |", @@ -674,6 +675,7 @@ async fn test_create_table_move_and_drop() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| public | test_table_2 |", @@ -719,6 +721,7 @@ async fn test_create_table_move_and_drop() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| new_schema | test_table_3 |", @@ -812,6 +815,7 @@ async fn test_create_table_drop_schema() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| new_schema | test_table_2 |", @@ -834,6 +838,7 @@ async fn test_create_table_drop_schema() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| new_schema | test_table_2 |", @@ -855,6 +860,7 @@ async fn test_create_table_drop_schema() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| system | table_versions |", @@ -885,6 +891,7 @@ async fn test_create_table_drop_schema() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| public | test_table_1 |", @@ -1176,6 +1183,7 @@ async fn test_create_external_table_http() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| staging | file |", @@ -1917,6 +1925,7 @@ async fn test_table_time_travel() { "| table_schema | table_name |", "+--------------------+----------------+", "| information_schema | columns |", + "| information_schema | df_settings |", "| information_schema | tables |", "| information_schema | views |", "| public | test_table |", From 7a61a7c36d5811eab0f7c10e9d77ea04e1687f25 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 26 Oct 2022 18:18:08 +0100 Subject: [PATCH 07/12] Include `UPDATE`/`DELETE` in the query optimizer Make the `Update`/`Delete` nodes expose `inputs` and `expressions` in order to let the DF query optimizer work on the `WHERE ...` / `SET col = expr` expressions. This is slightly hacky: - as an "input", we return a `TableScan` node that we don't use after that (this is just so that the optimizer knows the input schema for all the expressions) - return the expressions used by the node and add code to pack/unpack them into a list The point of this is to let DataFusion run the `TypeCoercion` optimization, without which something like `WHERE float_col > 42` will raise an error (as after DF 13 these type coercions got removed from other places and moved into optimizations) (NB this doesn't work yet, we still get type coercion errors) --- src/context.rs | 30 +++++++++++---- src/nodes.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 117 insertions(+), 14 deletions(-) diff --git a/src/context.rs b/src/context.rs index 9aaa43eb..bca23665 100644 --- a/src/context.rs +++ b/src/context.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use base64::decode; use bytes::BytesMut; -use datafusion::datasource::TableProvider; +use datafusion::datasource::{provider_as_source, TableProvider}; use datafusion::sql::ResolvedTableReference; use itertools::Itertools; use object_store::local::LocalFileSystem; @@ -74,7 +74,7 @@ use datafusion_expr::logical_plan::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable, Extension, LogicalPlan, Projection, }; -use datafusion_expr::{cast, Expr}; +use datafusion_expr::{cast, Expr, LogicalPlanBuilder}; use log::{debug, info, warn}; use prost::Message; use tempfile::TempPath; @@ -1061,7 +1061,7 @@ impl SeafowlContext for DefaultSeafowlContext { // Get the actual table schema, since DF needs to validate unqualified columns // (i.e. ones referenced only by column name, lacking the relation name) let table_name = name.to_string(); - let seafowl_table = self.try_get_seafowl_table(&table_name)?; + let seafowl_table = Arc::new(self.try_get_seafowl_table(&table_name)?); let table_schema = seafowl_table.schema.arrow_schema.clone().to_dfschema()?; let selection_expr = match selection { @@ -1078,9 +1078,13 @@ impl SeafowlContext for DefaultSeafowlContext { Ok(LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::Update(Update { - table: Arc::new(seafowl_table), + table: seafowl_table.clone(), + table_plan: Arc::new(LogicalPlanBuilder::scan(table_name, + provider_as_source(seafowl_table), + None, + )?.build()?), selection: selection_expr, - assignments: HashMap::from_iter(assignment_exprs), + assignments: assignment_exprs, output_schema: Arc::new(DFSchema::empty()) })), })) @@ -1093,7 +1097,7 @@ impl SeafowlContext for DefaultSeafowlContext { // Get the actual table schema, since DF needs to validate unqualified columns // (i.e. ones referenced only by column name, lacking the relation name) let table_name = table_name.to_string(); - let seafowl_table = self.try_get_seafowl_table(&table_name)?; + let seafowl_table = Arc::new(self.try_get_seafowl_table(&table_name)?); let table_schema = seafowl_table.schema.arrow_schema.clone().to_dfschema()?; let selection_expr = match selection { @@ -1103,7 +1107,12 @@ impl SeafowlContext for DefaultSeafowlContext { Ok(LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::Delete(Delete { - table: Arc::new(seafowl_table), + table: seafowl_table.clone(), + table_plan: Arc::new(LogicalPlanBuilder::scan(table_name, + provider_as_source(seafowl_table), + None, + )? + .build()?), selection: selection_expr, output_schema: Arc::new(DFSchema::empty()) })), @@ -1385,10 +1394,15 @@ impl SeafowlContext for DefaultSeafowlContext { let mut final_partition_ids = Vec::with_capacity(partitions.len()); + + // Deduplicate assignments (we have to keep them as a vector in order + // to keep the order of column name -> expression mapping) + let assignment_map = HashMap::from_iter(assignments.clone()); + let mut update_plan: Arc; let project_expressions = project_expressions( &schema, - assignments, + &assignment_map, selection_expr, )?; diff --git a/src/nodes.rs b/src/nodes.rs index a64f2bff..d7dde5ef 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -1,5 +1,5 @@ use datafusion::common::DFSchemaRef; -use std::collections::HashMap; + use std::{any::Any, fmt, sync::Arc, vec}; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; @@ -36,8 +36,10 @@ pub struct Update { pub table: Arc, /// WHERE clause pub selection: Option, + /// Subplan for a table scan without a WHERE clause (used by the query optimizer) + pub table_plan: Arc, /// Columns to update - pub assignments: HashMap, + pub assignments: Vec<(String, Expr)>, /// Dummy result schema for the plan (empty) pub output_schema: DFSchemaRef, } @@ -46,6 +48,8 @@ pub struct Update { pub struct Delete { /// The table to delete from pub table: Arc, + /// Subplan for a table scan without a WHERE clause (used by the query optimizer) + pub table_plan: Arc, /// WHERE clause pub selection: Option, /// Dummy result schema for the plan (empty) @@ -116,7 +120,16 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { fn inputs(&self) -> Vec<&LogicalPlan> { match self { SeafowlExtensionNode::Insert(Insert { input, .. }) => vec![input.as_ref()], - // TODO Update/Delete will probably have children + + // For UPDATE/DELETE, the optimizer needs to know the schema of the "input" (in this + // case, our tables we're updating) in order to optimize this node's expressions (in our + // case, the WHERE ... and the SET col = expr clauses). + SeafowlExtensionNode::Update(Update { table_plan, .. }) => { + vec![table_plan.as_ref()] + } + SeafowlExtensionNode::Delete(Delete { table_plan, .. }) => { + vec![table_plan.as_ref()] + } _ => vec![], } } @@ -151,7 +164,23 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { // NB: this is used by the plan optimizer (gets expressions(), optimizes them, // calls from_template(optimized_exprs) and we'll need to expose our expressions here // and support from_template for a given node if we want them to be optimized. - vec![] + match self { + // For UPDATEs, we pack a list of all SET col = expr expressions and append + // the WHERE clause expression at the end, if it exists. + SeafowlExtensionNode::Update(Update { + assignments, + selection, + .. + }) => assignments + .iter() + .map(|(_, e)| e.clone()) + .chain(selection.clone().into_iter()) + .collect::>(), + SeafowlExtensionNode::Delete(Delete { selection, .. }) => { + selection.clone().into_iter().collect::>() + } + _ => vec![], + } } fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -191,11 +220,10 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { fn from_template( &self, - _exprs: &[Expr], + exprs: &[Expr], inputs: &[LogicalPlan], ) -> Arc { match self { - // This is the only node for which we return `inputs` in inputs() SeafowlExtensionNode::Insert(Insert { table, input, @@ -208,6 +236,67 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { }, output_schema: output_schema.clone(), })), + + SeafowlExtensionNode::Update(Update { + table, + selection, + table_plan, + assignments, + output_schema, + }) => { + // Defensive assertion to make sure that DataFusion gave us back the correct number + // of expressions. + let expected_len = + assignments.len() + (if selection.is_some() { 1 } else { 0 }); + if exprs.len() != expected_len { + // DataFusion doesn't let us give back an Error and this really shouldn't + // happen. Other alternatives (like partially initializing the node) might hide + // errors downstream, so panic here instead. + panic!("DataFusion optimizer returned incorrect number of expressions. Expected {}, got {}", expected_len, exprs.len()) + }; + + Arc::new(SeafowlExtensionNode::Update(Update { + // We ignore the optimized "inputs" in this case (it's just a TableScan without + // filters) and keep our old one + table: table.clone(), + table_plan: table_plan.clone(), + output_schema: output_schema.clone(), + + // Unpack the assignments and the selection expression + assignments: assignments + .iter() + .zip(exprs.iter()) + .map(|((col, _), new_expr)| (col.clone(), new_expr.clone())) + .collect(), + // If we have a selection expression in this node, the last entry in the list is + // the optimized expression + selection: if selection.is_none() { + None + } else { + exprs.get(assignments.len() + 1).cloned() + }, + })) + } + + SeafowlExtensionNode::Delete(Delete { + table, + table_plan, + selection, + output_schema, + }) => Arc::new(SeafowlExtensionNode::Delete(Delete { + table: table.clone(), + table_plan: table_plan.clone(), + output_schema: output_schema.clone(), + selection: if selection.is_none() { + None + } else { + let e = exprs.first(); + if e.is_none() { + panic!("DataFusion optimizer returned incorrect number of expressions. Expected 1, got 0"); + } + e.cloned() + }, + })), _ => Arc::from(self.clone()), } } From 0c6ece6f5247bfe79f31757fcf24f34ee44f54cb Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 27 Oct 2022 13:46:54 +0100 Subject: [PATCH 08/12] Run the query optimizer for UPDATE/DELETE (normally it's run only by DataFusion's `create_physical_plan`, but we don't run that, so we have to execute it manually to get auto type coercion working) --- src/context.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/context.rs b/src/context.rs index bca23665..39707334 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1076,7 +1076,7 @@ impl SeafowlContext for DefaultSeafowlContext { )) }).collect::>>()?; - Ok(LogicalPlan::Extension(Extension { + let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::Update(Update { table: seafowl_table.clone(), table_plan: Arc::new(LogicalPlanBuilder::scan(table_name, @@ -1087,7 +1087,11 @@ impl SeafowlContext for DefaultSeafowlContext { assignments: assignment_exprs, output_schema: Arc::new(DFSchema::empty()) })), - })) + }); + + // Run the optimizer in order to apply required transformations to the query plan + // (e.g. type coercions for the WHERE clause) + self.inner.optimize(&logical_plan) } Statement::Delete { table_name, @@ -1105,7 +1109,7 @@ impl SeafowlContext for DefaultSeafowlContext { Some(expr) => Some(query_planner.sql_to_rex(expr, &table_schema, &mut HashMap::new())?), }; - Ok(LogicalPlan::Extension(Extension { + let logical_plan = LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::Delete(Delete { table: seafowl_table.clone(), table_plan: Arc::new(LogicalPlanBuilder::scan(table_name, @@ -1116,7 +1120,11 @@ impl SeafowlContext for DefaultSeafowlContext { selection: selection_expr, output_schema: Arc::new(DFSchema::empty()) })), - })) + }); + + // Run the optimizer in order to apply required transformations to the query plan + // (e.g. type coercions for the WHERE clause) + self.inner.optimize(&logical_plan) }, Statement::CreateFunction { temporary: false, From 2a27fc2db7332d470bee4844d70e29d3f3ff0664 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 27 Oct 2022 13:48:02 +0100 Subject: [PATCH 09/12] Simplify `expressions()` for `Delete` --- src/nodes.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/nodes.rs b/src/nodes.rs index d7dde5ef..4be09fce 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -176,9 +176,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { .map(|(_, e)| e.clone()) .chain(selection.clone().into_iter()) .collect::>(), - SeafowlExtensionNode::Delete(Delete { selection, .. }) => { - selection.clone().into_iter().collect::>() - } + SeafowlExtensionNode::Delete(Delete { + selection: Some(e), .. + }) => vec![e.clone()], _ => vec![], } } From b4cfc90ff4d4b7ea000c403c4616e108770483d7 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 27 Oct 2022 13:48:38 +0100 Subject: [PATCH 10/12] Add more verbose plan output to `Update`/`Delete` Include `SET` expressions and the predicate if it exists to aid debugging. --- src/nodes.rs | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/nodes.rs b/src/nodes.rs index 4be09fce..5ec5818f 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -191,12 +191,41 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { SeafowlExtensionNode::CreateTable(CreateTable { name, .. }) => { write!(f, "Create: {}", name) } - SeafowlExtensionNode::Update(Update { table, .. }) => { - write!(f, "Update: {}", table.name) + SeafowlExtensionNode::Update(Update { + table, + assignments, + selection, + .. + }) => { + write!( + f, + "Update: {}, SET: {}", + table.name, + assignments + .iter() + .map(|(c, e)| format!("{} = {}", c, e)) + .collect::>() + .join(", ") + )?; + if let Some(s) = selection { + write!(f, " WHERE {}", s)?; + } + Ok(()) } - SeafowlExtensionNode::Delete(Delete { table, .. }) => { + SeafowlExtensionNode::Delete(Delete { + table, + selection: None, + .. + }) => { write!(f, "Delete: {}", table.name) } + SeafowlExtensionNode::Delete(Delete { + table, + selection: Some(e), + .. + }) => { + write!(f, "Delete: {} WHERE {}", table.name, e) + } SeafowlExtensionNode::CreateFunction(CreateFunction { name, .. }) => { write!(f, "CreateFunction: {}", name) } From 32eabcc210378d5406abb082d4697a760a997a88 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 27 Oct 2022 13:51:17 +0100 Subject: [PATCH 11/12] Remove aliases from optimized `Update`/`Delete`s These expressions are similar to what DataFusion uses in the `Filter` node and not doing this seems to break partition pruning (perhaps it stops at the `Alias` node and doesn't prone anything, didn't investigate in depth). Copy the `ExprRewriter` visitor from https://github.com/apache/arrow-datafusion/blob/c50573939d21de40e591c04915d41f7c46a51d0d/datafusion/expr/src/utils.rs#L384-L428 and adapt it to remove aliases from all expressions that the query optimizer gives back to `Update`/`Delete` nodes. --- src/nodes.rs | 43 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/src/nodes.rs b/src/nodes.rs index 5ec5818f..ed37aa0a 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -1,4 +1,6 @@ use datafusion::common::DFSchemaRef; +use datafusion::error::DataFusionError; +use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; use std::{any::Any, fmt, sync::Arc, vec}; @@ -112,6 +114,40 @@ impl SeafowlExtensionNode { } } +// Code for removing aliases in DELETE/UPDATE predicates +// Copied from `datafusion_expr::utils::from_plan` for Filter +// The query optimizer adds aliases to rewritten expressions in order +// to make them keep the same names. This is not needed in filters and, in our +// case, breaks partition pruning, which makes updates/deletes less efficient. + +// To fix this, we remove all aliases in all expressions used by UPDATE/DELETE +struct RemoveAliases {} + +impl ExprRewriter for RemoveAliases { + fn pre_visit(&mut self, expr: &Expr) -> Result { + match expr { + Expr::Exists { .. } | Expr::ScalarSubquery(_) | Expr::InSubquery { .. } => { + // subqueries could contain aliases so we don't recurse into those + Ok(RewriteRecursion::Stop) + } + Expr::Alias(_, _) => Ok(RewriteRecursion::Mutate), + _ => Ok(RewriteRecursion::Continue), + } + } + + fn mutate(&mut self, expr: Expr) -> Result { + Ok(expr.unalias()) + } +} + +fn remove_aliases(predicate: Expr) -> Expr { + let mut remove_aliases = RemoveAliases {}; + // NB we can't propagate errors in our logical nodes' from_template + // (unlike vanilla DataFusion's from_plan), so we have to cross our fingers + // and panic if something went wrong during the rewrite. + predicate.rewrite(&mut remove_aliases).unwrap() +} + impl UserDefinedLogicalNode for SeafowlExtensionNode { fn as_any(&self) -> &dyn Any { self @@ -284,6 +320,9 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { panic!("DataFusion optimizer returned incorrect number of expressions. Expected {}, got {}", expected_len, exprs.len()) }; + let exprs: Vec = + exprs.iter().map(|e| remove_aliases(e.clone())).collect(); + Arc::new(SeafowlExtensionNode::Update(Update { // We ignore the optimized "inputs" in this case (it's just a TableScan without // filters) and keep our old one @@ -302,7 +341,7 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { selection: if selection.is_none() { None } else { - exprs.get(assignments.len() + 1).cloned() + exprs.get(assignments.len()).cloned() }, })) } @@ -323,7 +362,7 @@ impl UserDefinedLogicalNode for SeafowlExtensionNode { if e.is_none() { panic!("DataFusion optimizer returned incorrect number of expressions. Expected 1, got 0"); } - e.cloned() + e.cloned().map(remove_aliases) }, })), _ => Arc::from(self.clone()), From 78770705e8e6c341478a199a060770932d6df769 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 27 Oct 2022 13:53:04 +0100 Subject: [PATCH 12/12] Assert the query plan in update/delete tests Make sure the constants are correctly cast and let us detect changes to the optimizer faster with new DF updates. --- tests/end_to_end.rs | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/tests/end_to_end.rs b/tests/end_to_end.rs index da619e41..a023736c 100644 --- a/tests/end_to_end.rs +++ b/tests/end_to_end.rs @@ -1385,6 +1385,18 @@ async fn test_delete_statement() { create_table_and_some_partitions(&context, "test_table", None).await; + // Check DELETE's query plan to make sure 46 (int) gets cast to a float value + // by the optimizer + // (NB: EXPLAIN isn't supported for user-defined nodes) + let plan = context + .create_logical_plan("DELETE FROM test_table WHERE some_value > 46") + .await + .unwrap(); + assert_eq!( + format!("{}", plan.display()), + "Delete: test_table WHERE some_value > Float32(46)" + ); + // // Execute DELETE affecting partitions 2, 3 and creating table_version 6 // @@ -1578,13 +1590,21 @@ async fn test_update_statement() { create_table_and_some_partitions(&context, "test_table", None).await; + // Check the UPDATE query plan to make sure IN (41, 42, 43) (int) get cast to a float value + let query = "UPDATE test_table + SET some_time = '2022-01-01 21:21:21Z', some_int_value = 5555, some_value = some_value - 10 + WHERE some_value IN (41, 42, 43)"; + + let plan = context.create_logical_plan(query).await.unwrap(); + assert_eq!( + format!("{}", plan.display()), + "Update: test_table, SET: some_time = Utf8(\"2022-01-01 21:21:21Z\"), some_int_value = Int64(5555), some_value = some_value - Float32(10) WHERE some_value IN ([Float32(41), Float32(42), Float32(43)])" + ); + // // Execute UPDATE with a selection, affecting partitions 1 and 4, and creating table_version 6 // - let plan = context - .plan_query("UPDATE test_table SET some_time = '2022-01-01 21:21:21Z', some_int_value = 5555, some_value = some_value - 10 WHERE some_value IN (41, 42, 43)") - .await - .unwrap(); + let plan = context.plan_query(query).await.unwrap(); context.collect(plan).await.unwrap(); assert_partition_ids(&context, 6, vec![2, 3, 5, 6]).await;