diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 87140b21fe9..8aef1637490 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -134,7 +134,6 @@ def _gold_tests(self, gold_dir): skip.add("JS") skip.add("Rust") if prefix == '2.0.0-compression': - skip.add("Go") skip.add("JS") skip.add("Rust") diff --git a/docs/source/status.rst b/docs/source/status.rst index 1c6262274e6..acf5af90d52 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -42,7 +42,7 @@ Data Types +-------------------+-------+-------+-------+------------+-------+-------+-------+ | Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ -| Decimal128 | ✓ | ✓ | | | ✓ | ✓ | ✓ | +| Decimal128 | ✓ | ✓ | ✓ | | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ | Decimal256 | ✓ | ✓ | | | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -126,7 +126,7 @@ IPC Format +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Sparse tensors | ✓ | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Buffer compression | ✓ | ✓ (3) | | | | | ✓ | +| Buffer compression | ✓ | ✓ (3) | ✓ | | | | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Endianness conversion | ✓ (2) | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -152,13 +152,13 @@ Flight RPC | Flight RPC Feature | C++ | Java | Go | JavaScript | C# | Rust | Julia | | | | | | | | | | +=============================+=======+=======+=======+============+=======+=======+=======+ -| gRPC transport | ✓ | ✓ | | | ✓ (1) | | | +| gRPC transport | ✓ | ✓ | ✓ | | ✓ (1) | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| gRPC + TLS transport | ✓ | ✓ | | | ✓ | | | +| gRPC + TLS transport | ✓ | ✓ | ✓ | | ✓ | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | RPC error codes | ✓ | ✓ | | | ✓ | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Authentication handlers | ✓ | ✓ | | | ✓ (2) | | | +| Authentication handlers | ✓ | ✓ | ✓ | | ✓ (2) | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Custom client middleware | ✓ | ✓ | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ diff --git a/go/arrow/go.mod b/go/arrow/go.mod index ca760281449..5e7915fa194 100644 --- a/go/arrow/go.mod +++ b/go/arrow/go.mod @@ -20,8 +20,12 @@ go 1.12 require ( github.com/davecgh/go-spew v1.1.0 // indirect + github.com/frankban/quicktest v1.11.3 // indirect github.com/golang/protobuf v1.4.2 github.com/google/flatbuffers v1.11.0 + github.com/klauspost/compress v1.11.13 + github.com/pierrec/lz4 v2.6.0+incompatible + github.com/pierrec/lz4/v4 v4.1.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect @@ -30,6 +34,5 @@ require ( golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f // indirect google.golang.org/grpc v1.32.0 - google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 // indirect google.golang.org/protobuf v1.25.0 ) diff --git a/go/arrow/go.sum b/go/arrow/go.sum index f56e738d4c3..5743321c526 100644 --- a/go/arrow/go.sum +++ b/go/arrow/go.sum @@ -9,6 +9,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -19,7 +21,6 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= @@ -29,8 +30,20 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= +github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.4 h1:PjkB+qEooc9nw4F6Pxe/e0xaRdWz3suItXWxWqAO1QE= +github.com/pierrec/lz4/v4 v4.1.4/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -45,7 +58,6 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= @@ -55,13 +67,11 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009 h1:W0lCpv29Hv0UaM1LXb9QlBHLNP8UFfcKjblhVCWftOM= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -76,19 +86,15 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f h1:Yv4xsIx7HZOoyUGSJ2ksDyWE2qIBXROsZKt2ny3hCGM= google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0= google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 h1:MZjUhWVLZHiPPNKvwdt31HZVHrASfgk1ScV3vVTKbDo= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go index 33aab24bb3c..dfafa4e7fac 100644 --- a/go/arrow/internal/arrdata/ioutil.go +++ b/go/arrow/internal/arrdata/ioutil.go @@ -25,6 +25,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/ipc" "github.com/apache/arrow/go/arrow/memory" ) @@ -128,7 +129,7 @@ func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *ar for r.Next() { rec := r.Record() if !array.RecordEqual(rec, recs[n]) { - t.Fatalf("records[%d] differ", n) + t.Fatalf("records[%d] differ, got: %s, expected %s", n, rec, recs[n]) } n++ } @@ -166,6 +167,55 @@ func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Sch t.Fatalf("could not sync data to disk: %v", err) } + // put the cursor back at the start of the file before returning rather than + // leaving it at the end so the reader can just start reading from the handle + // immediately for the test. + _, err = f.Seek(0, io.SeekStart) + if err != nil { + t.Fatalf("could not seek to start: %v", err) + } +} + +// WriteFile writes a list of records to the given file descriptor, as an ARROW file. +func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) { + t.Helper() + + opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)} + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + opts = append(opts, ipc.WithLZ4()) + case flatbuf.CompressionTypeZSTD: + opts = append(opts, ipc.WithZstd()) + default: + t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) + } + + w, err := ipc.NewFileWriter(f, opts...) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + for i, rec := range recs { + err = w.Write(rec) + if err != nil { + t.Fatalf("could not write record[%d]: %v", i, err) + } + } + + err = w.Close() + if err != nil { + t.Fatal(err) + } + + err = f.Sync() + if err != nil { + t.Fatalf("could not sync data to disk: %v", err) + } + + // put the cursor back at the start of the file before returning rather than + // leaving it at the end so the reader can just start reading from the handle + // immediately for the test. _, err = f.Seek(0, io.SeekStart) if err != nil { t.Fatalf("could not seek to start: %v", err) @@ -191,3 +241,34 @@ func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.S t.Fatal(err) } } + +// WriteStreamCompressed writes a list of records to the given file descriptor as an ARROW stream +// using the provided compression type. +func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType, np int) { + t.Helper() + + opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem), ipc.WithCompressConcurrency(np)} + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + opts = append(opts, ipc.WithLZ4()) + case flatbuf.CompressionTypeZSTD: + opts = append(opts, ipc.WithZstd()) + default: + t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) + } + + w := ipc.NewWriter(f, opts...) + defer w.Close() + + for i, rec := range recs { + err := w.Write(rec) + if err != nil { + t.Fatalf("could not write record[%d]: %v", i, err) + } + } + + err := w.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go index a940d8b807e..dd42438d993 100644 --- a/go/arrow/ipc/cmd/arrow-cat/main_test.go +++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go @@ -283,7 +283,7 @@ record 3... }, { name: "primitives", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "bools": [true (null) (null) false true] col[1] "int8s": [-1 (null) (null) -4 -5] @@ -333,7 +333,7 @@ record 2... }, { name: "structs", - want: `version: V4 + want: `version: V5 record 1/2... col[0] "struct_nullable": {[-1 (null) (null) -4 -5 -11 (null) (null) -14 -15 -21 (null) (null) -24 -25 -31 (null) (null) -34 -35 -41 (null) (null) -44 -45] ["111" (null) (null) "444" "555" "1111" (null) (null) "1444" "1555" "2111" (null) (null) "2444" "2555" "3111" (null) (null) "3444" "3555" "4111" (null) (null) "4444" "4555"]} record 2/2... @@ -355,7 +355,7 @@ record 4... }, { name: "lists", - want: `version: V4 + want: `version: V5 record 1/4... col[0] "list_nullable": [[1 (null) (null) 4 5] [11 (null) (null) 14 15] [21 (null) (null) 24 25]] record 2/4... @@ -382,7 +382,7 @@ record 3... }, { name: "strings", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "strings": ["1é" (null) (null) "4" "5"] col[1] "bytes": ["1é" (null) (null) "4" "5"] @@ -407,7 +407,7 @@ record 3... }, { name: "fixed_size_lists", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "fixed_size_list_nullable": [[1 (null) 3] [11 (null) 13] [21 (null) 23]] record 2/3... @@ -459,7 +459,7 @@ record 3... }, { name: "fixed_width_types", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "float16s": [1 (null) (null) 4 5] col[1] "time32ms": [-2 (null) (null) 1 2] @@ -511,7 +511,7 @@ record 3... }, { name: "fixed_size_binaries", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"] record 2/3... diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go index 5d6ac92649e..1ea95802620 100644 --- a/go/arrow/ipc/cmd/arrow-ls/main_test.go +++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go @@ -207,7 +207,7 @@ records: 3 }, { name: "primitives", - want: `version: V4 + want: `version: V5 schema: fields: 11 - bools: type=bool, nullable @@ -236,7 +236,7 @@ records: 2 }, { name: "structs", - want: `version: V4 + want: `version: V5 schema: fields: 1 - struct_nullable: type=struct, nullable @@ -254,7 +254,7 @@ records: 4 }, { name: "lists", - want: `version: V4 + want: `version: V5 schema: fields: 1 - list_nullable: type=list, nullable @@ -272,7 +272,7 @@ records: 3 }, { name: "fixed_size_binaries", - want: `version: V4 + want: `version: V5 schema: fields: 1 - fixed_size_binary_3: type=fixed_size_binary[3], nullable diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go new file mode 100644 index 00000000000..7b2502f87e0 --- /dev/null +++ b/go/arrow/ipc/compression.go @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipc + +import ( + "io" + + "github.com/apache/arrow/go/arrow/internal/debug" + "github.com/apache/arrow/go/arrow/internal/flatbuf" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" +) + +type compressor interface { + MaxCompressedLen(n int) int + Reset(io.Writer) + io.WriteCloser + Type() flatbuf.CompressionType +} + +type lz4Compressor struct { + *lz4.Writer +} + +func (lz4Compressor) MaxCompressedLen(n int) int { + return lz4.CompressBlockBound(n) +} + +func (lz4Compressor) Type() flatbuf.CompressionType { + return flatbuf.CompressionTypeLZ4_FRAME +} + +type zstdCompressor struct { + *zstd.Encoder +} + +// from zstd.h, ZSTD_COMPRESSBOUND +func (zstdCompressor) MaxCompressedLen(len int) int { + debug.Assert(len >= 0, "MaxCompressedLen called with len less than 0") + extra := uint((uint(128<<10) - uint(len)) >> 11) + if len >= (128 << 10) { + extra = 0 + } + return int(uint(len+(len>>8)) + extra) +} + +func (zstdCompressor) Type() flatbuf.CompressionType { + return flatbuf.CompressionTypeZSTD +} + +func getCompressor(codec flatbuf.CompressionType) compressor { + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + w := lz4.NewWriter(nil) + // options here chosen in order to match the C++ implementation + w.Apply(lz4.ChecksumOption(false), lz4.BlockSizeOption(lz4.Block64Kb)) + return &lz4Compressor{w} + case flatbuf.CompressionTypeZSTD: + enc, err := zstd.NewWriter(nil) + if err != nil { + panic(err) + } + return zstdCompressor{enc} + } + return nil +} + +type decompressor interface { + io.Reader + Reset(io.Reader) +} + +type zstdDecompressor struct { + *zstd.Decoder +} + +func (z *zstdDecompressor) Reset(r io.Reader) { + if err := z.Decoder.Reset(r); err != nil { + panic(err) + } +} + +func getDecompressor(codec flatbuf.CompressionType) decompressor { + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + return lz4.NewReader(nil) + case flatbuf.CompressionTypeZSTD: + dec, err := zstd.NewReader(nil) + if err != nil { + panic(err) + } + return &zstdDecompressor{dec} + } + return nil +} diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index cf324482018..3b38f1bf628 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -312,16 +312,23 @@ func (f *FileReader) ReadAt(i int64) (array.Record, error) { func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record { var ( - msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) - md flatbuf.RecordBatch + msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) + md flatbuf.RecordBatch + codec decompressor ) initFB(&md, msg.Header) rows := md.Length() + bodyCompress := md.Compression(nil) + if bodyCompress != nil { + codec = getDecompressor(bodyCompress.Codec()) + } + ctx := &arrayLoaderContext{ src: ipcSource{ - meta: &md, - r: body, + meta: &md, + r: body, + codec: codec, }, max: kMaxNestingDepth, } @@ -335,8 +342,9 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr } type ipcSource struct { - meta *flatbuf.RecordBatch - r ReadAtSeeker + meta *flatbuf.RecordBatch + r ReadAtSeeker + codec decompressor } func (src *ipcSource) buffer(i int) *memory.Buffer { @@ -348,10 +356,35 @@ func (src *ipcSource) buffer(i int) *memory.Buffer { return memory.NewBufferBytes(nil) } - raw := make([]byte, buf.Length()) - _, err := src.r.ReadAt(raw, buf.Offset()) - if err != nil { - panic(err) + var raw []byte + if src.codec == nil { + raw = make([]byte, buf.Length()) + _, err := src.r.ReadAt(raw, buf.Offset()) + if err != nil { + panic(err) + } + } else { + sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length()) + var uncompressedSize uint64 + + err := binary.Read(sr, binary.LittleEndian, &uncompressedSize) + if err != nil { + panic(err) + } + + var r io.Reader = sr + // check for an uncompressed buffer + if int64(uncompressedSize) != -1 { + raw = make([]byte, uncompressedSize) + src.codec.Reset(sr) + r = src.codec + } else { + raw = make([]byte, buf.Length()) + } + + if _, err = io.ReadFull(r, raw); err != nil { + panic(err) + } } return memory.NewBufferBytes(raw) diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go index d0ef9605e61..53915e51aed 100644 --- a/go/arrow/ipc/file_test.go +++ b/go/arrow/ipc/file_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/apache/arrow/go/arrow/internal/arrdata" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -49,3 +50,34 @@ func TestFile(t *testing.T) { }) } } + +func TestFileCompressed(t *testing.T) { + tempDir, err := ioutil.TempDir("", "go-arrow-file-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + compressTypes := []flatbuf.CompressionType{ + flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD, + } + + for _, codec := range compressTypes { + for name, recs := range arrdata.Records { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + f, err := ioutil.TempFile(tempDir, "go-arrow-file-") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + arrdata.WriteFileCompressed(t, f, mem, recs[0].Schema(), recs, codec) + arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), recs) + arrdata.CheckArrowConcurrentFile(t, f, mem, recs[0].Schema(), recs) + }) + } + } +} diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go index 112427020f4..1b6c4fbf3d9 100644 --- a/go/arrow/ipc/file_writer.go +++ b/go/arrow/ipc/file_writer.go @@ -274,7 +274,9 @@ type FileWriter struct { pw PayloadWriter - schema *arrow.Schema + schema *arrow.Schema + codec flatbuf.CompressionType + compressNP int } // NewFileWriter opens an Arrow file using the provided writer w. @@ -285,10 +287,12 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) (*FileWriter, error) { ) f := FileWriter{ - w: w, - pw: &pwriter{w: w, schema: cfg.schema, pos: -1}, - mem: cfg.alloc, - schema: cfg.schema, + w: w, + pw: &pwriter{w: w, schema: cfg.schema, pos: -1}, + mem: cfg.alloc, + schema: cfg.schema, + codec: cfg.codec, + compressNP: cfg.compressNP, } pos, err := f.w.Seek(0, io.SeekCurrent) @@ -332,7 +336,7 @@ func (f *FileWriter) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b) + enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP) ) defer data.Release() diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index 4c0eb4b61e6..fa114c9500d 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -21,6 +21,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/arrio" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -65,11 +66,14 @@ type config struct { footer struct { offset int64 } + codec flatbuf.CompressionType + compressNP int } func newConfig(opts ...Option) *config { cfg := &config{ alloc: memory.NewGoAllocator(), + codec: -1, // uncompressed } for _, opt := range opts { @@ -104,6 +108,32 @@ func WithSchema(schema *arrow.Schema) Option { } } +// WithLZ4 tells the writer to use LZ4 Frame compression on the data +// buffers before writing. Requires >= Arrow 1.0.0 to read/decompress +func WithLZ4() Option { + return func(cfg *config) { + cfg.codec = flatbuf.CompressionTypeLZ4_FRAME + } +} + +// WithZstd tells the writer to use ZSTD compression on the data +// buffers before writing. Requires >= Arrow 1.0.0 to read/decompress +func WithZstd() Option { + return func(cfg *config) { + cfg.codec = flatbuf.CompressionTypeZSTD + } +} + +// WithCompressConcurrency specifies a number of goroutines to spin up for +// concurrent compression of the body buffers when writing compress IPC records. +// If n <= 1 then compression will be done serially without goroutine +// parallelization. Default is 0. +func WithCompressConcurrency(n int) Option { + return func(cfg *config) { + cfg.compressNP = n + } +} + var ( _ arrio.Reader = (*Reader)(nil) _ arrio.Writer = (*Writer)(nil) diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go index 76f81e4fa4a..50dbc7a7187 100644 --- a/go/arrow/ipc/message.go +++ b/go/arrow/ipc/message.go @@ -36,6 +36,7 @@ const ( MetadataV2 = MetadataVersion(flatbuf.MetadataVersionV2) // version for Arrow-0.2.0 MetadataV3 = MetadataVersion(flatbuf.MetadataVersionV3) // version for Arrow-0.3.0 to 0.7.1 MetadataV4 = MetadataVersion(flatbuf.MetadataVersionV4) // version for >= Arrow-0.8.0 + MetadataV5 = MetadataVersion(flatbuf.MetadataVersionV5) // version for >= Arrow-1.0.0, backward compatible with v4 ) func (m MetadataVersion) String() string { diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index ef9a9bbe457..5f335f10890 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -32,7 +32,7 @@ import ( var Magic = []byte("ARROW1") const ( - currentMetadataVersion = MetadataV4 + currentMetadataVersion = MetadataV5 minMetadataVersion = MetadataV4 kExtensionTypeKeyName = "arrow_extension_name" @@ -966,20 +966,28 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) return err } -func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) *memory.Buffer { +func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) *memory.Buffer { b := flatbuffers.NewBuilder(0) - recFB := recordToFB(b, size, bodyLength, fields, meta) + recFB := recordToFB(b, size, bodyLength, fields, meta, codec) return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength) } -func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) flatbuffers.UOffsetT { +func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) flatbuffers.UOffsetT { fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector) metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector) + var bodyCompressFB flatbuffers.UOffsetT + if codec != -1 { + bodyCompressFB = writeBodyCompression(b, codec) + } flatbuf.RecordBatchStart(b) flatbuf.RecordBatchAddLength(b, size) flatbuf.RecordBatchAddNodes(b, fieldsFB) flatbuf.RecordBatchAddBuffers(b, metaFB) + if codec != -1 { + flatbuf.RecordBatchAddCompression(b, bodyCompressFB) + } + return flatbuf.RecordBatchEnd(b) } @@ -1006,6 +1014,13 @@ func writeBuffers(b *flatbuffers.Builder, buffers []bufferMetadata, start startV return b.EndVector(len(buffers)) } +func writeBodyCompression(b *flatbuffers.Builder, codec flatbuf.CompressionType) flatbuffers.UOffsetT { + flatbuf.BodyCompressionStart(b) + flatbuf.BodyCompressionAddCodec(b, codec) + flatbuf.BodyCompressionAddMethod(b, flatbuf.BodyCompressionMethodBUFFER) + return flatbuf.BodyCompressionEnd(b) +} + func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) { var ( n int diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/stream_test.go index 596a3f97bfd..42e3709bff7 100644 --- a/go/arrow/ipc/stream_test.go +++ b/go/arrow/ipc/stream_test.go @@ -20,9 +20,11 @@ import ( "io" "io/ioutil" "os" + "strconv" "testing" "github.com/apache/arrow/go/arrow/internal/arrdata" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -60,3 +62,50 @@ func TestStream(t *testing.T) { }) } } + +func TestStreamCompressed(t *testing.T) { + tempDir, err := ioutil.TempDir("", "go-arrow-stream-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + compressTypes := []flatbuf.CompressionType{ + flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD, + } + + for np := 0; np < 3; np++ { + t.Run("compress concurrency "+strconv.Itoa(np), func(t *testing.T) { + for _, codec := range compressTypes { + t.Run(codec.String(), func(t *testing.T) { + for name, recs := range arrdata.Records { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + f, err := ioutil.TempFile(tempDir, "go-arrow-stream-") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + arrdata.WriteStreamCompressed(t, f, mem, recs[0].Schema(), recs, codec, np) + + err = f.Sync() + if err != nil { + t.Fatalf("could not sync data to disk: %v", err) + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + t.Fatalf("could not seek to start: %v", err) + } + + arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs) + }) + } + }) + } + }) + } +} diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index c9a16a35457..c4c75722610 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -17,12 +17,17 @@ package ipc // import "github.com/apache/arrow/go/arrow/ipc" import ( + "bytes" + "context" + "encoding/binary" "io" "math" + "sync" "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" "golang.org/x/xerrors" ) @@ -59,8 +64,10 @@ type Writer struct { mem memory.Allocator pw PayloadWriter - started bool - schema *arrow.Schema + started bool + schema *arrow.Schema + codec flatbuf.CompressionType + compressNP int } // NewWriterWithPayloadWriter constructs a writer with the provided payload writer @@ -69,9 +76,11 @@ type Writer struct { func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { cfg := newConfig(opts...) return &Writer{ - mem: cfg.alloc, - pw: pw, - schema: cfg.schema, + mem: cfg.alloc, + pw: pw, + schema: cfg.schema, + codec: cfg.codec, + compressNP: cfg.compressNP, } } @@ -83,6 +92,7 @@ func NewWriter(w io.Writer, opts ...Option) *Writer { mem: cfg.alloc, pw: &swriter{w: w}, schema: cfg.schema, + codec: cfg.codec, } } @@ -123,7 +133,7 @@ func (w *Writer) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b) + enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec, w.compressNP) ) defer data.Release() @@ -157,18 +167,97 @@ type recordEncoder struct { fields []fieldMetadata meta []bufferMetadata - depth int64 - start int64 - allow64b bool + depth int64 + start int64 + allow64b bool + codec flatbuf.CompressionType + compressNP int } -func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool) *recordEncoder { +func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType, compressNP int) *recordEncoder { return &recordEncoder{ - mem: mem, - start: startOffset, - depth: maxDepth, - allow64b: allow64b, + mem: mem, + start: startOffset, + depth: maxDepth, + allow64b: allow64b, + codec: codec, + compressNP: compressNP, + } +} + +func (w *recordEncoder) compressBodyBuffers(p *Payload) error { + compress := func(idx int, codec compressor) error { + if p.body[idx] == nil || p.body[idx].Len() == 0 { + return nil + } + var buf bytes.Buffer + buf.Grow(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes) + if err := binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil { + return err + } + codec.Reset(&buf) + if _, err := codec.Write(p.body[idx].Bytes()); err != nil { + return err + } + if err := codec.Close(); err != nil { + return err + } + p.body[idx] = memory.NewBufferBytes(buf.Bytes()) + return nil + } + + if w.compressNP <= 1 { + codec := getCompressor(w.codec) + for idx := range p.body { + if err := compress(idx, codec); err != nil { + return err + } + } + return nil } + + var ( + wg sync.WaitGroup + ch = make(chan int) + errch = make(chan error) + ctx, cancel = context.WithCancel(context.Background()) + ) + defer cancel() + + for i := 0; i < w.compressNP; i++ { + go func() { + defer wg.Done() + codec := getCompressor(w.codec) + for { + select { + case idx, ok := <-ch: + if !ok { + // we're done, channel is closed! + return + } + + if err := compress(idx, codec); err != nil { + errch <- err + cancel() + return + } + case <-ctx.Done(): + // cancelled, return early + return + } + } + }() + } + + for idx := range p.body { + ch <- idx + } + + close(ch) + wg.Wait() + close(errch) + + return <-errch } func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { @@ -181,6 +270,10 @@ func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { } } + if w.codec != -1 { + w.compressBodyBuffers(p) + } + // position for the start of a buffer relative to the passed frame of reference. // may be 0 or some other position in an address space. offset := w.start @@ -199,7 +292,9 @@ func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { } w.meta[i] = bufferMetadata{ Offset: offset, - Len: size + padding, + // even though we add padding, we need the Len to be correct + // so that decompressing works properly. + Len: size, } offset += size + padding } @@ -434,7 +529,7 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr array.Interface) (*memory.B } func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error { - p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta) + p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec) return nil }