From 5b8f26ca26c3c003b52f2e71176ade478743f610 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Mon, 29 Mar 2021 15:51:40 -0400 Subject: [PATCH 1/8] add compression handling --- go/arrow/go.mod | 4 +- go/arrow/go.sum | 22 +++-- go/arrow/internal/arrdata/ioutil.go | 73 ++++++++++++++++- go/arrow/ipc/cmd/arrow-cat/main_test.go | 14 ++-- go/arrow/ipc/cmd/arrow-ls/main_test.go | 8 +- go/arrow/ipc/compression.go | 104 ++++++++++++++++++++++++ go/arrow/ipc/file_reader.go | 44 +++++++--- go/arrow/ipc/file_test.go | 32 ++++++++ go/arrow/ipc/file_writer.go | 4 +- go/arrow/ipc/ipc.go | 19 +++++ go/arrow/ipc/message.go | 1 + go/arrow/ipc/metadata.go | 23 +++++- go/arrow/ipc/stream_test.go | 42 ++++++++++ go/arrow/ipc/writer.go | 40 ++++++++- 14 files changed, 390 insertions(+), 40 deletions(-) create mode 100644 go/arrow/ipc/compression.go diff --git a/go/arrow/go.mod b/go/arrow/go.mod index ca760281449..e2edcc997b2 100644 --- a/go/arrow/go.mod +++ b/go/arrow/go.mod @@ -20,8 +20,11 @@ go 1.12 require ( github.com/davecgh/go-spew v1.1.0 // indirect + github.com/frankban/quicktest v1.11.3 // indirect github.com/golang/protobuf v1.4.2 github.com/google/flatbuffers v1.11.0 + github.com/klauspost/compress v1.11.13 + github.com/pierrec/lz4 v2.6.0+incompatible github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect @@ -30,6 +33,5 @@ require ( golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f // indirect google.golang.org/grpc v1.32.0 - google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 // indirect google.golang.org/protobuf v1.25.0 ) diff --git a/go/arrow/go.sum b/go/arrow/go.sum index f56e738d4c3..7cfe4446da3 100644 --- a/go/arrow/go.sum +++ b/go/arrow/go.sum @@ -9,6 +9,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane 
v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -19,7 +21,6 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= @@ -29,8 +30,18 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= +github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -45,7 +56,6 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= @@ -55,13 +65,11 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009 h1:W0lCpv29Hv0UaM1LXb9QlBHLNP8UFfcKjblhVCWftOM= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -76,19 +84,15 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 
h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f h1:Yv4xsIx7HZOoyUGSJ2ksDyWE2qIBXROsZKt2ny3hCGM= google.golang.org/genproto v0.0.0-20200911024640-645f7a48b24f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0= google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3 h1:MZjUhWVLZHiPPNKvwdt31HZVHrASfgk1ScV3vVTKbDo= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go index 33aab24bb3c..c501a4161ec 100644 --- a/go/arrow/internal/arrdata/ioutil.go +++ b/go/arrow/internal/arrdata/ioutil.go @@ -25,6 +25,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/internal/flatbuf" 
"github.com/apache/arrow/go/arrow/ipc" "github.com/apache/arrow/go/arrow/memory" ) @@ -128,7 +129,7 @@ func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema *ar for r.Next() { rec := r.Record() if !array.RecordEqual(rec, recs[n]) { - t.Fatalf("records[%d] differ", n) + t.Fatalf("records[%d] differ, got: %s, expected %s", n, rec, recs[n]) } n++ } @@ -172,6 +173,47 @@ func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Sch } } +// WriteFile writes a list of records to the given file descriptor, as an ARROW file. +func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) { + t.Helper() + + opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)} + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + opts = append(opts, ipc.WithLZ4()) + case flatbuf.CompressionTypeZSTD: + opts = append(opts, ipc.WithZstd()) + } + + w, err := ipc.NewFileWriter(f, opts...) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + for i, rec := range recs { + err = w.Write(rec) + if err != nil { + t.Fatalf("could not write record[%d]: %v", i, err) + } + } + + err = w.Close() + if err != nil { + t.Fatal(err) + } + + err = f.Sync() + if err != nil { + t.Fatalf("could not sync data to disk: %v", err) + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + t.Fatalf("could not seek to start: %v", err) + } +} + // WriteStream writes a list of records to the given file descriptor, as an ARROW stream. func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) { t.Helper() @@ -191,3 +233,32 @@ func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.S t.Fatal(err) } } + +// WriteStreamCompressed writes a list of records to the given file descriptor as an ARROW stream +// using the provided compression type. 
+func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) { + t.Helper() + + opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)} + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + opts = append(opts, ipc.WithLZ4()) + case flatbuf.CompressionTypeZSTD: + opts = append(opts, ipc.WithZstd()) + } + + w := ipc.NewWriter(f, opts...) + defer w.Close() + + for i, rec := range recs { + err := w.Write(rec) + if err != nil { + t.Fatalf("could not write record[%d]: %v", i, err) + } + } + + err := w.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/go/arrow/ipc/cmd/arrow-cat/main_test.go b/go/arrow/ipc/cmd/arrow-cat/main_test.go index a940d8b807e..dd42438d993 100644 --- a/go/arrow/ipc/cmd/arrow-cat/main_test.go +++ b/go/arrow/ipc/cmd/arrow-cat/main_test.go @@ -283,7 +283,7 @@ record 3... }, { name: "primitives", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "bools": [true (null) (null) false true] col[1] "int8s": [-1 (null) (null) -4 -5] @@ -333,7 +333,7 @@ record 2... }, { name: "structs", - want: `version: V4 + want: `version: V5 record 1/2... col[0] "struct_nullable": {[-1 (null) (null) -4 -5 -11 (null) (null) -14 -15 -21 (null) (null) -24 -25 -31 (null) (null) -34 -35 -41 (null) (null) -44 -45] ["111" (null) (null) "444" "555" "1111" (null) (null) "1444" "1555" "2111" (null) (null) "2444" "2555" "3111" (null) (null) "3444" "3555" "4111" (null) (null) "4444" "4555"]} record 2/2... @@ -355,7 +355,7 @@ record 4... }, { name: "lists", - want: `version: V4 + want: `version: V5 record 1/4... col[0] "list_nullable": [[1 (null) (null) 4 5] [11 (null) (null) 14 15] [21 (null) (null) 24 25]] record 2/4... @@ -382,7 +382,7 @@ record 3... }, { name: "strings", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "strings": ["1é" (null) (null) "4" "5"] col[1] "bytes": ["1é" (null) (null) "4" "5"] @@ -407,7 +407,7 @@ record 3... 
}, { name: "fixed_size_lists", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "fixed_size_list_nullable": [[1 (null) 3] [11 (null) 13] [21 (null) 23]] record 2/3... @@ -459,7 +459,7 @@ record 3... }, { name: "fixed_width_types", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "float16s": [1 (null) (null) 4 5] col[1] "time32ms": [-2 (null) (null) 1 2] @@ -511,7 +511,7 @@ record 3... }, { name: "fixed_size_binaries", - want: `version: V4 + want: `version: V5 record 1/3... col[0] "fixed_size_binary_3": ["001" (null) (null) "004" "005"] record 2/3... diff --git a/go/arrow/ipc/cmd/arrow-ls/main_test.go b/go/arrow/ipc/cmd/arrow-ls/main_test.go index 5d6ac92649e..1ea95802620 100644 --- a/go/arrow/ipc/cmd/arrow-ls/main_test.go +++ b/go/arrow/ipc/cmd/arrow-ls/main_test.go @@ -207,7 +207,7 @@ records: 3 }, { name: "primitives", - want: `version: V4 + want: `version: V5 schema: fields: 11 - bools: type=bool, nullable @@ -236,7 +236,7 @@ records: 2 }, { name: "structs", - want: `version: V4 + want: `version: V5 schema: fields: 1 - struct_nullable: type=struct, nullable @@ -254,7 +254,7 @@ records: 4 }, { name: "lists", - want: `version: V4 + want: `version: V5 schema: fields: 1 - list_nullable: type=list, nullable @@ -272,7 +272,7 @@ records: 3 }, { name: "fixed_size_binaries", - want: `version: V4 + want: `version: V5 schema: fields: 1 - fixed_size_binary_3: type=fixed_size_binary[3], nullable diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go new file mode 100644 index 00000000000..d7c40927117 --- /dev/null +++ b/go/arrow/ipc/compression.go @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipc + +import ( + "io" + + "github.com/apache/arrow/go/arrow/internal/flatbuf" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4" +) + +type compressor interface { + MaxCompressedLen(n int) int + Reset(io.Writer) + io.WriteCloser + Type() flatbuf.CompressionType +} + +type lz4Compressor struct { + *lz4.Writer +} + +func (lz4Compressor) MaxCompressedLen(n int) int { + return lz4.CompressBlockBound(n) +} + +func (lz4Compressor) Type() flatbuf.CompressionType { + return flatbuf.CompressionTypeLZ4_FRAME +} + +type zstdCompressor struct { + *zstd.Encoder +} + +// from zstd.h +func (zstdCompressor) MaxCompressedLen(len int) int { + extra := ((128 << 10) - len) >> 11 + if len >= (128 << 10) { + extra = 0 + } + return len + (len >> 8) + extra +} + +func (zstdCompressor) Type() flatbuf.CompressionType { + return flatbuf.CompressionTypeZSTD +} + +func getCompressor(codec flatbuf.CompressionType) compressor { + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + return lz4Compressor{lz4.NewWriter(nil)} + case flatbuf.CompressionTypeZSTD: + enc, err := zstd.NewWriter(nil) + if err != nil { + panic(err) + } + return zstdCompressor{enc} + } + return nil +} + +type decompressor interface { + io.Reader + Reset(io.Reader) +} + +type zstdDecompressor struct { + *zstd.Decoder +} + +func (z *zstdDecompressor) Reset(r io.Reader) { + if err := z.Decoder.Reset(r); err != nil { + 
panic(err) + } +} + +func getDecompressor(codec flatbuf.CompressionType) decompressor { + switch codec { + case flatbuf.CompressionTypeLZ4_FRAME: + return lz4.NewReader(nil) + case flatbuf.CompressionTypeZSTD: + dec, err := zstd.NewReader(nil) + if err != nil { + panic(err) + } + return &zstdDecompressor{dec} + } + return nil +} diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index cf324482018..504deb80735 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -312,16 +312,23 @@ func (f *FileReader) ReadAt(i int64) (array.Record, error) { func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) array.Record { var ( - msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) - md flatbuf.RecordBatch + msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) + md flatbuf.RecordBatch + codec decompressor ) initFB(&md, msg.Header) rows := md.Length() + bodyCompress := md.Compression(nil) + if bodyCompress != nil { + codec = getDecompressor(bodyCompress.Codec()) + } + ctx := &arrayLoaderContext{ src: ipcSource{ - meta: &md, - r: body, + meta: &md, + r: body, + codec: codec, }, max: kMaxNestingDepth, } @@ -335,8 +342,9 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, body ReadAtSeeker) arr } type ipcSource struct { - meta *flatbuf.RecordBatch - r ReadAtSeeker + meta *flatbuf.RecordBatch + r ReadAtSeeker + codec decompressor } func (src *ipcSource) buffer(i int) *memory.Buffer { @@ -348,10 +356,26 @@ func (src *ipcSource) buffer(i int) *memory.Buffer { return memory.NewBufferBytes(nil) } - raw := make([]byte, buf.Length()) - _, err := src.r.ReadAt(raw, buf.Offset()) - if err != nil { - panic(err) + var raw []byte + if src.codec == nil { + raw = make([]byte, buf.Length()) + _, err := src.r.ReadAt(raw, buf.Offset()) + if err != nil { + panic(err) + } + } else { + sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length()) + var uncompressedSize uint64 + if err := binary.Read(sr, binary.LittleEndian, 
&uncompressedSize); err != nil { + panic(err) + } + src.codec.Reset(sr) + + raw = make([]byte, uncompressedSize) + _, err := io.ReadFull(src.codec, raw) + if err != nil { + panic(err) + } } return memory.NewBufferBytes(raw) diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go index d0ef9605e61..53915e51aed 100644 --- a/go/arrow/ipc/file_test.go +++ b/go/arrow/ipc/file_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/apache/arrow/go/arrow/internal/arrdata" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -49,3 +50,34 @@ func TestFile(t *testing.T) { }) } } + +func TestFileCompressed(t *testing.T) { + tempDir, err := ioutil.TempDir("", "go-arrow-file-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + compressTypes := []flatbuf.CompressionType{ + flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD, + } + + for _, codec := range compressTypes { + for name, recs := range arrdata.Records { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + f, err := ioutil.TempFile(tempDir, "go-arrow-file-") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + arrdata.WriteFileCompressed(t, f, mem, recs[0].Schema(), recs, codec) + arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), recs) + arrdata.CheckArrowConcurrentFile(t, f, mem, recs[0].Schema(), recs) + }) + } + } +} diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go index 112427020f4..cca70e04613 100644 --- a/go/arrow/ipc/file_writer.go +++ b/go/arrow/ipc/file_writer.go @@ -275,6 +275,7 @@ type FileWriter struct { pw PayloadWriter schema *arrow.Schema + codec flatbuf.CompressionType } // NewFileWriter opens an Arrow file using the provided writer w. 
@@ -289,6 +290,7 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) (*FileWriter, error) { pw: &pwriter{w: w, schema: cfg.schema, pos: -1}, mem: cfg.alloc, schema: cfg.schema, + codec: cfg.codec, } pos, err := f.w.Seek(0, io.SeekCurrent) @@ -332,7 +334,7 @@ func (f *FileWriter) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b) + enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, f.codec) ) defer data.Release() diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index 4c0eb4b61e6..d273b34f380 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -21,6 +21,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/arrio" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -65,11 +66,13 @@ type config struct { footer struct { offset int64 } + codec flatbuf.CompressionType } func newConfig(opts ...Option) *config { cfg := &config{ alloc: memory.NewGoAllocator(), + codec: -1, // uncompressed } for _, opt := range opts { @@ -104,6 +107,22 @@ func WithSchema(schema *arrow.Schema) Option { } } +// WithLZ4 tells the writer to use LZ4 Frame compression on the data +// buffers before writing. Requires >= Arrow 1.0.0 to read/decompress. +func WithLZ4() Option { + return func(cfg *config) { + cfg.codec = flatbuf.CompressionTypeLZ4_FRAME + } +} + +// WithZstd tells the writer to use ZSTD compression on the data +// buffers before writing.
Requires >= Arrow 1.0.0 to read/decompress +func WithZstd() Option { + return func(cfg *config) { + cfg.codec = flatbuf.CompressionTypeZSTD + } +} + var ( _ arrio.Reader = (*Reader)(nil) _ arrio.Writer = (*Writer)(nil) diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go index 76f81e4fa4a..50dbc7a7187 100644 --- a/go/arrow/ipc/message.go +++ b/go/arrow/ipc/message.go @@ -36,6 +36,7 @@ const ( MetadataV2 = MetadataVersion(flatbuf.MetadataVersionV2) // version for Arrow-0.2.0 MetadataV3 = MetadataVersion(flatbuf.MetadataVersionV3) // version for Arrow-0.3.0 to 0.7.1 MetadataV4 = MetadataVersion(flatbuf.MetadataVersionV4) // version for >= Arrow-0.8.0 + MetadataV5 = MetadataVersion(flatbuf.MetadataVersionV5) // version for >= Arrow-1.0.0, backward compatible with v4 ) func (m MetadataVersion) String() string { diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index ef9a9bbe457..aad884251f2 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -32,7 +32,7 @@ import ( var Magic = []byte("ARROW1") const ( - currentMetadataVersion = MetadataV4 + currentMetadataVersion = MetadataV5 minMetadataVersion = MetadataV4 kExtensionTypeKeyName = "arrow_extension_name" @@ -966,20 +966,28 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) return err } -func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) *memory.Buffer { +func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec compressor) *memory.Buffer { b := flatbuffers.NewBuilder(0) - recFB := recordToFB(b, size, bodyLength, fields, meta) + recFB := recordToFB(b, size, bodyLength, fields, meta, codec) return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength) } -func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) flatbuffers.UOffsetT { +func 
recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec compressor) flatbuffers.UOffsetT { fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector) metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector) + var bodyCompressFB flatbuffers.UOffsetT + if codec != nil { + bodyCompressFB = writeBodyCompression(b, codec.Type()) + } flatbuf.RecordBatchStart(b) flatbuf.RecordBatchAddLength(b, size) flatbuf.RecordBatchAddNodes(b, fieldsFB) flatbuf.RecordBatchAddBuffers(b, metaFB) + if codec != nil { + flatbuf.RecordBatchAddCompression(b, bodyCompressFB) + } + return flatbuf.RecordBatchEnd(b) } @@ -1006,6 +1014,13 @@ func writeBuffers(b *flatbuffers.Builder, buffers []bufferMetadata, start startV return b.EndVector(len(buffers)) } +func writeBodyCompression(b *flatbuffers.Builder, codec flatbuf.CompressionType) flatbuffers.UOffsetT { + flatbuf.BodyCompressionStart(b) + flatbuf.BodyCompressionAddCodec(b, codec) + flatbuf.BodyCompressionAddMethod(b, flatbuf.BodyCompressionMethodBUFFER) + return flatbuf.BodyCompressionEnd(b) +} + func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) { var ( n int diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/stream_test.go index 596a3f97bfd..cb9d1bcbe41 100644 --- a/go/arrow/ipc/stream_test.go +++ b/go/arrow/ipc/stream_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/apache/arrow/go/arrow/internal/arrdata" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" ) @@ -60,3 +61,44 @@ func TestStream(t *testing.T) { }) } } + +func TestStreamCompressed(t *testing.T) { + tempDir, err := ioutil.TempDir("", "go-arrow-stream-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + compressTypes := []flatbuf.CompressionType{ + flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD, + } + + for _, codec := range compressTypes { + for name, recs := range 
arrdata.Records { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + f, err := ioutil.TempFile(tempDir, "go-arrow-stream-") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + arrdata.WriteStreamCompressed(t, f, mem, recs[0].Schema(), recs, codec) + + err = f.Sync() + if err != nil { + t.Fatalf("could not sync data to disk: %v", err) + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + t.Fatalf("could not seek to start: %v", err) + } + + arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs) + }) + } + } +} diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index c9a16a35457..8a8f8693aa2 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -17,12 +17,15 @@ package ipc // import "github.com/apache/arrow/go/arrow/ipc" import ( + "bytes" + "encoding/binary" "io" "math" "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/apache/arrow/go/arrow/memory" "golang.org/x/xerrors" ) @@ -61,6 +64,7 @@ type Writer struct { started bool schema *arrow.Schema + codec flatbuf.CompressionType } // NewWriterWithPayloadWriter constructs a writer with the provided payload writer @@ -72,6 +76,7 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { mem: cfg.alloc, pw: pw, schema: cfg.schema, + codec: cfg.codec, } } @@ -83,6 +88,7 @@ func NewWriter(w io.Writer, opts ...Option) *Writer { mem: cfg.alloc, pw: &swriter{w: w}, schema: cfg.schema, + codec: cfg.codec, } } @@ -123,7 +129,7 @@ func (w *Writer) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b) + enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec) ) defer data.Release() @@ -160,17 +166,41 @@ type recordEncoder struct { depth 
int64 start int64 allow64b bool + codec compressor } -func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool) *recordEncoder { +func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType) *recordEncoder { return &recordEncoder{ mem: mem, start: startOffset, depth: maxDepth, allow64b: allow64b, + codec: getCompressor(codec), } } +func (w *recordEncoder) compressBodyBuffers(p *Payload) (err error) { + for idx := range p.body { + if p.body[idx] == nil || p.body[idx].Len() == 0 { + continue + } + var buf bytes.Buffer + buf.Grow(w.codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes) + if err = binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil { + break + } + w.codec.Reset(&buf) + if _, err = w.codec.Write(p.body[idx].Bytes()); err != nil { + break + } + if err = w.codec.Close(); err != nil { + break + } + p.body[idx] = memory.NewBufferBytes(buf.Bytes()) + } + return +} + func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { // perform depth-first traversal of the row-batch @@ -181,6 +211,10 @@ func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { } } + if w.codec != nil { + w.compressBodyBuffers(p) + } + // position for the start of a buffer relative to the passed frame of reference. // may be 0 or some other position in an address space. 
offset := w.start @@ -434,7 +468,7 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr array.Interface) (*memory.B } func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error { - p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta) + p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, w.codec) return nil } From 80aa9228b9d28169a175c93eebc6e64c7e432ef5 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Mon, 29 Mar 2021 20:12:37 -0400 Subject: [PATCH 2/8] fix offsets for compressions --- go/arrow/go.mod | 1 + go/arrow/go.sum | 2 ++ go/arrow/ipc/compression.go | 6 ++++-- go/arrow/ipc/writer.go | 4 +++- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/go/arrow/go.mod b/go/arrow/go.mod index e2edcc997b2..5e7915fa194 100644 --- a/go/arrow/go.mod +++ b/go/arrow/go.mod @@ -25,6 +25,7 @@ require ( github.com/google/flatbuffers v1.11.0 github.com/klauspost/compress v1.11.13 github.com/pierrec/lz4 v2.6.0+incompatible + github.com/pierrec/lz4/v4 v4.1.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.0 golang.org/x/net v0.0.0-20200904194848-62affa334b73 // indirect diff --git a/go/arrow/go.sum b/go/arrow/go.sum index 7cfe4446da3..5743321c526 100644 --- a/go/arrow/go.sum +++ b/go/arrow/go.sum @@ -42,6 +42,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.4 h1:PjkB+qEooc9nw4F6Pxe/e0xaRdWz3suItXWxWqAO1QE= +github.com/pierrec/lz4/v4 v4.1.4/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go index d7c40927117..843327a4481 100644 --- a/go/arrow/ipc/compression.go +++ b/go/arrow/ipc/compression.go @@ -21,7 +21,7 @@ import ( "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/klauspost/compress/zstd" - "github.com/pierrec/lz4" + "github.com/pierrec/lz4/v4" ) type compressor interface { @@ -63,7 +63,9 @@ func (zstdCompressor) Type() flatbuf.CompressionType { func getCompressor(codec flatbuf.CompressionType) compressor { switch codec { case flatbuf.CompressionTypeLZ4_FRAME: - return lz4Compressor{lz4.NewWriter(nil)} + w := lz4.NewWriter(nil) + w.Apply(lz4.ConcurrencyOption(1), lz4.ChecksumOption(false), lz4.BlockSizeOption(lz4.Block64Kb)) + return &lz4Compressor{w} case flatbuf.CompressionTypeZSTD: enc, err := zstd.NewWriter(nil) if err != nil { diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 8a8f8693aa2..73a9239095a 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -233,7 +233,9 @@ func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { } w.meta[i] = bufferMetadata{ Offset: offset, - Len: size + padding, + // even though we add padding, we need the Len to be correct + // so that decompressing works properly. + Len: size, } offset += size + padding } From 9398402144add36b1320342949493e5c7a93f231 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Thu, 1 Apr 2021 09:32:59 -0400 Subject: [PATCH 3/8] remove blocks on integration tests for go, update docs. 
--- dev/archery/archery/integration/runner.py | 1 - docs/source/status.rst | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 87140b21fe9..8aef1637490 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -134,7 +134,6 @@ def _gold_tests(self, gold_dir): skip.add("JS") skip.add("Rust") if prefix == '2.0.0-compression': - skip.add("Go") skip.add("JS") skip.add("Rust") diff --git a/docs/source/status.rst b/docs/source/status.rst index 1c6262274e6..acf5af90d52 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -42,7 +42,7 @@ Data Types +-------------------+-------+-------+-------+------------+-------+-------+-------+ | Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ -| Decimal128 | ✓ | ✓ | | | ✓ | ✓ | ✓ | +| Decimal128 | ✓ | ✓ | ✓ | | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ | Decimal256 | ✓ | ✓ | | | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -126,7 +126,7 @@ IPC Format +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Sparse tensors | ✓ | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Buffer compression | ✓ | ✓ (3) | | | | | ✓ | +| Buffer compression | ✓ | ✓ (3) | ✓ | | | | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Endianness conversion | ✓ (2) | | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -152,13 +152,13 @@ Flight RPC | Flight RPC Feature | C++ | Java | Go | JavaScript | C# | Rust | Julia | | | | | | | | | | 
+=============================+=======+=======+=======+============+=======+=======+=======+ -| gRPC transport | ✓ | ✓ | | | ✓ (1) | | | +| gRPC transport | ✓ | ✓ | ✓ | | ✓ (1) | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| gRPC + TLS transport | ✓ | ✓ | | | ✓ | | | +| gRPC + TLS transport | ✓ | ✓ | ✓ | | ✓ | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | RPC error codes | ✓ | ✓ | | | ✓ | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Authentication handlers | ✓ | ✓ | | | ✓ (2) | | | +| Authentication handlers | ✓ | ✓ | ✓ | | ✓ (2) | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Custom client middleware | ✓ | ✓ | | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ From 75f2bf9c26d2e5261645d9aea28dbd63db01babb Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Sun, 4 Apr 2021 12:29:16 -0400 Subject: [PATCH 4/8] changes per feedback --- go/arrow/internal/arrdata/ioutil.go | 4 ++++ go/arrow/ipc/compression.go | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go index c501a4161ec..1e8e695c6fe 100644 --- a/go/arrow/internal/arrdata/ioutil.go +++ b/go/arrow/internal/arrdata/ioutil.go @@ -183,6 +183,8 @@ func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema opts = append(opts, ipc.WithLZ4()) case flatbuf.CompressionTypeZSTD: opts = append(opts, ipc.WithZstd()) + default: + panic("invalid compression codec, only LZ4_FRAME or ZSTD is allowed") } w, err := ipc.NewFileWriter(f, opts...) 
@@ -245,6 +247,8 @@ func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schem opts = append(opts, ipc.WithLZ4()) case flatbuf.CompressionTypeZSTD: opts = append(opts, ipc.WithZstd()) + default: + panic("invalid compression codec, only LZ4_FRAME or ZSTD is allowed") } w := ipc.NewWriter(f, opts...) diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go index 843327a4481..00290fb3553 100644 --- a/go/arrow/ipc/compression.go +++ b/go/arrow/ipc/compression.go @@ -47,13 +47,13 @@ type zstdCompressor struct { *zstd.Encoder } -// from zstd.h +// from zstd.h, ZSTD_COMPRESSBOUND func (zstdCompressor) MaxCompressedLen(len int) int { - extra := ((128 << 10) - len) >> 11 + extra := uint((uint(128<<10) - uint(len)) >> 11) if len >= (128 << 10) { extra = 0 } - return len + (len >> 8) + extra + return int(uint(len+(len>>8)) + extra) } func (zstdCompressor) Type() flatbuf.CompressionType { @@ -64,7 +64,8 @@ func getCompressor(codec flatbuf.CompressionType) compressor { switch codec { case flatbuf.CompressionTypeLZ4_FRAME: w := lz4.NewWriter(nil) - w.Apply(lz4.ConcurrencyOption(1), lz4.ChecksumOption(false), lz4.BlockSizeOption(lz4.Block64Kb)) + // options here chosen in order to match the C++ implementation + w.Apply(lz4.ChecksumOption(false), lz4.BlockSizeOption(lz4.Block64Kb)) return &lz4Compressor{w} case flatbuf.CompressionTypeZSTD: enc, err := zstd.NewWriter(nil) From beba83805410c75156e66497a64e96430ace08c5 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Sun, 4 Apr 2021 12:50:40 -0400 Subject: [PATCH 5/8] add comments about the calls to Seek to start --- go/arrow/internal/arrdata/ioutil.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go index 1e8e695c6fe..ba6627c24e2 100644 --- a/go/arrow/internal/arrdata/ioutil.go +++ b/go/arrow/internal/arrdata/ioutil.go @@ -167,6 +167,9 @@ func WriteFile(t *testing.T, f *os.File, mem memory.Allocator, schema 
*arrow.Sch t.Fatalf("could not sync data to disk: %v", err) } + // put the cursor back at the start of the file before returning rather than + // leaving it at the end so the reader can just start reading from the handle + // immediately for the test. _, err = f.Seek(0, io.SeekStart) if err != nil { t.Fatalf("could not seek to start: %v", err) @@ -210,6 +213,9 @@ func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema t.Fatalf("could not sync data to disk: %v", err) } + // put the cursor back at the start of the file before returning rather than + // leaving it at the end so the reader can just start reading from the handle + // immediately for the test. _, err = f.Seek(0, io.SeekStart) if err != nil { t.Fatalf("could not seek to start: %v", err) From d30361febbab7dbe4e5936c622f309eb6d97a61a Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Sun, 4 Apr 2021 14:19:27 -0400 Subject: [PATCH 6/8] handle uncompressed buffers, parallel compression --- go/arrow/internal/arrdata/ioutil.go | 8 +- go/arrow/ipc/file_reader.go | 19 +++-- go/arrow/ipc/file_writer.go | 18 +++-- go/arrow/ipc/ipc.go | 11 ++- go/arrow/ipc/metadata.go | 10 +-- go/arrow/ipc/stream_test.go | 61 +++++++------- go/arrow/ipc/writer.go | 121 +++++++++++++++++++++------- 7 files changed, 167 insertions(+), 81 deletions(-) diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go index ba6627c24e2..dfafa4e7fac 100644 --- a/go/arrow/internal/arrdata/ioutil.go +++ b/go/arrow/internal/arrdata/ioutil.go @@ -187,7 +187,7 @@ func WriteFileCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema case flatbuf.CompressionTypeZSTD: opts = append(opts, ipc.WithZstd()) default: - panic("invalid compression codec, only LZ4_FRAME or ZSTD is allowed") + t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) } w, err := ipc.NewFileWriter(f, opts...) 
@@ -244,17 +244,17 @@ func WriteStream(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.S // WriteStreamCompressed writes a list of records to the given file descriptor as an ARROW stream // using the provided compression type. -func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType) { +func WriteStreamCompressed(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record, codec flatbuf.CompressionType, np int) { t.Helper() - opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem)} + opts := []ipc.Option{ipc.WithSchema(schema), ipc.WithAllocator(mem), ipc.WithCompressConcurrency(np)} switch codec { case flatbuf.CompressionTypeLZ4_FRAME: opts = append(opts, ipc.WithLZ4()) case flatbuf.CompressionTypeZSTD: opts = append(opts, ipc.WithZstd()) default: - panic("invalid compression codec, only LZ4_FRAME or ZSTD is allowed") + t.Fatalf("invalid compression codec %v, only LZ4_FRAME or ZSTD is allowed", codec) } w := ipc.NewWriter(f, opts...) 
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 504deb80735..3b38f1bf628 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -366,14 +366,23 @@ func (src *ipcSource) buffer(i int) *memory.Buffer { } else { sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length()) var uncompressedSize uint64 - if err := binary.Read(sr, binary.LittleEndian, &uncompressedSize); err != nil { + + err := binary.Read(sr, binary.LittleEndian, &uncompressedSize) + if err != nil { panic(err) } - src.codec.Reset(sr) - raw = make([]byte, uncompressedSize) - _, err := io.ReadFull(src.codec, raw) - if err != nil { + var r io.Reader = sr + // check for an uncompressed buffer + if int64(uncompressedSize) != -1 { + raw = make([]byte, uncompressedSize) + src.codec.Reset(sr) + r = src.codec + } else { + raw = make([]byte, buf.Length()) + } + + if _, err = io.ReadFull(r, raw); err != nil { panic(err) } } diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go index cca70e04613..1b6c4fbf3d9 100644 --- a/go/arrow/ipc/file_writer.go +++ b/go/arrow/ipc/file_writer.go @@ -274,8 +274,9 @@ type FileWriter struct { pw PayloadWriter - schema *arrow.Schema - codec flatbuf.CompressionType + schema *arrow.Schema + codec flatbuf.CompressionType + compressNP int } // NewFileWriter opens an Arrow file using the provided writer w. 
@@ -286,11 +287,12 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) (*FileWriter, error) { ) f := FileWriter{ - w: w, - pw: &pwriter{w: w, schema: cfg.schema, pos: -1}, - mem: cfg.alloc, - schema: cfg.schema, - codec: cfg.codec, + w: w, + pw: &pwriter{w: w, schema: cfg.schema, pos: -1}, + mem: cfg.alloc, + schema: cfg.schema, + codec: cfg.codec, + compressNP: cfg.compressNP, } pos, err := f.w.Seek(0, io.SeekCurrent) @@ -334,7 +336,7 @@ func (f *FileWriter) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, f.codec) + enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP) ) defer data.Release() diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index d273b34f380..91d6be111b4 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -66,7 +66,8 @@ type config struct { footer struct { offset int64 } - codec flatbuf.CompressionType + codec flatbuf.CompressionType + compressNP int } func newConfig(opts ...Option) *config { @@ -123,6 +124,14 @@ func WithZstd() Option { } } +// WithCompressConcurrency specifies a number of goroutines to spin up for +// concurrent compression of the body buffers when writing compress IPC records +func WithCompressConcurrency(n int) Option { + return func(cfg *config) { + cfg.compressNP = n + } +} + var ( _ arrio.Reader = (*Reader)(nil) _ arrio.Writer = (*Writer)(nil) diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index aad884251f2..5f335f10890 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -966,25 +966,25 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) return err } -func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec compressor) *memory.Buffer { +func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, 
meta []bufferMetadata, codec flatbuf.CompressionType) *memory.Buffer { b := flatbuffers.NewBuilder(0) recFB := recordToFB(b, size, bodyLength, fields, meta, codec) return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength) } -func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec compressor) flatbuffers.UOffsetT { +func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType) flatbuffers.UOffsetT { fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector) metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector) var bodyCompressFB flatbuffers.UOffsetT - if codec != nil { - bodyCompressFB = writeBodyCompression(b, codec.Type()) + if codec != -1 { + bodyCompressFB = writeBodyCompression(b, codec) } flatbuf.RecordBatchStart(b) flatbuf.RecordBatchAddLength(b, size) flatbuf.RecordBatchAddNodes(b, fieldsFB) flatbuf.RecordBatchAddBuffers(b, metaFB) - if codec != nil { + if codec != -1 { flatbuf.RecordBatchAddCompression(b, bodyCompressFB) } diff --git a/go/arrow/ipc/stream_test.go b/go/arrow/ipc/stream_test.go index cb9d1bcbe41..42e3709bff7 100644 --- a/go/arrow/ipc/stream_test.go +++ b/go/arrow/ipc/stream_test.go @@ -20,6 +20,7 @@ import ( "io" "io/ioutil" "os" + "strconv" "testing" "github.com/apache/arrow/go/arrow/internal/arrdata" @@ -73,32 +74,38 @@ func TestStreamCompressed(t *testing.T) { flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD, } - for _, codec := range compressTypes { - for name, recs := range arrdata.Records { - t.Run(name, func(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer mem.AssertSize(t, 0) - - f, err := ioutil.TempFile(tempDir, "go-arrow-stream-") - if err != nil { - t.Fatal(err) - } - defer f.Close() - - arrdata.WriteStreamCompressed(t, f, mem, recs[0].Schema(), recs, codec) - - err = f.Sync() - if err 
!= nil { - t.Fatalf("could not sync data to disk: %v", err) - } - - _, err = f.Seek(0, io.SeekStart) - if err != nil { - t.Fatalf("could not seek to start: %v", err) - } - - arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs) - }) - } + for np := 0; np < 3; np++ { + t.Run("compress concurrency "+strconv.Itoa(np), func(t *testing.T) { + for _, codec := range compressTypes { + t.Run(codec.String(), func(t *testing.T) { + for name, recs := range arrdata.Records { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + f, err := ioutil.TempFile(tempDir, "go-arrow-stream-") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + arrdata.WriteStreamCompressed(t, f, mem, recs[0].Schema(), recs, codec, np) + + err = f.Sync() + if err != nil { + t.Fatalf("could not sync data to disk: %v", err) + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + t.Fatalf("could not seek to start: %v", err) + } + + arrdata.CheckArrowStream(t, f, mem, recs[0].Schema(), recs) + }) + } + }) + } + }) } } diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 73a9239095a..c4c75722610 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -18,9 +18,11 @@ package ipc // import "github.com/apache/arrow/go/arrow/ipc" import ( "bytes" + "context" "encoding/binary" "io" "math" + "sync" "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" @@ -62,9 +64,10 @@ type Writer struct { mem memory.Allocator pw PayloadWriter - started bool - schema *arrow.Schema - codec flatbuf.CompressionType + started bool + schema *arrow.Schema + codec flatbuf.CompressionType + compressNP int } // NewWriterWithPayloadWriter constructs a writer with the provided payload writer @@ -73,10 +76,11 @@ type Writer struct { func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer { cfg := newConfig(opts...) 
return &Writer{ - mem: cfg.alloc, - pw: pw, - schema: cfg.schema, - codec: cfg.codec, + mem: cfg.alloc, + pw: pw, + schema: cfg.schema, + codec: cfg.codec, + compressNP: cfg.compressNP, } } @@ -129,7 +133,7 @@ func (w *Writer) Write(rec array.Record) error { const allow64b = true var ( data = Payload{msg: MessageRecordBatch} - enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec) + enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec, w.compressNP) ) defer data.Release() @@ -163,42 +167,97 @@ type recordEncoder struct { fields []fieldMetadata meta []bufferMetadata - depth int64 - start int64 - allow64b bool - codec compressor + depth int64 + start int64 + allow64b bool + codec flatbuf.CompressionType + compressNP int } -func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType) *recordEncoder { +func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType, compressNP int) *recordEncoder { return &recordEncoder{ - mem: mem, - start: startOffset, - depth: maxDepth, - allow64b: allow64b, - codec: getCompressor(codec), + mem: mem, + start: startOffset, + depth: maxDepth, + allow64b: allow64b, + codec: codec, + compressNP: compressNP, } } -func (w *recordEncoder) compressBodyBuffers(p *Payload) (err error) { - for idx := range p.body { +func (w *recordEncoder) compressBodyBuffers(p *Payload) error { + compress := func(idx int, codec compressor) error { if p.body[idx] == nil || p.body[idx].Len() == 0 { - continue + return nil } var buf bytes.Buffer - buf.Grow(w.codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes) - if err = binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil { - break + buf.Grow(codec.MaxCompressedLen(p.body[idx].Len()) + arrow.Int64SizeBytes) + if err := binary.Write(&buf, binary.LittleEndian, uint64(p.body[idx].Len())); err != nil { + return err } - 
w.codec.Reset(&buf) - if _, err = w.codec.Write(p.body[idx].Bytes()); err != nil { - break + codec.Reset(&buf) + if _, err := codec.Write(p.body[idx].Bytes()); err != nil { + return err } - if err = w.codec.Close(); err != nil { - break + if err := codec.Close(); err != nil { + return err } p.body[idx] = memory.NewBufferBytes(buf.Bytes()) + return nil + } + + if w.compressNP <= 1 { + codec := getCompressor(w.codec) + for idx := range p.body { + if err := compress(idx, codec); err != nil { + return err + } + } + return nil + } + + var ( + wg sync.WaitGroup + ch = make(chan int) + errch = make(chan error) + ctx, cancel = context.WithCancel(context.Background()) + ) + defer cancel() + + for i := 0; i < w.compressNP; i++ { + go func() { + defer wg.Done() + codec := getCompressor(w.codec) + for { + select { + case idx, ok := <-ch: + if !ok { + // we're done, channel is closed! + return + } + + if err := compress(idx, codec); err != nil { + errch <- err + cancel() + return + } + case <-ctx.Done(): + // cancelled, return early + return + } + } + }() + } + + for idx := range p.body { + ch <- idx } - return + + close(ch) + wg.Wait() + close(errch) + + return <-errch } func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { @@ -211,7 +270,7 @@ func (w *recordEncoder) Encode(p *Payload, rec array.Record) error { } } - if w.codec != nil { + if w.codec != -1 { w.compressBodyBuffers(p) } From a2f04fd8140dc40f94c3d57618e3e54286e0c32d Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Sun, 4 Apr 2021 14:22:59 -0400 Subject: [PATCH 7/8] add concurrency defaults to comment. 
--- go/arrow/ipc/ipc.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index 91d6be111b4..e688be2ec4b 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -125,7 +125,9 @@ func WithZstd() Option { } // WithCompressConcurrency specifies a number of goroutines to spin up for -// concurrent compression of the body buffers when writing compress IPC records +// concurrent compression of the body buffers when writing compress IPC records. +// If n <= 1 then compression will be done serially without goroutine +// parallelization. Default is 0. func WithCompressConcurrency(n int) Option { return func(cfg *config) { cfg.compressNP = n From 87d902b15ed8620cb54717f6ed32c16abddca542 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Sun, 4 Apr 2021 16:52:48 -0400 Subject: [PATCH 8/8] fix the nits --- go/arrow/ipc/compression.go | 2 ++ go/arrow/ipc/ipc.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go index 00290fb3553..7b2502f87e0 100644 --- a/go/arrow/ipc/compression.go +++ b/go/arrow/ipc/compression.go @@ -19,6 +19,7 @@ package ipc import ( "io" + "github.com/apache/arrow/go/arrow/internal/debug" "github.com/apache/arrow/go/arrow/internal/flatbuf" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" @@ -49,6 +50,7 @@ type zstdCompressor struct { // from zstd.h, ZSTD_COMPRESSBOUND func (zstdCompressor) MaxCompressedLen(len int) int { + debug.Assert(len >= 0, "MaxCompressedLen called with len less than 0") extra := uint((uint(128<<10) - uint(len)) >> 11) if len >= (128 << 10) { extra = 0 diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index e688be2ec4b..fa114c9500d 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -108,7 +108,7 @@ func WithSchema(schema *arrow.Schema) Option { } } -// WithLz4 tells the writer to use LZ4 Frame compression on the data +// WithLZ4 tells the writer to use LZ4 Frame 
compression on the data // buffers before writing. Requires >= Arrow 1.0.0 to read/decompress func WithLZ4() Option { return func(cfg *config) {