diff --git a/sdks/go/gogradle.lock b/sdks/go/gogradle.lock index c24b7e2a5e38..675475700df4 100644 --- a/sdks/go/gogradle.lock +++ b/sdks/go/gogradle.lock @@ -52,13 +52,6 @@ dependencies: vcs: "git" vendorPath: "vendor/github.com/coreos/bbolt" transitive: false - - urls: - - "https://github.com/etcd-io/etcd.git" - - "git@github.com:etcd-io/etcd.git" - vcs: "git" - name: "github.com/etcd-io/etcd" - commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921" - transitive: false - name: "github.com/coreos/go-semver" host: name: "github.com/etcd-io/etcd" @@ -144,6 +137,13 @@ dependencies: name: "github.com/eapache/queue" commit: "44cc805cf13205b55f69e14bcb69867d1ae92f98" transitive: false + - urls: + - "https://github.com/etcd-io/etcd.git" + - "git@github.com:etcd-io/etcd.git" + vcs: "git" + name: "github.com/etcd-io/etcd" + commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921" + transitive: false - urls: - "https://github.com/fsnotify/fsnotify.git" - "git@github.com:fsnotify/fsnotify.git" @@ -200,7 +200,7 @@ dependencies: - "git@github.com:golang/protobuf.git" vcs: "git" name: "github.com/golang/protobuf" - commit: "ed6926b37a637426117ccab59282c3839528a700" + commit: "d04d7b157bb510b1e0c10132224b616ac0e26b17" transitive: false - urls: - "https://github.com/golang/snappy.git" @@ -224,7 +224,7 @@ dependencies: - "git@github.com:google/go-cmp.git" vcs: "git" name: "github.com/google/go-cmp" - commit: "3af367b6b30c263d47e8895973edcca9a49cf029" + commit: "9680bfaf28748393e28e00238d94070fb9972fd8" transitive: false - urls: - "https://github.com/google/pprof.git" @@ -701,4 +701,21 @@ dependencies: vcs: "git" vendorPath: "vendor/gopkg.in/yaml.v2" transitive: false - test: [] + test: + - urls: + - "https://github.com/google/go-cmp.git" + - "git@github.com:google/go-cmp.git" + vcs: "git" + name: "github.com/google/go-cmp" + commit: "9680bfaf28748393e28e00238d94070fb9972fd8" + transitive: false + - vcs: "git" + name: "golang.org/x/xerrors" + commit: "5ec99f83aff198f5fbd629d6c8d8eb38a04218ca" + url: "https://go.googlesource.com/xerrors" + transitive: false + - vcs: "git" + name: "google.golang.org/protobuf" + commit: "d165be301fb1e13390ad453281ded24385fd8ebc" + url: "https://go.googlesource.com/protobuf" + transitive: false diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go index ac724b38725b..40ed972a7967 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go @@ -28,28 +28,169 @@ package schema import ( "fmt" "reflect" + "strconv" "strings" + "sync/atomic" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" ) +var lastShortID int64 + +// TODO(BEAM-9615): Replace with UUIDs. +func getNextID() string { + id := atomic.AddInt64(&lastShortID, 1) + // No reason not to use the smallest string short ids possible. + return strconv.FormatInt(id, 36) +} + +var ( + // Maps types to schemas for reuse, caching the UUIDs. + typeToSchema = map[reflect.Type]*pipepb.Schema{} + // Maps synthetic types to user types. Keys must be generated from a schema. + // This works around using the generated type assertion shims failing to type assert. + // Type assertion isn't assignability, which is closer to how the reflection based + // shims operate. + // User types are mapped to themselves to also signify they've been registered. + syntheticToUser = map[reflect.Type]reflect.Type{} +) + +// Registered returns whether the given type has been registered with +// the schema package. +func Registered(ut reflect.Type) bool { + _, ok := syntheticToUser[ut] + return ok +} + +// RegisterType converts the type to it's schema representation, and converts it back to +// a synthetic type so we can map from the synthetic type back to the user type. +// Recursively registers other named struct types in any component parts. +func RegisterType(ut reflect.Type) { + registerType(ut, map[reflect.Type]struct{}{}) +} + +func registerType(ut reflect.Type, seen map[reflect.Type]struct{}) { + if _, ok := syntheticToUser[ut]; ok { + return + } + if _, ok := seen[ut]; ok { + return // already processed in this pass, don't reprocess. + } + seen[ut] = struct{}{} + + // Lets do some recursion to register fundamental type parts. + t := ut + switch t.Kind() { + case reflect.Map: + registerType(t.Key(), seen) + fallthrough + case reflect.Array, reflect.Slice, reflect.Ptr: + registerType(t.Elem(), seen) + return + case reflect.Struct: // What we expect here. + default: + return + } + runtime.RegisterType(ut) + + for i := 0; i < t.NumField(); i++ { + sf := ut.Field(i) + registerType(sf.Type, seen) + } + + schm, err := FromType(ut) + if err != nil { + panic(errors.WithContextf(err, "converting %v to schema", ut)) + } + synth, err := ToType(schm) + if err != nil { + panic(errors.WithContextf(err, "converting %v's back to a synthetic type", ut)) + } + synth = reflectx.SkipPtr(synth) + ut = reflectx.SkipPtr(ut) + syntheticToUser[synth] = ut + syntheticToUser[reflect.PtrTo(synth)] = reflect.PtrTo(ut) + syntheticToUser[ut] = ut + syntheticToUser[reflect.PtrTo(ut)] = reflect.PtrTo(ut) +} + // FromType returns a Beam Schema of the passed in type. // Returns an error if the type cannot be converted to a Schema. func FromType(ot reflect.Type) (*pipepb.Schema, error) { - t := ot // keep the original type for errors. - // The top level schema for a pointer to struct and the struct is the same. - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - if t.Kind() != reflect.Struct { + if reflectx.SkipPtr(ot).Kind() != reflect.Struct { return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot) } - return structToSchema(t) + schm, err := structToSchema(ot) + if err != nil { + return nil, err + } + if ot.Kind() == reflect.Ptr { + schm.Options = append(schm.Options, &pipepb.Option{ + Name: optGoNillable, + }) + } + return schm, nil +} + +// Schema Option urns. +const ( + // optGoNillable indicates that this top level schema should be returned as a pointer type. + optGoNillable = "beam:schema:go:nillable:v1" + // optGoInt indicates that this field should be decoded to an int, rather than an int64. + optGoInt = "beam:schema:go:int:v1" + // Since maps, arrays, and iterables don't have options, we need additional options + // to handle plain go integers. + optGoIntKey = "beam:schema:go:intkey:v1" // For int map keys + optGoIntElem = "beam:schema:go:intelem:v1" // For int values for maps,slices, and arrays +) + +func goIntOptions(t reflect.Type) []*pipepb.Option { + var opts []*pipepb.Option + switch t.Kind() { + case reflect.Int: + opts = append(opts, &pipepb.Option{ + Name: optGoInt, + }) + case reflect.Map: + if t.Key().Kind() == reflect.Int { + opts = append(opts, &pipepb.Option{ + Name: optGoIntKey, + }) + } + fallthrough + case reflect.Array, reflect.Slice: + if t.Elem().Kind() == reflect.Int { + opts = append(opts, &pipepb.Option{ + Name: optGoIntElem, + }) + } + } + return opts +} + +// nillableFromOptions converts the passed in type to it's pointer version +// if the option is present. This permits go types to be pointers. +func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type { + return checkOptions(opts, optGoNillable, reflect.PtrTo(t)) } -func structToSchema(t reflect.Type) (*pipepb.Schema, error) { +func checkOptions(opts []*pipepb.Option, urn string, rt reflect.Type) reflect.Type { + for _, opt := range opts { + if opt.GetName() == urn { + return rt + } + } + return nil +} + +func structToSchema(ot reflect.Type) (*pipepb.Schema, error) { + if schm, ok := typeToSchema[ot]; ok { + return schm, nil + } + t := reflectx.SkipPtr(ot) fields := make([]*pipepb.Field, 0, t.NumField()) for i := 0; i < t.NumField(); i++ { f, err := structFieldToField(t.Field(i)) @@ -58,9 +199,13 @@ func structToSchema(t reflect.Type) (*pipepb.Schema, error) { } fields = append(fields, f) } - return &pipepb.Schema{ + + schm := &pipepb.Schema{ Fields: fields, - }, nil + Id: getNextID(), + } + typeToSchema[ot] = schm + return schm, nil } func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) { @@ -68,17 +213,18 @@ func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) { if tag := sf.Tag.Get("beam"); tag != "" { name, _ = parseTag(tag) } - ftype, err := reflectTypeToFieldType(sf.Type) + ftype, opts, err := reflectTypeToFieldType(sf.Type) if err != nil { return nil, err } return &pipepb.Field{ - Name: name, - Type: ftype, + Name: name, + Type: ftype, + Options: opts, }, nil } -func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { +func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Option, error) { var isPtr bool t := ot if t.Kind() == reflect.Ptr { @@ -87,13 +233,13 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { } switch t.Kind() { case reflect.Map: - kt, err := reflectTypeToFieldType(t.Key()) + kt, _, err := reflectTypeToFieldType(t.Key()) if err != nil { - return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot) + return nil, nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot) } - vt, err := reflectTypeToFieldType(t.Elem()) + vt, _, err := reflectTypeToFieldType(t.Elem()) if err != nil { - return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot) + return nil, nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot) } return &pipepb.FieldType{ Nullable: isPtr, @@ -103,11 +249,11 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { ValueType: vt, }, }, - }, nil + }, goIntOptions(t), nil case reflect.Struct: sch, err := structToSchema(t) if err != nil { - return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot) + return nil, nil, errors.Wrapf(err, "unable to convert %v to schema field", ot) } return &pipepb.FieldType{ Nullable: isPtr, @@ -116,7 +262,7 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { Schema: sch, }, }, - }, nil + }, nil, nil case reflect.Slice, reflect.Array: // Special handling for []byte if t == reflectx.ByteSlice { @@ -125,12 +271,13 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { TypeInfo: &pipepb.FieldType_AtomicType{ AtomicType: pipepb.AtomicType_BYTES, }, - }, nil + }, nil, nil } - vt, err := reflectTypeToFieldType(t.Elem()) + vt, _, err := reflectTypeToFieldType(t.Elem()) if err != nil { - return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot) + return nil, nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot) } + opts := goIntOptions(t) return &pipepb.FieldType{ Nullable: isPtr, TypeInfo: &pipepb.FieldType_ArrayType{ @@ -138,9 +285,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { ElementType: vt, }, }, - }, nil + }, opts, nil case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64: - return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot) + return nil, nil, errors.Errorf("unable to convert unsupported type %v to schema", ot) default: // must be an atomic type if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok { return &pipepb.FieldType{ @@ -148,9 +295,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) { TypeInfo: &pipepb.FieldType_AtomicType{ AtomicType: enum, }, - }, nil + }, goIntOptions(t), nil } - return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t) + return nil, nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t) } } @@ -178,20 +325,32 @@ func ToType(s *pipepb.Schema) (reflect.Type, error) { } fields = append(fields, rf) } - return reflect.StructOf(fields), nil + ret := reflect.StructOf(fields) + if ut, ok := syntheticToUser[ret]; ok { + ret = ut + } + if t := nillableFromOptions(s.GetOptions(), ret); t != nil { + return t, nil + } + return ret, nil } func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) { name := sf.GetName() - rt, err := fieldTypeToReflectType(sf.GetType()) + rt, err := fieldTypeToReflectType(sf.GetType(), sf.Options) if err != nil { return reflect.StructField{}, err } - return reflect.StructField{ + + rsf := reflect.StructField{ Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding. Type: rt, - Tag: reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)), - }, nil + } + // Add a name tag if they don't match. + if name != rsf.Name { + rsf.Tag = reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)) + } + return rsf, nil } var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{ @@ -206,7 +365,7 @@ var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{ pipepb.AtomicType_BYTES: reflectx.ByteSlice, } -func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) { +func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (reflect.Type, error) { var t reflect.Type switch sft.GetTypeInfo().(type) { case *pipepb.FieldType_AtomicType: @@ -214,21 +373,35 @@ func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) { if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok { return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType()) } + // Handle duplicate type matchings. + if optT := checkOptions(opts, optGoInt, reflectx.Int); optT != nil { + t = optT + } case *pipepb.FieldType_ArrayType: - rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType()) + rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType(), nil) if err != nil { return nil, errors.Wrap(err, "unable to convert array element type") } + // Handle duplicate type matchings. + if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil { + rt = optT + } t = reflect.SliceOf(rt) case *pipepb.FieldType_MapType: - kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType()) + kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType(), nil) if err != nil { return nil, errors.Wrap(err, "unable to convert map key type") } - vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType()) + if optT := checkOptions(opts, optGoIntKey, reflectx.Int); optT != nil { + kt = optT + } + vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType(), nil) if err != nil { return nil, errors.Wrap(err, "unable to convert map value type") } + if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil { + vt = optT + } t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables) case *pipepb.FieldType_RowType: rt, err := ToType(sft.GetRowType().GetSchema()) @@ -239,8 +412,9 @@ func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) { // case *pipepb.FieldType_IterableType: // TODO(BEAM-9615): handle IterableTypes. - // case *pipepb.FieldType_LogicalType: + //case *pipepb.FieldType_LogicalType: // TODO(BEAM-9615): handle LogicalTypes types. + //sft.GetLogicalType(). // Logical Types are for things that have more specialized user representation already, or // things like Time or protocol buffers. diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go index f6b5ff77b4a1..741b79fb1fb5 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go @@ -20,10 +20,32 @@ import ( "reflect" "testing" + "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/testing/protocmp" ) +type registeredType struct { + A, B string + C bool +} + +type sRegisteredType struct { + D int32 +} + +type justAType struct { + A, B string + C int +} + +func init() { + runtime.RegisterType(reflect.TypeOf((*registeredType)(nil))) + RegisterType(reflect.TypeOf((*sRegisteredType)(nil))) +} + func TestSchemaConversion(t *testing.T) { tests := []struct { st *pipepb.Schema @@ -150,6 +172,177 @@ func TestSchemaConversion(t *testing.T) { rt: reflect.TypeOf(struct { Payloads [][]byte `beam:"payloads"` }{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + &pipepb.Field{ + Name: "AString", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_STRING, + }, + }, + }, + &pipepb.Field{ + Name: "AnIntPtr", + Type: &pipepb.FieldType{ + Nullable: true, + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT32, + }, + }, + }, + }, + }, + rt: reflect.TypeOf(struct { + AString string + AnIntPtr *int32 + }{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + &pipepb.Field{ + Name: "A", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_STRING, + }, + }, + }, + &pipepb.Field{ + Name: "B", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_STRING, + }, + }, + }, + &pipepb.Field{ + Name: "C", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_BOOLEAN, + }, + }, + }, + }, + }, + rt: reflect.TypeOf(registeredType{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + &pipepb.Field{ + Name: "D", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT32, + }, + }, + }, + }, + }, + rt: reflect.TypeOf(sRegisteredType{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + &pipepb.Field{ + Name: "A", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_STRING, + }, + }, + }, + &pipepb.Field{ + Name: "B", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_STRING, + }, + }, + }, + &pipepb.Field{ + Name: "C", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT64, + }, + }, + Options: []*pipepb.Option{{ + Name: optGoInt, + }}, + }, + }, + }, + rt: reflect.TypeOf(justAType{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + { + Name: "Q", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_MapType{ + MapType: &pipepb.MapType{ + KeyType: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT64, + }, + }, + ValueType: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT64, + }, + }, + }, + }, + }, + Options: []*pipepb.Option{{ + Name: optGoIntKey, + }, { + Name: optGoIntElem, + }}, + }, { + Name: "T", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_ArrayType{ + ArrayType: &pipepb.ArrayType{ + ElementType: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT64, + }, + }, + }, + }, + }, + Options: []*pipepb.Option{{ + Name: optGoIntElem, + }}, + }, + }, + }, + rt: reflect.TypeOf(struct { + Q map[int]int + T []int + }{}), + }, { + st: &pipepb.Schema{ + Fields: []*pipepb.Field{ + { + Name: "SuperNES", + Type: &pipepb.FieldType{ + TypeInfo: &pipepb.FieldType_AtomicType{ + AtomicType: pipepb.AtomicType_INT16, + }, + }, + }, + }, + Options: []*pipepb.Option{{ + Name: optGoNillable, + }}, + }, + rt: reflect.TypeOf(&struct { + SuperNES int16 + }{}), }, } @@ -161,9 +354,11 @@ func TestSchemaConversion(t *testing.T) { if err != nil { t.Fatalf("error ToType(%v) = %v", test.st, err) } - - if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" { - t.Errorf("diff (-want, +got): %v", d) + if !test.rt.AssignableTo(got) { + t.Errorf("%v not assignable to %v", test.rt, got) + if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" { + t.Errorf("diff (-want, +got): %v", d) + } } } { @@ -171,8 +366,10 @@ func TestSchemaConversion(t *testing.T) { if err != nil { t.Fatalf("error FromType(%v) = %v", test.rt, err) } - - if d := cmp.Diff(test.st, got); d != "" { + if d := cmp.Diff(test.st, got, + protocmp.Transform(), + protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"), + ); d != "" { t.Errorf("diff (-want, +got): %v", d) }