diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index f46bbaee92f0b..462d49b5333e2 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -646,6 +655,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "backtrace" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.21.2" @@ -668,6 +692,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" + [[package]] name = "blake2" version = "0.10.6" @@ -812,9 +842,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9cc2b23599e6d7479755f3594285efb3f74a1bdca7a7374948bc831e23a552" +checksum = "f1369bc6b9e9a7dfdae2055f6ec151fe9c554a9d23d357c0237cee2e25eaabb7" dependencies = [ "chrono", "chrono-tz-build", @@ -823,9 +853,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9998fb9f7e9b2111641485bf8beb32f92945f97f92a3d061f744cfef335f751" 
+checksum = "e2f5ebdc942f57ed96d560a6d1a459bae5851102a25d5bf89dc04ae453e31ecf" dependencies = [ "parse-zoneinfo", "phf", @@ -839,7 +869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_derive", "clap_lex", "indexmap 1.9.3", @@ -994,9 +1024,9 @@ dependencies = [ [[package]] name = "ctor" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1586fa608b1dab41f667475b4a41faec5ba680aee428bfa5de4ea520fdc6e901" +checksum = "eed5fff0d93c7559121e9c72bf9c242295869396255071ff2cb1617147b608c5" dependencies = [ "quote", "syn 2.0.22", @@ -1153,6 +1183,7 @@ dependencies = [ "arrow-array", "arrow-buffer", "arrow-schema", + "base64", "blake2", "blake3", "chrono", @@ -1161,6 +1192,7 @@ dependencies = [ "datafusion-row", "half", "hashbrown 0.14.0", + "hex", "indexmap 2.0.0", "itertools 0.11.0", "lazy_static", @@ -1343,12 +1375,12 @@ dependencies = [ [[package]] name = "fd-lock" -version = "3.0.12" +version = "3.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ae6b3d9530211fb3b12a95374b8b0823be812f53d09e18c5675c0146b09642" +checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ "cfg-if", - "rustix", + "rustix 0.38.1", "windows-sys 0.48.0", ] @@ -1364,7 +1396,7 @@ version = "23.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" dependencies = [ - "bitflags", + "bitflags 1.3.2", "rustc_version", ] @@ -1518,6 +1550,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "glob" version = "0.3.1" @@ -1585,15 
+1623,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.1" @@ -1932,6 +1961,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" + [[package]] name = "lock_api" version = "0.4.10" @@ -2044,7 +2079,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", "static_assertions", @@ -2135,14 +2170,23 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.2.6", + "hermit-abi 0.3.1", "libc", ] +[[package]] +name = "object" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +dependencies = [ + "memchr", +] + [[package]] name = "object_store" version = "0.6.1" @@ -2334,18 +2378,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +checksum = 
"030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", @@ -2354,9 +2398,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" [[package]] name = "pin-utils" @@ -2458,9 +2502,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" dependencies = [ "proc-macro2", ] @@ -2511,7 +2555,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -2520,7 +2564,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -2639,6 +2683,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustc_version" version = 
"0.4.0" @@ -2650,15 +2700,28 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.20" +version = "0.37.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0" +checksum = "62f25693a73057a1b4cb56179dd3c7ea21a7c6c5ee7d85781f5749b46f34b79c" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + +[[package]] +name = "rustix" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc6396159432b5c8490d4e301d8c705f61860b8b6c863bf79942ce5401968f3" +dependencies = [ + "bitflags 2.3.3", + "errno", + "libc", + "linux-raw-sys 0.4.3", "windows-sys 0.48.0", ] @@ -2700,9 +2763,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ "base64", ] @@ -2729,7 +2792,7 @@ version = "11.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfc8644681285d1fb67a467fb3021bfea306b99b4146b166a1fe3ada965eece" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "clipboard-win", "dirs-next", @@ -2792,7 +2855,7 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc758eb7bffce5b308734e9b0c1468893cae9ff70ebf13e7090be8dcbcc83a8" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3058,7 +3121,7 @@ dependencies = [ "cfg-if", "fastrand", "redox_syscall 0.3.5", - "rustix", + "rustix 0.37.21", "windows-sys 0.48.0", ] @@ -3166,11 +3229,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" 
-version = "1.28.2" +version = "1.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ "autocfg", + "backtrace", "bytes", "libc", "mio", @@ -3613,9 +3677,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.0" +version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ "windows_aarch64_gnullvm 0.48.0", "windows_aarch64_msvc 0.48.0", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 4682194703a49..4e0e2fde5f062 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -38,10 +38,11 @@ path = "src/lib.rs" avro = ["apache-avro", "num-traits", "datafusion-common/avro"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "compression"] +default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] # Enables support for non-scalar, binary operations on dictionaries # Note: this results in significant additional codegen dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "datafusion-optimizer/dictionary_expressions"] +encoding_expressions = ["datafusion-physical-expr/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] pyarrow = ["datafusion-common/pyarrow"] diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index 
711f10cef2536..5444b3a88f05d 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -63,6 +63,66 @@ async fn test_mathematical_expressions_with_null() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg_attr(not(feature = "encoding_expressions"), ignore)] +async fn test_encoding_expressions() -> Result<()> { + // Input Utf8 + test_expression!("encode('tom','base64')", "dG9t"); + test_expression!("arrow_cast(decode('dG9t','base64'), 'Utf8')", "tom"); + test_expression!("encode('tom','hex')", "746f6d"); + test_expression!("arrow_cast(decode('746f6d','hex'), 'Utf8')", "tom"); + + // Input LargeUtf8 + test_expression!("encode(arrow_cast('tom', 'LargeUtf8'),'base64')", "dG9t"); + test_expression!( + "arrow_cast(decode(arrow_cast('dG9t', 'LargeUtf8'),'base64'), 'Utf8')", + "tom" + ); + test_expression!("encode(arrow_cast('tom', 'LargeUtf8'),'hex')", "746f6d"); + test_expression!( + "arrow_cast(decode(arrow_cast('746f6d', 'LargeUtf8'),'hex'), 'Utf8')", + "tom" + ); + + // Input Binary + test_expression!("encode(arrow_cast('tom', 'Binary'),'base64')", "dG9t"); + test_expression!( + "arrow_cast(decode(arrow_cast('dG9t', 'Binary'),'base64'), 'Utf8')", + "tom" + ); + test_expression!("encode(arrow_cast('tom', 'Binary'),'hex')", "746f6d"); + test_expression!( + "arrow_cast(decode(arrow_cast('746f6d', 'Binary'),'hex'), 'Utf8')", + "tom" + ); + + // Input LargeBinary + test_expression!("encode(arrow_cast('tom', 'LargeBinary'),'base64')", "dG9t"); + test_expression!( + "arrow_cast(decode(arrow_cast('dG9t', 'LargeBinary'),'base64'), 'Utf8')", + "tom" + ); + test_expression!("encode(arrow_cast('tom', 'LargeBinary'),'hex')", "746f6d"); + test_expression!( + "arrow_cast(decode(arrow_cast('746f6d', 'LargeBinary'),'hex'), 'Utf8')", + "tom" + ); + + // NULL + test_expression!("encode(NULL,'base64')", "NULL"); + test_expression!("decode(NULL,'base64')", "NULL"); + test_expression!("encode(NULL,'hex')", "NULL"); + test_expression!("decode(NULL,'hex')", 
"NULL"); + + // Empty string + test_expression!("encode('','base64')", ""); + test_expression!("decode('','base64')", ""); + test_expression!("encode('','hex')", ""); + test_expression!("decode('','hex')", ""); + + Ok(()) +} + #[tokio::test] #[cfg_attr(not(feature = "crypto_expressions"), ignore)] async fn test_crypto_expressions() -> Result<()> { diff --git a/datafusion/core/tests/sqllogictests/test_files/encoding.slt b/datafusion/core/tests/sqllogictests/test_files/encoding.slt new file mode 100644 index 0000000000000..b16ceebd6debf --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/encoding.slt @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +statement ok +CREATE TABLE test( + num INT, + bin_field BYTEA, + base64_field TEXT, + hex_field TEXT, +) as VALUES + (0, 'abc', encode('abc', 'base64'), encode('abc', 'hex')), + (1, 'qweqwe', encode('qweqwe', 'base64'), encode('qweqwe', 'hex')), + (2, NULL, NULL, NULL) +; + +# Arrays tests +query T +SELECT encode(bin_field, 'hex') FROM test ORDER BY num; +---- +616263 +717765717765 +NULL + +query T +SELECT arrow_cast(decode(base64_field, 'base64'), 'Utf8') FROM test ORDER BY num; +---- +abc +qweqwe +NULL + +query T +SELECT arrow_cast(decode(hex_field, 'hex'), 'Utf8') FROM test ORDER BY num; +---- +abc +qweqwe +NULL diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 2eaa2792b9db8..69054622757d6 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -63,10 +63,14 @@ pub enum BuiltinScalarFunction { Cos, /// cos Cosh, + /// Decode + Decode, /// degrees Degrees, /// Digest Digest, + /// Encode + Encode, /// exp Exp, /// factorial @@ -298,7 +302,9 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Coalesce => Volatility::Immutable, BuiltinScalarFunction::Cos => Volatility::Immutable, BuiltinScalarFunction::Cosh => Volatility::Immutable, + BuiltinScalarFunction::Decode => Volatility::Immutable, BuiltinScalarFunction::Degrees => Volatility::Immutable, + BuiltinScalarFunction::Encode => Volatility::Immutable, BuiltinScalarFunction::Exp => Volatility::Immutable, BuiltinScalarFunction::Factorial => Volatility::Immutable, BuiltinScalarFunction::Floor => Volatility::Immutable, @@ -626,6 +632,32 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Digest => { utf8_or_binary_to_binary_type(&input_expr_types[0], "digest") } + BuiltinScalarFunction::Encode => Ok(match input_expr_types[0] { + Utf8 => Utf8, + LargeUtf8 => LargeUtf8, + Binary => Utf8, + LargeBinary => LargeUtf8, + Null => Null, + _ => { + // this error is internal as `data_types` should have captured this. 
+ return Err(DataFusionError::Internal( + "The encode function can only accept utf8 or binary.".to_string(), + )); + } + }), + BuiltinScalarFunction::Decode => Ok(match input_expr_types[0] { + Utf8 => Binary, + LargeUtf8 => LargeBinary, + Binary => Binary, + LargeBinary => LargeBinary, + Null => Null, + _ => { + // this error is internal as `data_types` should have captured this. + return Err(DataFusionError::Internal( + "The decode function can only accept utf8 or binary.".to_string(), + )); + } + }), BuiltinScalarFunction::SplitPart => { utf8_to_str_type(&input_expr_types[0], "split_part") } @@ -895,6 +927,24 @@ impl BuiltinScalarFunction { ], self.volatility(), ), + BuiltinScalarFunction::Encode => Signature::one_of( + vec![ + Exact(vec![Utf8, Utf8]), + Exact(vec![LargeUtf8, Utf8]), + Exact(vec![Binary, Utf8]), + Exact(vec![LargeBinary, Utf8]), + ], + self.volatility(), + ), + BuiltinScalarFunction::Decode => Signature::one_of( + vec![ + Exact(vec![Utf8, Utf8]), + Exact(vec![LargeUtf8, Utf8]), + Exact(vec![Binary, Utf8]), + Exact(vec![LargeBinary, Utf8]), + ], + self.volatility(), + ), BuiltinScalarFunction::DateTrunc => Signature::one_of( vec![ Exact(vec![Utf8, Timestamp(Nanosecond, None)]), @@ -1175,6 +1225,10 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] { BuiltinScalarFunction::SHA384 => &["sha384"], BuiltinScalarFunction::SHA512 => &["sha512"], + // encode/decode + BuiltinScalarFunction::Encode => &["encode"], + BuiltinScalarFunction::Decode => &["decode"], + // other functions BuiltinScalarFunction::Struct => &["struct"], BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a45cf0febaa0d..480ea5d60890b 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -636,6 +636,8 @@ scalar_expr!( "converts the Unicode code point to a UTF8 character" ); scalar_expr!(Digest, digest, input algorithm, "compute the binary hash of 
`input`, using the `algorithm`"); +scalar_expr!(Encode, encode, input encoding, "encode the `input`, using the `encoding`. encoding can be base64 or hex"); +scalar_expr!(Decode, decode, input encoding, "decode the `input`, using the `encoding`. encoding can be base64 or hex"); scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase"); scalar_expr!(Left, left, string n, "returns the first `n` characters in the `string`"); scalar_expr!(Lower, lower, string, "convert the string to lower case"); @@ -942,6 +944,8 @@ mod test { test_scalar_expr!(CharacterLength, character_length, string); test_scalar_expr!(Chr, chr, string); test_scalar_expr!(Digest, digest, string, algorithm); + test_scalar_expr!(Encode, encode, string, encoding); + test_scalar_expr!(Decode, decode, string, encoding); test_scalar_expr!(Gcd, gcd, arg_1, arg_2); test_scalar_expr!(Lcm, lcm, arg_1, arg_2); test_scalar_expr!(InitCap, initcap, string); @@ -1036,4 +1040,30 @@ mod test { unreachable!(); } } + + #[test] + fn encode_function_definitions() { + if let Expr::ScalarFunction(ScalarFunction { fun, args }) = + encode(col("tableA.a"), lit("base64")) + { + let name = BuiltinScalarFunction::Encode; + assert_eq!(name, fun); + assert_eq!(2, args.len()); + } else { + unreachable!(); + } + } + + #[test] + fn decode_function_definitions() { + if let Expr::ScalarFunction(ScalarFunction { fun, args }) = + decode(col("tableA.a"), lit("hex")) + { + let name = BuiltinScalarFunction::Decode; + assert_eq!(name, fun); + assert_eq!(2, args.len()); + } else { + unreachable!(); + } + } } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 04ba2b9e38722..11f677abdc781 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -34,10 +34,11 @@ path = "src/lib.rs" [features] crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] -default = 
["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "encoding_expressions"] # Enables support for non-scalar, binary operations on dictionaries # Note: this results in significant additional codegen dictionary_expressions = ["arrow/dyn_cmp_dict"] +encoding_expressions = ["base64", "hex"] regex_expressions = ["regex"] unicode_expressions = ["unicode-segmentation"] @@ -47,6 +48,7 @@ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } +base64 = { version = "0.21", optional = true } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4.23", default-features = false } @@ -55,6 +57,7 @@ datafusion-expr = { path = "../expr", version = "27.0.0" } datafusion-row = { path = "../row", version = "27.0.0" } half = { version = "2.1", default-features = false } hashbrown = { version = "0.14", features = ["raw"] } +hex = { version = "0.4", optional = true } indexmap = "2.0.0" itertools = { version = "0.11", features = ["use_std"] } lazy_static = { version = "^1.4.0" } diff --git a/datafusion/physical-expr/src/encoding_expressions.rs b/datafusion/physical-expr/src/encoding_expressions.rs new file mode 100644 index 0000000000000..e8b4331e92984 --- /dev/null +++ b/datafusion/physical-expr/src/encoding_expressions.rs @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encoding expressions: the `encode` and `decode` scalar functions (base64 and hex) + +use arrow::{ + array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait, StringArray}, + datatypes::DataType, +}; +use base64::{engine::general_purpose, Engine as _}; +use datafusion_common::cast::{as_generic_binary_array, as_generic_string_array}; +use datafusion_common::ScalarValue; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::ColumnarValue; +use std::sync::Arc; +use std::{fmt, str::FromStr}; + +#[derive(Debug, Copy, Clone)] +enum Encoding { + Base64, + Hex, +} + +fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result { + match value { + ColumnarValue::Array(a) => match a.data_type() { + DataType::Utf8 => encoding.encode_utf8_array::(a.as_ref()), + DataType::LargeUtf8 => encoding.encode_utf8_array::(a.as_ref()), + DataType::Binary => encoding.encode_binary_array::(a.as_ref()), + DataType::LargeBinary => encoding.encode_binary_array::(a.as_ref()), + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {other:?} for function encode({encoding})", + ))), + }, + ColumnarValue::Scalar(scalar) => { + match scalar { + ScalarValue::Utf8(a) => { + Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))) + } + ScalarValue::LargeUtf8(a) => Ok(encoding + .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))), + ScalarValue::Binary(a) => Ok( + encoding.encode_scalar(a.as_ref().map(|v: &Vec| v.as_slice())) + ), + ScalarValue::LargeBinary(a) => Ok(encoding + .encode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), + other => 
Err(DataFusionError::Internal(format!( + "Unsupported data type {other:?} for function encode({encoding})", + ))), + } + } + } +} + +fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result { + match value { + ColumnarValue::Array(a) => match a.data_type() { + DataType::Utf8 => encoding.decode_utf8_array::(a.as_ref()), + DataType::LargeUtf8 => encoding.decode_utf8_array::(a.as_ref()), + DataType::Binary => encoding.decode_binary_array::(a.as_ref()), + DataType::LargeBinary => encoding.decode_binary_array::(a.as_ref()), + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {other:?} for function decode({encoding})", + ))), + }, + ColumnarValue::Scalar(scalar) => { + match scalar { + ScalarValue::Utf8(a) => { + encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes())) + } + ScalarValue::LargeUtf8(a) => encoding + .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())), + ScalarValue::Binary(a) => { + encoding.decode_scalar(a.as_ref().map(|v: &Vec| v.as_slice())) + } + ScalarValue::LargeBinary(a) => encoding + .decode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice())), + other => Err(DataFusionError::Internal(format!( + "Unsupported data type {other:?} for function decode({encoding})", + ))), + } + } + } +} + +fn hex_encode(input: &[u8]) -> String { + hex::encode(input) +} + +fn base64_encode(input: &[u8]) -> String { + general_purpose::STANDARD_NO_PAD.encode(input) +} + +fn hex_decode(input: &[u8]) -> Result> { + hex::decode(input).map_err(|e| { + DataFusionError::Internal(format!("Failed to decode from hex: {}", e)) + }) +} + +fn base64_decode(input: &[u8]) -> Result> { + general_purpose::STANDARD_NO_PAD.decode(input).map_err(|e| { + DataFusionError::Internal(format!("Failed to decode from base64: {}", e)) + }) +} + +macro_rules! 
encode_to_array { + ($METHOD: ident, $INPUT:expr) => {{ + let utf8_array: StringArray = $INPUT + .iter() + .map(|x| x.map(|x| $METHOD(x.as_ref()))) + .collect(); + Arc::new(utf8_array) + }}; +} + +macro_rules! decode_to_array { + ($METHOD: ident, $INPUT:expr) => {{ + let binary_array: BinaryArray = $INPUT + .iter() + .map(|x| x.map(|x| $METHOD(x.as_ref())).transpose()) + .collect::>()?; + Arc::new(binary_array) + }}; +} + +impl Encoding { + fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue { + ColumnarValue::Scalar(match self { + Self::Base64 => ScalarValue::Utf8( + value.map(|v| general_purpose::STANDARD_NO_PAD.encode(v)), + ), + Self::Hex => ScalarValue::Utf8(value.map(hex::encode)), + }) + } + + fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue { + ColumnarValue::Scalar(match self { + Self::Base64 => ScalarValue::LargeUtf8( + value.map(|v| general_purpose::STANDARD_NO_PAD.encode(v)), + ), + Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)), + }) + } + + fn encode_binary_array(self, value: &dyn Array) -> Result + where + T: OffsetSizeTrait, + { + let input_value = as_generic_binary_array::(value)?; + let array: ArrayRef = match self { + Self::Base64 => encode_to_array!(base64_encode, input_value), + Self::Hex => encode_to_array!(hex_encode, input_value), + }; + Ok(ColumnarValue::Array(array)) + } + + fn encode_utf8_array(self, value: &dyn Array) -> Result + where + T: OffsetSizeTrait, + { + let input_value = as_generic_string_array::(value)?; + let array: ArrayRef = match self { + Self::Base64 => encode_to_array!(base64_encode, input_value), + Self::Hex => encode_to_array!(hex_encode, input_value), + }; + Ok(ColumnarValue::Array(array)) + } + + fn decode_scalar(self, value: Option<&[u8]>) -> Result { + let value = match value { + Some(value) => value, + None => return Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))), + }; + + let out = match self { + Self::Base64 => { + general_purpose::STANDARD_NO_PAD + .decode(value) 
+ .map_err(|e| { + DataFusionError::Internal(format!( + "Failed to decode value using base64: {}", + e + )) + })? + } + Self::Hex => hex::decode(value).map_err(|e| { + DataFusionError::Internal(format!( + "Failed to decode value using hex: {}", + e + )) + })?, + }; + + Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(out)))) + } + + fn decode_large_scalar(self, value: Option<&[u8]>) -> Result { + let value = match value { + Some(value) => value, + None => return Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(None))), + }; + + let out = match self { + Self::Base64 => { + general_purpose::STANDARD_NO_PAD + .decode(value) + .map_err(|e| { + DataFusionError::Internal(format!( + "Failed to decode value using base64: {}", + e + )) + })? + } + Self::Hex => hex::decode(value).map_err(|e| { + DataFusionError::Internal(format!( + "Failed to decode value using hex: {}", + e + )) + })?, + }; + + Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(out)))) + } + + fn decode_binary_array(self, value: &dyn Array) -> Result + where + T: OffsetSizeTrait, + { + let input_value = as_generic_binary_array::(value)?; + let array: ArrayRef = match self { + Self::Base64 => decode_to_array!(base64_decode, input_value), + Self::Hex => decode_to_array!(hex_decode, input_value), + }; + Ok(ColumnarValue::Array(array)) + } + + fn decode_utf8_array(self, value: &dyn Array) -> Result + where + T: OffsetSizeTrait, + { + let input_value = as_generic_string_array::(value)?; + let array: ArrayRef = match self { + Self::Base64 => decode_to_array!(base64_decode, input_value), + Self::Hex => decode_to_array!(hex_decode, input_value), + }; + Ok(ColumnarValue::Array(array)) + } +} + +impl fmt::Display for Encoding { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", format!("{self:?}").to_lowercase()) + } +} + +impl FromStr for Encoding { + type Err = DataFusionError; + fn from_str(name: &str) -> Result { + Ok(match name { + "base64" => Self::Base64, + "hex" => Self::Hex, + _ 
=> { + let options = [Self::Base64, Self::Hex] + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(", "); + return Err(DataFusionError::Plan(format!( + "There is no built-in encoding named '{name}', currently supported encodings are: {options}", + ))); + } + }) + } +} + +/// Encodes the given data, accepts Binary, LargeBinary, Utf8 or LargeUtf8 and returns a [`ColumnarValue`]. +/// Second argument is the encoding to use; it must be a constant (scalar) Utf8 or LargeUtf8 value, an array argument yields a `NotImplemented` error. +/// Supported encodings are `base64` (encoded with the `STANDARD_NO_PAD` engine) and `hex`. NOTE(review): array inputs are always collected into a `StringArray`, including LargeUtf8/LargeBinary inputs - confirm that matches the declared return type. +pub fn encode(args: &[ColumnarValue]) -> Result { + if args.len() != 2 { + return Err(DataFusionError::Internal(format!( + "{:?} args were supplied but encode takes exactly two arguments", + args.len(), + ))); + } + let encoding = match &args[1] { + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => { + method.parse::() + } + _ => Err(DataFusionError::NotImplemented( + "Second argument to encode must be a constant: Encode using dynamically decided method is not yet supported".into(), + )), + }, + ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( + "Second argument to encode must be a constant: Encode using dynamically decided method is not yet supported".into(), + )), + }?; + encode_process(&args[0], encoding) +} + +/// Decodes the given data, accepts Binary, LargeBinary, Utf8 or LargeUtf8 and returns a [`ColumnarValue`]. +/// Second argument is the encoding to use; it must be a constant (scalar) Utf8 or LargeUtf8 value, an array argument yields a `NotImplemented` error. +/// Supported encodings are `base64` (decoded with the `STANDARD_NO_PAD` engine) and `hex`. 
NOTE(review): array inputs are always collected into a `BinaryArray`, including LargeUtf8/LargeBinary inputs - confirm that matches the declared return type. 
+pub fn decode(args: &[ColumnarValue]) -> Result { + if args.len() != 2 { + return Err(DataFusionError::Internal(format!( + "{:?} args were supplied but decode takes exactly two arguments", + args.len(), + ))); + } + let encoding = match &args[1] { + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => { + method.parse::() + } + _ => Err(DataFusionError::NotImplemented( + "Second argument to decode must be a utf8 constant: Decode using dynamically decided method is not yet supported".into(), + )), + }, + ColumnarValue::Array(_) => Err(DataFusionError::NotImplemented( + "Second argument to decode must be a utf8 constant: Decode using dynamically decided method is not yet supported".into(), + )), + }?; + decode_process(&args[0], encoding) +} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 3221b6f2932ce..08916d89c9862 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -181,6 +181,26 @@ pub fn create_physical_expr( ))) } +#[cfg(feature = "encoding_expressions")] +macro_rules! invoke_if_encoding_expressions_feature_flag { + ($FUNC:ident, $NAME:expr) => {{ + use crate::encoding_expressions; + encoding_expressions::$FUNC + }}; +} + +#[cfg(not(feature = "encoding_expressions"))] +macro_rules! invoke_if_encoding_expressions_feature_flag { + ($FUNC:ident, $NAME:expr) => { + |_: &[ColumnarValue]| -> Result { + Err(DataFusionError::Internal(format!( + "function {} requires compilation with feature flag: encoding_expressions.", + $NAME + ))) + } + }; +} + #[cfg(feature = "crypto_expressions")] macro_rules! 
invoke_if_crypto_expressions_feature_flag { ($FUNC:ident, $NAME:expr) => {{ @@ -565,6 +585,12 @@ pub fn create_physical_fun( BuiltinScalarFunction::Digest => { Arc::new(invoke_if_crypto_expressions_feature_flag!(digest, "digest")) } + BuiltinScalarFunction::Decode => Arc::new( + invoke_if_encoding_expressions_feature_flag!(decode, "decode"), + ), + BuiltinScalarFunction::Encode => Arc::new( + invoke_if_encoding_expressions_feature_flag!(encode, "encode"), + ), BuiltinScalarFunction::NullIf => Arc::new(nullif_func), BuiltinScalarFunction::OctetLength => Arc::new(|args| match &args[0] { ColumnarValue::Array(v) => Ok(ColumnarValue::Array(length(v.as_ref())?)), diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 0a2e0e58df7a9..1484cf7ff52ca 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -21,6 +21,8 @@ pub mod conditional_expressions; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod datetime_expressions; +#[cfg(feature = "encoding_expressions")] +pub mod encoding_expressions; pub mod equivalence; pub mod execution_props; pub mod expressions; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index de334dc4a5cc7..6754370a8b02f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -564,6 +564,8 @@ enum ScalarFunction { Cardinality = 98; TrimArray = 99; ArrayContains = 100; + Encode = 101; + Decode = 102; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 1cf08be321e1f..d8b3377da6a75 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -17884,6 +17884,8 @@ impl serde::Serialize for ScalarFunction { Self::Cardinality => "Cardinality", Self::TrimArray => "TrimArray", Self::ArrayContains => "ArrayContains", + Self::Encode => "Encode", + 
Self::Decode => "Decode", }; serializer.serialize_str(variant) } @@ -17996,6 +17998,8 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Cardinality", "TrimArray", "ArrayContains", + "Encode", + "Decode", ]; struct GeneratedVisitor; @@ -18139,6 +18143,8 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Cardinality" => Ok(ScalarFunction::Cardinality), "TrimArray" => Ok(ScalarFunction::TrimArray), "ArrayContains" => Ok(ScalarFunction::ArrayContains), + "Encode" => Ok(ScalarFunction::Encode), + "Decode" => Ok(ScalarFunction::Decode), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 5f201b124d1b9..ff71158634904 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2228,6 +2228,8 @@ pub enum ScalarFunction { Cardinality = 98, TrimArray = 99, ArrayContains = 100, + Encode = 101, + Decode = 102, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2337,6 +2339,8 @@ impl ScalarFunction { ScalarFunction::Cardinality => "Cardinality", ScalarFunction::TrimArray => "TrimArray", ScalarFunction::ArrayContains => "ArrayContains", + ScalarFunction::Encode => "Encode", + ScalarFunction::Decode => "Decode", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -2443,6 +2447,8 @@ impl ScalarFunction { "Cardinality" => Some(Self::Cardinality), "TrimArray" => Some(Self::TrimArray), "ArrayContains" => Some(Self::ArrayContains), + "Encode" => Some(Self::Encode), + "Decode" => Some(Self::Decode), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index b4a49713c2444..b5100f4376042 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -474,6 +474,8 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Sha384 => Self::SHA384, ScalarFunction::Sha512 => Self::SHA512, ScalarFunction::Digest => Self::Digest, + ScalarFunction::Encode => Self::Encode, + ScalarFunction::Decode => Self::Decode, ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, ScalarFunction::Log2 => Self::Log2, ScalarFunction::Signum => Self::Signum, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 5b09aee91095f..43485577aeac0 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1366,6 +1366,8 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::SHA384 => Self::Sha384, BuiltinScalarFunction::SHA512 => Self::Sha512, BuiltinScalarFunction::Digest => Self::Digest, + BuiltinScalarFunction::Decode => Self::Decode, + BuiltinScalarFunction::Encode => Self::Encode, BuiltinScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, BuiltinScalarFunction::Log2 => Self::Log2, BuiltinScalarFunction::Signum => Self::Signum,