diff --git a/bql/planner/planner.go b/bql/planner/planner.go index 1a3a0f7d..5f56c120 100644 --- a/bql/planner/planner.go +++ b/bql/planner/planner.go @@ -35,6 +35,7 @@ import ( "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/literal" + "github.com/google/badwolf/triple/predicate" ) // Executor interface unifies the execution of statements. @@ -140,7 +141,7 @@ type insertPlan struct { type updater func(storage.Graph, []*triple.Triple) error -func update(ctx context.Context, stm *semantic.Statement, gbs []string, store storage.Store, f updater) error { +func update(ctx context.Context, ts []*triple.Triple, gbs []string, store storage.Store, f updater) error { var ( mu sync.Mutex wg sync.WaitGroup @@ -161,7 +162,7 @@ func update(ctx context.Context, stm *semantic.Statement, gbs []string, store st appendError(err) return } - err = f(g, stm.Data()) + err = f(g, ts) if err != nil { appendError(err) } @@ -180,7 +181,7 @@ func (p *insertPlan) Execute(ctx context.Context) (*table.Table, error) { if err != nil { return nil, err } - return t, update(ctx, p.stm, p.stm.OutputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error { + return t, update(ctx, p.stm.Data(), p.stm.OutputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error { trace(p.tracer, func() []string { return []string{"Inserting triples to graph \"" + g.ID(ctx) + "\""} }) @@ -217,7 +218,7 @@ func (p *deletePlan) Execute(ctx context.Context) (*table.Table, error) { if err != nil { return nil, err } - return t, update(ctx, p.stm, p.stm.InputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error { + return t, update(ctx, p.stm.Data(), p.stm.InputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error { trace(p.tracer, func() []string { return []string{"Removing triples from graph \"" + g.ID(ctx) + "\""} }) @@ -587,7 +588,7 @@ func (p *queryPlan) projectAndGroupBy() error { return p.tbl.ProjectBindings(p.stm.OutputBindings()) } trace(p.tracer, func() []string { - return []string{"Starting roup reduce and projection"} + return []string{"Starting group reduce and projection"} }) // The table needs to be group reduced. // Project only binding involved in the group operation. @@ -632,7 +633,7 @@ func (p *queryPlan) projectAndGroupBy() error { case lexer.ItemSum: cell := p.tbl.Rows()[0][prj.Binding] if cell.L == nil { - return fmt.Errorf("cannot only sum int64 and float64 literals; found %s instead for binding %q", cell, prj.Binding) + return fmt.Errorf("can only sum int64 and float64 literals; found %s instead for binding %q", cell, prj.Binding) } switch cell.L.Type() { case literal.Int64: @@ -640,7 +641,7 @@ func (p *queryPlan) projectAndGroupBy() error { case literal.Float64: aap.Acc = table.NewSumFloat64LiteralAccumulator(0) default: - return fmt.Errorf("cannot only sum int64 and float64 literals; found literal type %s instead for binding %q", cell.L.Type(), prj.Binding) + return fmt.Errorf("can only sum int64 and float64 literals; found literal type %s instead for binding %q", cell.L.Type(), prj.Binding) } } aaps = append(aaps, aap) @@ -706,7 +707,7 @@ func (p *queryPlan) limit() { // Execute queries the indicated graphs. func (p *queryPlan) Execute(ctx context.Context) (*table.Table, error) { - // Fetch and catch graph instances. + // Fetch and cache graph instances. trace(p.tracer, func() []string { return []string{fmt.Sprintf("Caching graph instances for graphs %v", p.stm.InputGraphNames())} }) @@ -788,8 +789,239 @@ func (p *queryPlan) String() string { return b.String() } +// constructPlan encapsulates the sequence of instructions that need to be +// executed in order to satisfy the execution of a valid construct BQL statement. +type constructPlan struct { + stm *semantic.Statement + store storage.Store + tracer io.Writer + bulkSize int + queryPlan *queryPlan +} + +func (p *constructPlan) processConstructClause(cc *semantic.ConstructClause, tbl *table.Table, r table.Row) (*triple.Triple, error) { + var err error + sbj, prd, obj := cc.S, cc.P, cc.O + if sbj == nil && tbl.HasBinding(cc.SBinding) { + v, ok := r[cc.SBinding] + if !ok { + return nil, fmt.Errorf("row %+v misses binding %q", r, cc.SBinding) + } + if v.N == nil { + return nil, fmt.Errorf("binding %q requires a node, got %+v instead", cc.SBinding, v) + } + sbj = v.N + } + if prd == nil { + if tbl.HasBinding(cc.PBinding) { + // Try to bind the predicate. + v, ok := r[cc.PBinding] + if !ok { + return nil, fmt.Errorf("row %+v misses binding %q", r, cc.PBinding) + } + if v.P == nil { + return nil, fmt.Errorf("binding %q requires a predicate, got %+v instead", cc.PBinding, v) + } + prd = v.P + } else if cc.PTemporal && cc.PAnchorBinding != "" { + // Try to bind the predicate anchor. + v, ok := r[cc.PAnchorBinding] + if !ok { + return nil, fmt.Errorf("row %+v misses binding %q", r, cc.PAnchorBinding) + } + if v.T == nil { + return nil, fmt.Errorf("binding %q requires a time, got %+v instead", cc.PAnchorBinding, v) + } + prd, err = predicate.NewTemporal(cc.PID, *v.T) + if err != nil { + return nil, err + } + } + } + if obj == nil { + if tbl.HasBinding(cc.OBinding) { + // Try to bind the object + v, ok := r[cc.OBinding] + if !ok { + return nil, fmt.Errorf("row %+v misses binding %q", r, cc.OBinding) + } + co, err := cellToObject(v) + if err != nil { + return nil, err + } + obj = co + } else if cc.OTemporal && cc.OAnchorBinding != "" { + // Try to bind the object anchor. + v, ok := r[cc.OAnchorBinding] + if !ok { + return nil, fmt.Errorf("row %+v misses binding %q", r, cc.OAnchorBinding) + } + if v.T == nil { + return nil, fmt.Errorf("binding %q requires a time, got %+v instead", cc.OAnchorBinding, v) + } + op, err := predicate.NewTemporal(cc.OID, *v.T) + if err != nil { + return nil, err + } + obj = triple.NewPredicateObject(op) + } + } + t, err := triple.New(sbj, prd, obj) + return t, err +} + +func (p *constructPlan) processReificationClause(rc *semantic.ReificationClause, tbl *table.Table, r table.Row) (*predicate.Predicate, *triple.Object, error) { + var err error + rprd, robj := rc.P, rc.O + if rprd == nil { + if tbl.HasBinding(rc.PBinding) { + // Try to bind the predicate. + v, ok := r[rc.PBinding] + if !ok { + return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.PBinding) + } + if v.P == nil { + return nil, nil, fmt.Errorf("binding %q requires a predicate, got %+v instead", rc.PBinding, v) + } + rprd = v.P + } else if rc.PTemporal && rc.PAnchorBinding != "" { + // Try to bind the predicate anchor. + v, ok := r[rc.PAnchorBinding] + if !ok { + return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.PAnchorBinding) + } + if v.T == nil { + return nil, nil, fmt.Errorf("binding %q requires a time, got %+v instead", rc.PAnchorBinding, v) + } + rprd, err = predicate.NewTemporal(rc.PID, *v.T) + if err != nil { + return nil, nil, err + } + } + } + if robj == nil { + if tbl.HasBinding(rc.OBinding) { + // Try to bind the object + v, ok := r[rc.OBinding] + if !ok { + return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.OBinding) + } + co, err := cellToObject(v) + if err != nil { + return nil, nil, err + } + robj = co + } else if rc.OTemporal && rc.OAnchorBinding != "" { + // Try to bind the object anchor. + v, ok := r[rc.OAnchorBinding] + if !ok { + return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.OAnchorBinding) + } + if v.T == nil { + return nil, nil, fmt.Errorf("binding %q requires a time, got %+v instead", rc.OAnchorBinding, v) + } + rop, err := predicate.NewTemporal(rc.OID, *v.T) + if err != nil { + return nil, nil, err + } + robj = triple.NewPredicateObject(rop) + } + + } + return rprd, robj, nil +} + +func (p *constructPlan) Execute(ctx context.Context) (*table.Table, error) { + tbl, err := p.queryPlan.Execute(ctx) + if err != nil { + return nil, err + } + // The buffered channel has capacity to accommodate twice the amount of triples stored in a single call. + tripChan := make(chan *triple.Triple, 2*p.bulkSize) + done := make(chan bool) + + go func() { + var ts []*triple.Triple + updateFunc := func(g storage.Graph, d []*triple.Triple) error { + trace(p.tracer, func() []string { + return []string{"Inserting triples to graph \"" + g.ID(ctx) + "\""} + }) + return g.AddTriples(ctx, d) + } + for elem := range tripChan { + ts = append(ts, elem) + if len(ts) >= p.bulkSize { + update(ctx, ts, p.stm.OutputGraphNames(), p.store, updateFunc) + ts = []*triple.Triple{} + } + } + if len(ts) > 0 { + update(ctx, ts, p.stm.OutputGraphNames(), p.store, updateFunc) + } + done <- true + }() + + for _, cc := range p.stm.ConstructClauses() { + for _, r := range tbl.Rows() { + t, err := p.processConstructClause(cc, tbl, r) + if err != nil { + return nil, err + } + if len(cc.ReificationClauses()) > 0 { + // We need to reify a blank node. + rts, bn, err := t.Reify() + if err != nil { + fmt.Errorf("triple.Reify failed to reify %v with error %v", t, err) + return nil, err + } + for _, trpl := range rts[1:] { + tripChan <- trpl + } + for _, rc := range cc.ReificationClauses() { + rprd, robj, err := p.processReificationClause(rc, tbl, r) + if err != nil { + return nil, err + } + rt, err := triple.New(bn, rprd, robj) + if err != nil { + return nil, err + } + tripChan <- rt + } + + } else { + tripChan <- t + } + } + } + close(tripChan) + // Wait until all triples are added to the store. + <-done + + return tbl, nil +} + +// String returns a readable description of the execution plan. +func (p *constructPlan) String() string { + b := bytes.NewBufferString("CONSTRUCT plan:\n\n") + b.WriteString("Input graphs:\n") + for _, gn := range p.stm.InputGraphNames() { + b.WriteString(fmt.Sprintf("\t%v\n", gn)) + } + b.WriteString("Output graphs:\n") + for _, gn := range p.stm.OutputGraphNames() { + b.WriteString(fmt.Sprintf("\t%v\n", gn)) + } + b.WriteString("Construct clauses:\n") + for _, cc := range p.stm.ConstructClauses() { + b.WriteString(fmt.Sprintf("\t%v\n", cc)) + } + b.WriteString(fmt.Sprintf("\n%v", p.queryPlan.String())) + return b.String() +} + // New create a new executable plan given a semantic BQL statement. -func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int, w io.Writer) (Executor, error) { +func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize, bulkSize int, w io.Writer) (Executor, error) { switch stm.Type() { case semantic.Query: return newQueryPlan(ctx, store, stm, chanSize, w) @@ -817,6 +1049,15 @@ func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chan store: store, tracer: w, }, nil + case semantic.Construct: + qp, _ := newQueryPlan(ctx, store, stm, chanSize, w) + return &constructPlan{ + stm: stm, + store: store, + tracer: w, + bulkSize: bulkSize, + queryPlan: qp, + }, nil default: return nil, fmt.Errorf("planner.New: unknown statement type in statement %v", stm) } diff --git a/bql/planner/planner_test.go b/bql/planner/planner_test.go index a7374c1a..7795dfb7 100644 --- a/bql/planner/planner_test.go +++ b/bql/planner/planner_test.go @@ -16,6 +16,7 @@ package planner import ( "bytes" + "fmt" "strings" "testing" @@ -30,6 +31,56 @@ import ( "github.com/google/badwolf/triple/literal" ) +const ( + originalTriples = `/u "parent_of"@[] /u + /u "parent_of"@[] /u + /u "parent_of"@[] /u + /u "parent_of"@[] /u + /u "bought"@[2016-01-01T00:00:00-08:00] /c + /u "bought"@[2016-02-01T00:00:00-08:00] /c + /u "bought"@[2016-03-01T00:00:00-08:00] /c + /u "bought"@[2016-04-01T00:00:00-08:00] /c + /c "is_a"@[] /t + /c "is_a"@[] /t + /c "is_a"@[] /t + /c "is_a"@[] /t + /l "predicate"@[] "turned"@[2016-01-01T00:00:00-08:00] + /l "predicate"@[] "turned"@[2016-02-01T00:00:00-08:00] + /l "predicate"@[] "turned"@[2016-03-01T00:00:00-08:00] + /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] + ` + + tripleFromIssue40 = `/room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /room "connects_to"@[] /room + /item/book<000> "in"@[2016-04-10T4:21:00.000000000Z] /room + /item/book<000> "in"@[2016-04-10T4:23:00.000000000Z] /room + /item/book<000> "in"@[2016-04-10T4:25:00.000000000Z] /room + ` + + constructTestSrcTriples = `/person "met"@[] /person + /person "met"@[] /person + /person "met"@[] /person + /person "met_at"@[2016-04-10T4:25:00.000000000Z] /person + /person "met_at"@[2016-04-10T4:25:00.000000000Z] /person + /city "is_connected_to"@[] /city + /city "is_connected_to"@[] /city + /city "is_connected_to"@[] /city + /city "is_connected_to"@[] /city + /city "is_connected_to"@[] /city + ` + + constructTestDestTriples = `/person "met"@[] /person + ` + + testTriples = originalTriples + tripleFromIssue40 +) + func insertAndDeleteTest(t *testing.T) { ctx := context.Background() @@ -45,7 +96,7 @@ func insertAndDeleteTest(t *testing.T) { if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err) } - pln, err := New(ctx, memory.DefaultStore, stm, 0, nil) + pln, err := New(ctx, memory.DefaultStore, stm, 0, 10, nil) if err != nil { t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } @@ -82,7 +133,7 @@ func insertAndDeleteTest(t *testing.T) { if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err) } - pln, err = New(ctx, memory.DefaultStore, stm, 0, nil) + pln, err = New(ctx, memory.DefaultStore, stm, 0, 10, nil) if err != nil { t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } @@ -131,7 +182,7 @@ func TestPlannerCreateGraph(t *testing.T) { if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err) } - pln, err := New(ctx, memory.DefaultStore, stm, 0, nil) + pln, err := New(ctx, memory.DefaultStore, stm, 0, 10, nil) if err != nil { t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } @@ -162,7 +213,7 @@ func TestPlannerDropGraph(t *testing.T) { if err = p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { t.Errorf("Parser.consume: failed to accept BQL %q with error %v", bql, err) } - pln, err := New(ctx, memory.DefaultStore, stm, 0, nil) + pln, err := New(ctx, memory.DefaultStore, stm, 0, 10, nil) if err != nil { t.Errorf("planner.New: should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } @@ -177,91 +228,28 @@ func TestPlannerDropGraph(t *testing.T) { } } -const ( - originalTriples = `/u "parent_of"@[] /u - /u "parent_of"@[] /u - /u "parent_of"@[] /u - /u "parent_of"@[] /u - /u "bought"@[2016-01-01T00:00:00-08:00] /c - /u "bought"@[2016-02-01T00:00:00-08:00] /c - /u "bought"@[2016-03-01T00:00:00-08:00] /c - /u "bought"@[2016-04-01T00:00:00-08:00] /c - /c "is_a"@[] /t - /c "is_a"@[] /t - /c "is_a"@[] /t - /c "is_a"@[] /t - /l "predicate"@[] "turned"@[2016-01-01T00:00:00-08:00] - /l "predicate"@[] "turned"@[2016-02-01T00:00:00-08:00] - /l "predicate"@[] "turned"@[2016-03-01T00:00:00-08:00] - /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] - ` - - tripleFromIssue40 = `/room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /room "connects_to"@[] /room - /item/book<000> "in"@[2016-04-10T4:21:00.000000000Z] /room - /item/book<000> "in"@[2016-04-10T4:23:00.000000000Z] /room - /item/book<000> "in"@[2016-04-10T4:25:00.000000000Z] /room - ` - - testTriples = originalTriples + tripleFromIssue40 -) - -func populateTestStore(t *testing.T) storage.Store { - s, ctx := memory.NewStore(), context.Background() - g, err := s.NewGraph(ctx, "?test") +func populateStoreWithTriples(s storage.Store, ctx context.Context, gn string, triples string, tb testing.TB) { + g, err := s.NewGraph(ctx, gn) if err != nil { - t.Fatalf("memory.NewGraph failed to create \"?test\" with error %v", err) + tb.Fatalf("memory.NewGraph failed to create \"%v\" with error %v", gn, err) } - b := bytes.NewBufferString(testTriples) + b := bytes.NewBufferString(triples) if _, err := io.ReadIntoGraph(ctx, g, b, literal.DefaultBuilder()); err != nil { - t.Fatalf("io.ReadIntoGraph failed to read test graph with error %v", err) + tb.Fatalf("io.ReadIntoGraph failed to read test graph with error %v", err) } trpls := make(chan *triple.Triple) go func() { if err := g.Triples(ctx, storage.DefaultLookup, trpls); err != nil { - t.Fatal(err) + tb.Fatal(err) } }() cnt := 0 for _ = range trpls { cnt++ } - if got, want := cnt, len(strings.Split(testTriples, "\n"))-1; got != want { - t.Fatalf("Failed to import all test triples; got %v, want %v", got, want) + if got, want := cnt, len(strings.Split(triples, "\n"))-1; got != want { + tb.Fatalf("Failed to import all test triples; got %v, want %v", got, want) } - return s -} - -func populateBenchmarkStore(b *testing.B) storage.Store { - s, ctx := memory.NewStore(), context.Background() - g, err := s.NewGraph(ctx, "?test") - if err != nil { - b.Fatalf("memory.NewGraph failed to create \"?test\" with error %v", err) - } - buf := bytes.NewBufferString(testTriples) - if _, err := io.ReadIntoGraph(ctx, g, buf, literal.DefaultBuilder()); err != nil { - b.Fatalf("io.ReadIntoGraph failed to read test graph with error %v", err) - } - trpls := make(chan *triple.Triple) - go func() { - if err := g.Triples(ctx, storage.DefaultLookup, trpls); err != nil { - b.Fatal(err) - } - }() - cnt := 0 - for _ = range trpls { - cnt++ - } - if got, want := cnt, len(strings.Split(testTriples, "\n"))-1; got != want { - b.Fatalf("Failed to import all test triples; got %v, want %v", got, want) - } - return s } func TestPlannerQuery(t *testing.T) { @@ -463,7 +451,8 @@ func TestPlannerQuery(t *testing.T) { }, } - s := populateTestStore(t) + s, ctx := memory.NewStore(), context.Background() + populateStoreWithTriples(s, ctx, "?test", testTriples, t) p, err := grammar.NewParser(grammar.SemanticBQL()) if err != nil { t.Fatalf("grammar.NewParser: should have produced a valid BQL parser with error %v", err) @@ -473,7 +462,7 @@ func TestPlannerQuery(t *testing.T) { if err := p.Parse(grammar.NewLLk(entry.q, 1), st); err != nil { t.Errorf("Parser.consume: failed to parse query %q with error %v", entry.q, err) } - plnr, err := New(ctx, s, st, 0, nil) + plnr, err := New(ctx, s, st, 0, 10, nil) if err != nil { t.Errorf("planner.New failed to create a valid query plan with error %v", err) } @@ -491,6 +480,258 @@ func TestPlannerQuery(t *testing.T) { } } +func TestPlannerConstructAddsCorrectNumberofTriples(t *testing.T) { + sts, dts := len(strings.Split(constructTestSrcTriples, "\n"))-1, len(strings.Split(constructTestDestTriples, "\n"))-1 + testTable := []struct { + s string + trps int + }{ + { + s: `construct {?s ?p ?o} + into ?dest + from ?src + where {?s ?p ?o};`, + trps: sts + dts, + }, + { + s: `construct {?s "met"@[] ?o; "location"@[] /city} + into ?dest + from ?src + where {?s "met"@[] ?o};`, + // 3 matching triples * 4 new triples per matched triple due to reification + 1 triple in dest graph. + trps: 3*4 + dts, + }, + { + s: `construct {?s "met"@[] ?o; "location"@[] /city; + "outcome"@[] "good"^^type:text } + into ?dest + from ?src + where {?s "met"@[] ?o};`, + // 3 matching triples * 5 new triples per matched triple due to reification + 1 triple in dest graph. + trps: 3*5 + dts, + }, + { + s: `construct {?s "met"@[?t] ?o; "location"@[] /city; + "outcome"@[] "good"^^type:text . + ?s "connected_to"@[] ?o} + into ?dest + from ?src + where {?s "met"@[] ?o. + ?s "met_at"@[?t] ?o};`, + // 2 matching triples * (5 new triples due to reification + 1 explictly constructed triple per matched triple) + + // 1 triple in dest graph. + trps: 2*6 + dts, + }, + { + s: `construct {?s "met"@[?t] ?o; "location"@[] /city; + "outcome"@[] "good"^^type:text . + ?s "connected_to"@[] ?o; "at"@[?t] /city } + into ?dest + from ?src + where {?s "met"@[] ?o. + ?s "met_at"@[?t] ?o};`, + // 2 matching triples * 9 new triples due to reification + 1 triple in dest graph. + trps: 2*9 + dts, + }, + { + s: `construct {?d2 "is_2_hops_from"@[] ?s1 } + into ?dest + from ?src + where {?s1 "is_connected_to"@[] ?d1. + ?d1 "is_connected_to"@[] ?d2};`, + // 2 new triples (/city "is_2_hops_from"@[] /city, /city "is_2_hops_from"@[] /city) + 1 triple in dest graph. + trps: 3, + }, + } + p, err := grammar.NewParser(grammar.SemanticBQL()) + if err != nil { + t.Errorf("grammar.NewParser: should have produced a valid BQL parser, %v", err) + } + for _, entry := range testTable { + + s, ctx := memory.NewStore(), context.Background() + populateStoreWithTriples(s, ctx, "?src", constructTestSrcTriples, t) + populateStoreWithTriples(s, ctx, "?dest", constructTestDestTriples, t) + + st := &semantic.Statement{} + if err := p.Parse(grammar.NewLLk(entry.s, 1), st); err != nil { + t.Errorf("Parser.consume: failed to parse query %q with error %v", entry.s, err) + } + plnr, err := New(ctx, s, st, 0, 10, nil) + if err != nil { + t.Errorf("planner.New failed to create a valid query plan with error %v", err) + } + _, err = plnr.Execute(ctx) + if err != nil { + t.Errorf("planner.Execute failed for query %q with error %v", entry.s, err) + continue + } + + g, err := s.Graph(ctx, "?dest") + if err != nil { + t.Errorf("memory.DefaultStore.Graph(%q) should have not fail with error %v", "?test", err) + } + + i := 0 + ts := make(chan *triple.Triple) + go func() { + if err := g.Triples(ctx, storage.DefaultLookup, ts); err != nil { + t.Error(err) + } + }() + for range ts { + i++ + } + if i != entry.trps { + t.Errorf("g.Triples should have returned %v triples, returned %v instead", entry.trps, i) + } + } + +} + +func TestPlannerConstructAddsCorrectTriples(t *testing.T) { + bql := `construct {?s "met"@[?t] ?o; "location"@[] /city; + "outcome"@[] "good"^^type:text. + ?s "connected_to"@[] ?o } + into ?dest + from ?src + where {?s "met"@[] ?o. + ?s "met_at"@[?t] ?o};` + p, err := grammar.NewParser(grammar.SemanticBQL()) + if err != nil { + t.Errorf("grammar.NewParser: should have produced a valid BQL parser, %v", err) + } + s, ctx := memory.NewStore(), context.Background() + populateStoreWithTriples(s, ctx, "?src", constructTestSrcTriples, t) + populateStoreWithTriples(s, ctx, "?dest", "", t) + + st := &semantic.Statement{} + if err := p.Parse(grammar.NewLLk(bql, 1), st); err != nil { + t.Errorf("Parser.consume: failed to parse query %q with error %v", bql, err) + } + plnr, err := New(ctx, s, st, 0, 10, nil) + if err != nil { + t.Errorf("planner.New failed to create a valid query plan with error %v", err) + } + _, err = plnr.Execute(ctx) + if err != nil { + t.Errorf("planner.Execute failed for query %q with error %v", bql, err) + } + + g, err := s.Graph(ctx, "?dest") + if err != nil { + t.Errorf("memory.DefaultStore.Graph(%q) should have not fail with error %v", "?test", err) + } + + ts := make(chan *triple.Triple) + go func() { + if err := g.Triples(ctx, storage.DefaultLookup, ts); err != nil { + t.Error(err) + } + }() + + bnm := make(map[string]map[string]bool) + bns := make(map[string]string) + bna := map[string]bool{ + "/_": true, + "/_": true, + } + dtm := map[string]bool{ + fmt.Sprintf("%s\t%s\t%s", `/person`, `"connected_to"@[]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/person`, `"connected_to"@[]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_subject"@[2016-04-10T04:25:00Z]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_predicate"@[2016-04-10T04:25:00Z]`, `"met"@[2016-04-10T04:25:00Z]`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_object"@[2016-04-10T04:25:00Z]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"location"@[]`, `/city`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"outcome"@[]`, `"good"^^type:text`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_subject"@[2016-04-10T04:25:00Z]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_predicate"@[2016-04-10T04:25:00Z]`, `"met"@[2016-04-10T04:25:00Z]`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"_object"@[2016-04-10T04:25:00Z]`, `/person`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"location"@[]`, `/city`): false, + fmt.Sprintf("%s\t%s\t%s", `/_`, `"outcome"@[]`, `"good"^^type:text`): false, + } + + // First, we map each blank node generated to a potential blank node placeholder (such as b1 or b2.) + sts := []*triple.Triple{} + for elem := range ts { + sts = append(sts, elem) + if elem.Subject().Type().String() == "/_" { + for k, _ := range dtm { + trp, err := triple.Parse(k, literal.DefaultBuilder()) + if err != nil { + t.Errorf("Unable to parse triple: %v with error %v", k, err) + } + if trp.Subject().Type().String() == "/_" && + trp.Predicate().String() == elem.Predicate().String() && + trp.Object().String() == elem.Object().String() { + if mp, ok := bnm[elem.Subject().String()]; !ok { + bnm[elem.Subject().String()] = map[string]bool{ + trp.Subject().String(): true, + } + } else { + mp[trp.Subject().String()] = true + } + } + } + + } + } + + // Then, we decide which place holder blank nodes can be used to substiute for a given blank node + // by substituting the place holder in every triple where the given blank node is the subject and + // checking if the triple exists in the map of expected triples. + for _, t := range sts { + if t.Subject().Type().String() == "/_" { + for bn, _ := range bnm[t.Subject().String()] { + rep := fmt.Sprintf("%s\t%s\t%s", bn, t.Predicate().String(), t.Object().String()) + if _, ok := dtm[rep]; !ok { + bnm[t.Subject().String()][bn] = false + } + } + } + } + + // Finally, we assign a blank node to a place-holder blank node, if the place-holder blank node is + // not used to substitute any other blank node. + for k, v := range bnm { + for bn, p := range v { + if p && bna[bn] { + bns[k] = bn + bna[bn] = false + break + } + } + } + if len(sts) != len(dtm) { + t.Errorf("g.Triples should have returned %v triples, returned %v instead", len(dtm), len(sts)) + } + for _, elem := range sts { + if elem.Subject().Type().String() == "/_" { + if val, ok := bns[elem.Subject().String()]; ok { + // Substitute the blank node with the mapped place holder blank node id. + rep := fmt.Sprintf("%s\t%s\t%s", val, elem.Predicate().String(), elem.Object().String()) + if _, ok := dtm[rep]; !ok { + t.Errorf("unexpected triple: %v added to graph", elem) + } + dtm[rep] = true + } else { + t.Errorf("unexpected triple: %v added to graph", elem) + } + } else { + sr := elem.String() + if _, ok := dtm[sr]; !ok { + t.Errorf("unexpected triple: %v added to graph", elem) + } + dtm[sr] = true + } + } + for k, v := range dtm { + if v == false { + t.Errorf("g.Triples did not return triple: %v", k) + } + } +} + func TestTreeTraversalToRoot(t *testing.T) { // Graph traversal data. traversalTriples := `/person "born in"@[] /city @@ -524,7 +765,7 @@ func TestTreeTraversalToRoot(t *testing.T) { if err := p.Parse(grammar.NewLLk(traversalQuery, 1), st); err != nil { t.Errorf("Parser.consume: failed to parse query %q with error %v", traversalQuery, err) } - plnr, err := New(ctx, s, st, 0, nil) + plnr, err := New(ctx, s, st, 0, 10, nil) if err != nil { t.Errorf("planner.New failed to create a valid query plan with error %v", err) } @@ -547,7 +788,7 @@ func TestChaining(t *testing.T) { /u "parent_of"@[] /u /u "parent_of"@[] /u` - traversalQuery := `SELECT ?o FROM ?test + traversalQuery := `SELECT ?o FROM ?test WHERE { /u "parent_of"@[] ?o . ?o "parent_of"@[] /u @@ -571,7 +812,7 @@ func TestChaining(t *testing.T) { if err := p.Parse(grammar.NewLLk(traversalQuery, 1), st); err != nil { t.Errorf("Parser.consume: failed to parse query %q with error %v", traversalQuery, err) } - plnr, err := New(ctx, s, st, 0, nil) + plnr, err := New(ctx, s, st, 0, 10, nil) if err != nil { t.Errorf("planner.New failed to create a valid query plan with error %v", err) } @@ -629,7 +870,7 @@ func TestReificationResolutionIssue70(t *testing.T) { if err := p.Parse(grammar.NewLLk(query, 1), st); err != nil { t.Errorf("Parser.consume: failed to parse query %q with error %v", query, err) } - plnr, err := New(ctx, s, st, 0, nil) + plnr, err := New(ctx, s, st, 0, 10, nil) if err != nil { t.Errorf("planner.New failed to create a valid query plan with error %v", err) } @@ -649,7 +890,8 @@ func TestReificationResolutionIssue70(t *testing.T) { func benchmarkQuery(query string, b *testing.B) { ctx := context.Background() - s := populateBenchmarkStore(b) + s, ctx := memory.NewStore(), context.Background() + populateStoreWithTriples(s, ctx, "?test", testTriples, b) p, err := grammar.NewParser(grammar.SemanticBQL()) if err != nil { b.Fatalf("grammar.NewParser: should have produced a valid BQL parser with error %v", err) @@ -660,7 +902,7 @@ func benchmarkQuery(query string, b *testing.B) { if err := p.Parse(grammar.NewLLk(query, 1), st); err != nil { b.Errorf("Parser.consume: failed to parse query %q with error %v", query, err) } - plnr, err := New(ctx, s, st, 0, nil) + plnr, err := New(ctx, s, st, 0, 10, nil) if err != nil { b.Errorf("planner.New failed to create a valid query plan with error %v", err) } diff --git a/bql/semantic/semantic.go b/bql/semantic/semantic.go index 8a35f63d..bdd90ba4 100644 --- a/bql/semantic/semantic.go +++ b/bql/semantic/semantic.go @@ -21,6 +21,7 @@ package semantic import ( "bytes" "context" + "fmt" "reflect" "sort" "time" @@ -149,8 +150,8 @@ type ConstructClause struct { OAnchorBinding string OTemporal bool - reificationClauses []*ReificationClause - workingReificationClause *ReificationClause + reificationClauses []*ReificationClause + workingReificationClause *ReificationClause } // ReificationClause represents a clause used to reify a triple. @@ -342,7 +343,7 @@ func (c *GraphClause) Specificity() int { return s } -// BindingsMap returns the binding map fo he graph clause. +// BindingsMap returns the binding map for the graph clause. func (c *GraphClause) BindingsMap() map[string]int { bm := make(map[string]int) @@ -369,7 +370,7 @@ func (c *GraphClause) BindingsMap() map[string]int { return bm } -// Bindings returns the list of unique bindings listed int he graph clause. +// Bindings returns the list of unique bindings listed in the graph clause. func (c *GraphClause) Bindings() []string { var bs []string for k := range c.BindingsMap() { @@ -383,11 +384,160 @@ func (c *GraphClause) IsEmpty() bool { return reflect.DeepEqual(c, &GraphClause{}) } +// String returns a readable representation of a construct clause. +func (c *ConstructClause) String() string { + b := bytes.NewBufferString("{ ") + + // Subject section. + if c.S != nil { + b.WriteString(c.S.String()) + } else { + b.WriteString(c.SBinding) + } + + // Predicate section. + predicate := false + if c.P != nil { + b.WriteString(" ") + b.WriteString(c.P.String()) + predicate = true + } + if c.PBinding != "" { + b.WriteString(" ") + b.WriteString(c.PBinding) + } + if c.PID != "" { + b.WriteString(" \"") + b.WriteString(c.PID) + b.WriteString("\"") + } + if !predicate { + if !c.PTemporal { + b.WriteString("@[]") + } else { + b.WriteString("@[") + if c.PAnchorBinding != "" { + b.WriteString(c.PAnchorBinding) + } + } + b.WriteString("]") + } + + // Object section. + // Node portion. + object := false + if c.O != nil { + b.WriteString(" ") + b.WriteString(c.O.String()) + object = true + } else { + b.WriteString(" ") + b.WriteString(c.OBinding) + object = true + } + // Predicate portion. + if !object { + if c.OBinding != "" { + b.WriteString(" ") + b.WriteString(c.OBinding) + } + if c.OID != "" { + b.WriteString(" \"") + b.WriteString(c.OID) + b.WriteString("\"") + } + if !c.OTemporal { + b.WriteString("[]") + } else { + b.WriteString("[") + if c.OAnchorBinding != "" { + b.WriteString(c.OAnchorBinding) + } + b.WriteString("]") + } + } + + // Reification clauses portion. + for _, rc := range c.ReificationClauses() { + b.WriteString(fmt.Sprintf(";%v", rc)) + } + b.WriteString(" }") + return b.String() +} + // IsEmpty will return true if there are no set values in the construct clause. func (c *ConstructClause) IsEmpty() bool { return reflect.DeepEqual(c, &ConstructClause{}) } +// String returns a readable representation of a reification clause. +func (c *ReificationClause) String() string { + b := bytes.NewBufferString("") + + // Predicate section. + predicate := false + if c.P != nil { + b.WriteString(" ") + b.WriteString(c.P.String()) + predicate = true + } + if c.PBinding != "" { + b.WriteString(" ") + b.WriteString(c.PBinding) + } + if c.PID != "" { + b.WriteString(" \"") + b.WriteString(c.PID) + b.WriteString("\"") + } + if !predicate { + if !c.PTemporal { + b.WriteString("@[]") + } else { + b.WriteString("@[") + if c.PAnchorBinding != "" { + b.WriteString(c.PAnchorBinding) + } + } + b.WriteString("]") + } + + // Object section. + // Node portion. + object := false + if c.O != nil { + b.WriteString(" ") + b.WriteString(c.O.String()) + object = true + } else { + b.WriteString(" ") + b.WriteString(c.OBinding) + object = true + } + // Predicate portion. + if !object { + if c.OBinding != "" { + b.WriteString(" ") + b.WriteString(c.OBinding) + } + if c.OID != "" { + b.WriteString(" \"") + b.WriteString(c.OID) + b.WriteString("\"") + } + if !c.OTemporal { + b.WriteString("[]") + } else { + b.WriteString("[") + if c.OAnchorBinding != "" { + b.WriteString(c.OAnchorBinding) + } + b.WriteString("]") + } + } + return b.String() +} + // IsEmpty will return true if there are no set values in the reification clause. func (c *ReificationClause) IsEmpty() bool { return reflect.DeepEqual(c, &ReificationClause{}) @@ -724,6 +874,48 @@ func (s *Statement) OutputBindings() []string { res = append(res, p.Binding) } } + set := make(map[string]bool) + set[""] = true + for _, c := range s.constructClauses { + if _, ok := set[c.SBinding]; !ok { + res = append(res, c.SBinding) + set[c.SBinding] = true + } + if _, ok := set[c.PBinding]; !ok { + res = append(res, c.PBinding) + set[c.PBinding] = true + } + if _, ok := set[c.PAnchorBinding]; !ok { + res = append(res, c.PAnchorBinding) + set[c.PAnchorBinding] = true + } + if _, ok := set[c.OBinding]; !ok { + res = append(res, c.OBinding) + set[c.OBinding] = true + } + if _, ok := set[c.OAnchorBinding]; !ok { + res = append(res, c.OAnchorBinding) + set[c.OAnchorBinding] = true + } + for _, r := range c.reificationClauses { + if _, ok := set[r.PBinding]; !ok { + res = append(res, r.PBinding) + set[r.PBinding] = true + } + if _, ok := set[r.PAnchorBinding]; !ok { + res = append(res, r.PAnchorBinding) + set[r.PAnchorBinding] = true + } + if _, ok := set[r.OBinding]; !ok { + res = append(res, r.OBinding) + set[r.OBinding] = true + } + if _, ok := set[r.OAnchorBinding]; !ok { + res = append(res, r.OAnchorBinding) + set[r.OAnchorBinding] = true + } + } + } return res } @@ -811,7 +1003,7 @@ func (c *ConstructClause) WorkingReificationClause() *ReificationClause { // AddWorkingReificationClause adds the working reification clause to the set // of reification clauses belonging to the construct clause. func (c *ConstructClause) AddWorkingReificationClause() { - if c.workingReificationClause != nil && !c.workingReificationClause.IsEmpty(){ + if c.workingReificationClause != nil && !c.workingReificationClause.IsEmpty() { c.reificationClauses = append(c.reificationClauses, c.workingReificationClause) } c.ResetWorkingReificationClause() diff --git a/bql/semantic/semantic_test.go b/bql/semantic/semantic_test.go index 8acdc831..b591e5ed 100644 --- a/bql/semantic/semantic_test.go +++ b/bql/semantic/semantic_test.go @@ -270,7 +270,6 @@ func TestInputOutputBindings(t *testing.T) { PBinding: "?foo9", OBinding: "?foo10", }, - }, }, { @@ -279,19 +278,20 @@ func TestInputOutputBindings(t *testing.T) { reificationClauses: []*ReificationClause{ { PAnchorBinding: "?foo13", - OAnchorBinding: "?foo14", + OAnchorBinding: "?foo13", }, - }, }, }, } want := []string{"?foo", "?bar", "?foo1", "?foo2", "?foo3", "?foo4", "?foo5", "?foo6", - "?foo7","?foo8", "?foo9", "?foo10", "?foo11", "?foo12", "?foo13", "?foo14"} + "?foo7", "?foo8", "?foo9", "?foo10", "?foo11", "?foo12", "?foo13", "?foo13"} if got := s.InputBindings(); !reflect.DeepEqual(got, want) { - t.Errorf("s.InputBindings return the wrong input binding; got %v, want %v", got, want) + t.Errorf("s.InputBindings returned the wrong input bindings; got %v, want %v", got, want) } - if got, want := s.OutputBindings(), []string{"?foo_alias", "?bar"}; !reflect.DeepEqual(got, want) { - t.Errorf("s.OutputBindings return the wrong input binding; got %v, want %v", got, want) + want = []string{"?foo_alias", "?bar", "?foo1", "?foo2", "?foo3", "?foo4", "?foo5", "?foo6", + "?foo7", "?foo8", "?foo9", "?foo10", "?foo11", "?foo12", "?foo13"} + if got := s.OutputBindings(); !reflect.DeepEqual(got, want) { + t.Errorf("s.OutputBindings returned the wrong output bindings; got %v, want %v", got, want) } } diff --git a/bql/table/table.go b/bql/table/table.go index c5821f75..72603210 100644 --- a/bql/table/table.go +++ b/bql/table/table.go @@ -240,7 +240,7 @@ func MergeRows(ms []Row) Row { // DotProduct does the dot product with the provided table func (t *Table) DotProduct(t2 *Table) error { if !disjointBinding(t.mbs, t2.mbs) { - return fmt.Errorf("DotProduct operations requires disjoint bindingts; instead got %v and %v", t.mbs, t2.mbs) + return fmt.Errorf("DotProduct operations requires disjoint bindings; instead got %v and %v", t.mbs, t2.mbs) } // Update the table metadata. m := make(map[string]bool) diff --git a/tools/benchmark/batteries/add.go b/tools/benchmark/batteries/add.go index a670dc62..111327b7 100644 --- a/tools/benchmark/batteries/add.go +++ b/tools/benchmark/batteries/add.go @@ -25,7 +25,7 @@ import ( ) // AddTreeTriplesBenchmark creates the benchmark. -func AddTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func AddTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { bFactors := []int{2, 200} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -78,7 +78,7 @@ func AddTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int } // AddExistingTreeTriplesBenchmark creates the benchmark. -func AddExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func AddExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { bFactors := []int{2, 200} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -134,7 +134,7 @@ func AddExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, chan } // AddGraphTriplesBenchmark creates the benchmark. -func AddGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func AddGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { nodes := []int{317, 1000} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -187,7 +187,7 @@ func AddGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize in } // AddExistingGraphTriplesBenchmark creates the benchmark. -func AddExistingGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func AddExistingGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { nodes := []int{317, 1000} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple diff --git a/tools/benchmark/batteries/bql.go b/tools/benchmark/batteries/bql.go index e4247992..c4c7c420 100644 --- a/tools/benchmark/batteries/bql.go +++ b/tools/benchmark/batteries/bql.go @@ -262,7 +262,7 @@ var randomGraphWalkingBQL = []string{ } // BQLTreeGraphWalking creates the benchmark. -func BQLTreeGraphWalking(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func BQLTreeGraphWalking(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { bFactors := []int{2, 200} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -309,7 +309,7 @@ func BQLTreeGraphWalking(ctx context.Context, st storage.Store, chanSize int) ([ }, F: func() error { query := fmt.Sprintf(bql, gID) - _, err := run.BQL(ctx, query, st, chanSize) + _, err := run.BQL(ctx, query, st, chanSize, bulkSize) return err }, TearDown: func() error { @@ -323,7 +323,7 @@ func BQLTreeGraphWalking(ctx context.Context, st storage.Store, chanSize int) ([ } // BQLRandomGraphWalking creates the benchmark. -func BQLRandomGraphWalking(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func BQLRandomGraphWalking(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { rgSize := []int{1000, 10000} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -370,7 +370,7 @@ func BQLRandomGraphWalking(ctx context.Context, st storage.Store, chanSize int) }, F: func() error { query := fmt.Sprintf(bql, gID) - _, err := run.BQL(ctx, query, st, chanSize) + _, err := run.BQL(ctx, query, st, chanSize, bulkSize) return err }, TearDown: func() error { diff --git a/tools/benchmark/batteries/remove.go b/tools/benchmark/batteries/remove.go index 9690a496..721b08c8 100644 --- a/tools/benchmark/batteries/remove.go +++ b/tools/benchmark/batteries/remove.go @@ -25,7 +25,7 @@ import ( ) // RemoveTreeTriplesBenchmark creates the benchmark. -func RemoveTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func RemoveTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { bFactors := []int{2, 200} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -78,7 +78,7 @@ func RemoveTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize } // RemoveExistingTreeTriplesBenchmark creates the benchmark. -func RemoveExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func RemoveExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { bFactors := []int{2, 200} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -134,7 +134,7 @@ func RemoveExistingTreeTriplesBenchmark(ctx context.Context, st storage.Store, c } // RemoveGraphTriplesBenchmark creates the benchmark. -func RemoveGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func RemoveGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { nodes := []int{317, 1000} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple @@ -187,7 +187,7 @@ func RemoveGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize } // RemoveExistingGraphTriplesBenchmark creates the benchmark. -func RemoveExistingGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize int) ([]*runtime.BenchEntry, error) { +func RemoveExistingGraphTriplesBenchmark(ctx context.Context, st storage.Store, chanSize, bulkSize int) ([]*runtime.BenchEntry, error) { nodes := []int{317, 1000} sizes := []int{10, 1000, 100000} var trplSets [][]*triple.Triple diff --git a/tools/compliance/runner.go b/tools/compliance/runner.go index 6a8856c4..697f78d9 100644 --- a/tools/compliance/runner.go +++ b/tools/compliance/runner.go @@ -75,7 +75,7 @@ func (s *Story) cleanSources(ctx context.Context, st storage.Store) error { // runAssertion runs the assertion and compares the outcome. Returns the outcome // of comparing the obtained result table with the assertion table if there is // no error during the assertion. -func (a *Assertion) runAssertion(ctx context.Context, st storage.Store, chanSize int) (bool, *table.Table, *table.Table, error) { +func (a *Assertion) runAssertion(ctx context.Context, st storage.Store, chanSize, bulkSize int) (bool, *table.Table, *table.Table, error) { errorizer := func(e error) (bool, *table.Table, *table.Table, error) { if a.WillFail && e != nil { return true, nil, nil, nil @@ -92,7 +92,7 @@ func (a *Assertion) runAssertion(ctx context.Context, st storage.Store, chanSize if err := p.Parse(grammar.NewLLk(a.Statement, 1), stm); err != nil { return errorizer(fmt.Errorf("Failed to parse BQL statement with error %v", err)) } - pln, err := planner.New(ctx, st, stm, chanSize, nil) + pln, err := planner.New(ctx, st, stm, chanSize, bulkSize, nil) if err != nil { return errorizer(fmt.Errorf("Should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err)) } @@ -116,7 +116,7 @@ func (a *Assertion) runAssertion(ctx context.Context, st storage.Store, chanSize // return an error if something wrong happen along the way. It is worth // mentioning that Run does not clear any data available in the provided // storage. -func (s *Story) Run(ctx context.Context, st storage.Store, b literal.Builder, chanSize int) (map[string]*AssertionOutcome, error) { +func (s *Story) Run(ctx context.Context, st storage.Store, b literal.Builder, chanSize, bulkSize int) (map[string]*AssertionOutcome, error) { // Populate the sources. if err := s.populateSources(ctx, st, b); err != nil { return nil, err @@ -124,7 +124,7 @@ func (s *Story) Run(ctx context.Context, st storage.Store, b literal.Builder, ch // Run assertions. m := make(map[string]*AssertionOutcome) for _, a := range s.Assertions { - b, got, want, err := a.runAssertion(ctx, st, chanSize) + b, got, want, err := a.runAssertion(ctx, st, chanSize, bulkSize) if err != nil { return nil, err } @@ -156,10 +156,10 @@ type AssertionBatteryEntry struct { // RunStories runs a the provided stories and returns the outcome of each of // them. -func RunStories(ctx context.Context, st storage.Store, b literal.Builder, stories []*Story, chanSize int) *AssertionBattery { +func RunStories(ctx context.Context, st storage.Store, b literal.Builder, stories []*Story, chanSize, bulkSize int) *AssertionBattery { results := &AssertionBattery{} for _, s := range stories { - o, err := s.Run(ctx, st, b, chanSize) + o, err := s.Run(ctx, st, b, chanSize, bulkSize) results.Entries = append(results.Entries, &AssertionBatteryEntry{ Story: s, Outcome: o, diff --git a/tools/compliance/runner_test.go b/tools/compliance/runner_test.go index fb02a4fa..abf3c81d 100644 --- a/tools/compliance/runner_test.go +++ b/tools/compliance/runner_test.go @@ -94,7 +94,7 @@ func TestRun(t *testing.T) { ctx := context.Background() for _, s := range testStories { for cs := 0; cs < 10; cs++ { - m, err := s.Run(ctx, memory.NewStore(), literal.DefaultBuilder(), cs) + m, err := s.Run(ctx, memory.NewStore(), literal.DefaultBuilder(), cs, 10) if err != nil { t.Error(err) } @@ -177,7 +177,7 @@ func TestRunStories(t *testing.T) { } ctx := context.Background() for cs := 0; cs < 10; cs++ { - results := RunStories(ctx, memory.NewStore(), literal.DefaultBuilder(), testStories, cs) + results := RunStories(ctx, memory.NewStore(), literal.DefaultBuilder(), testStories, cs, 10) for _, entry := range results.Entries { if entry.Err != nil { t.Error(entry.Err) diff --git a/tools/vcli/bw/assert/assert.go b/tools/vcli/bw/assert/assert.go index 500708f6..f56ea428 100644 --- a/tools/vcli/bw/assert/assert.go +++ b/tools/vcli/bw/assert/assert.go @@ -33,7 +33,7 @@ import ( ) // New creates the help command. -func New(store storage.Store, builder literal.Builder, chanSize int) *command.Command { +func New(store storage.Store, builder literal.Builder, chanSize, bulkSize int) *command.Command { cmd := &command.Command{ UsageLine: "assert folder_path", Short: "asserts all the stories in the indicated folder.", @@ -42,13 +42,13 @@ file containing all the sources and all the assertions to run. `, } cmd.Run = func(ctx context.Context, args []string) int { - return assertCommand(ctx, cmd, args, store, builder, chanSize) + return assertCommand(ctx, cmd, args, store, builder, chanSize, bulkSize) } return cmd } // assertCommand runs all the BQL statements available in the file. -func assertCommand(ctx context.Context, cmd *command.Command, args []string, store storage.Store, builder literal.Builder, chanSize int) int { +func assertCommand(ctx context.Context, cmd *command.Command, args []string, store storage.Store, builder literal.Builder, chanSize, bulkSize int) int { if len(args) < 3 { log.Printf("Missing required folder path. ") cmd.Usage() @@ -97,7 +97,7 @@ func assertCommand(ctx context.Context, cmd *command.Command, args []string, sto } fmt.Println("-------------------------------------------------------------") fmt.Printf("Evaluating %d stories... ", len(stories)) - results := compliance.RunStories(ctx, store, builder, stories, chanSize) + results := compliance.RunStories(ctx, store, builder, stories, chanSize, bulkSize) fmt.Println("done.") fmt.Println("-------------------------------------------------------------") for i, entry := range results.Entries { diff --git a/tools/vcli/bw/benchmark/benchmark.go b/tools/vcli/bw/benchmark/benchmark.go index 1573ea52..1bb2a9d3 100644 --- a/tools/vcli/bw/benchmark/benchmark.go +++ b/tools/vcli/bw/benchmark/benchmark.go @@ -31,10 +31,10 @@ import ( ) // New create the version command. -func New(store storage.Store, chanSize int) *command.Command { +func New(store storage.Store, chanSize, bulkSize int) *command.Command { return &command.Command{ Run: func(ctx context.Context, args []string) int { - return runAll(ctx, store, chanSize) + return runAll(ctx, store, chanSize, bulkSize) }, UsageLine: "benchmark", Short: "runs a set of precan benchmarks.", @@ -46,7 +46,7 @@ a tree or a random graph generator.`, } // runAll executes all the canned benchmarks and prints out the stats. -func runAll(ctx context.Context, st storage.Store, chanSize int) int { +func runAll(ctx context.Context, st storage.Store, chanSize, bulkSize int) int { // - Add non existing triples. (done) // - Add triples that already exist. (done) // - Remove non existing triples. (done) @@ -62,32 +62,32 @@ func runAll(ctx context.Context, st storage.Store, chanSize int) int { var out int // Add non existing triples. - out += runBattery(ctx, st, "adding non existing tree triples", chanSize, batteries.AddTreeTriplesBenchmark) - out += runBattery(ctx, st, "adding non existing graph triples", chanSize, batteries.AddGraphTriplesBenchmark) + out += runBattery(ctx, st, "adding non existing tree triples", chanSize, bulkSize, batteries.AddTreeTriplesBenchmark) + out += runBattery(ctx, st, "adding non existing graph triples", chanSize, bulkSize, batteries.AddGraphTriplesBenchmark) // Add existing triples. - out += runBattery(ctx, st, "adding existing tree triples", chanSize, batteries.AddExistingTreeTriplesBenchmark) - out += runBattery(ctx, st, "adding existing graph triples", chanSize, batteries.AddExistingGraphTriplesBenchmark) + out += runBattery(ctx, st, "adding existing tree triples", chanSize, bulkSize, batteries.AddExistingTreeTriplesBenchmark) + out += runBattery(ctx, st, "adding existing graph triples", chanSize, bulkSize, batteries.AddExistingGraphTriplesBenchmark) // Remove non existing triples. - out += runBattery(ctx, st, "removing non existing tree triples", chanSize, batteries.RemoveTreeTriplesBenchmark) - out += runBattery(ctx, st, "removing non existing graph triples", chanSize, batteries.RemoveGraphTriplesBenchmark) + out += runBattery(ctx, st, "removing non existing tree triples", chanSize, bulkSize, batteries.RemoveTreeTriplesBenchmark) + out += runBattery(ctx, st, "removing non existing graph triples", chanSize, bulkSize, batteries.RemoveGraphTriplesBenchmark) // Remove existing triples. - out += runBattery(ctx, st, "removing existing tree triples", chanSize, batteries.RemoveExistingTreeTriplesBenchmark) - out += runBattery(ctx, st, "removing existing graph triples", chanSize, batteries.RemoveExistingGraphTriplesBenchmark) + out += runBattery(ctx, st, "removing existing tree triples", chanSize, bulkSize, batteries.RemoveExistingTreeTriplesBenchmark) + out += runBattery(ctx, st, "removing existing graph triples", chanSize, bulkSize, batteries.RemoveExistingGraphTriplesBenchmark) // BQL graph walking. - out += runBattery(ctx, st, "walking the tree graph with BQL", chanSize, batteries.BQLTreeGraphWalking) - out += runBattery(ctx, st, "walking the random graph with BQL", chanSize, batteries.BQLRandomGraphWalking) + out += runBattery(ctx, st, "walking the tree graph with BQL", chanSize, bulkSize, batteries.BQLTreeGraphWalking) + out += runBattery(ctx, st, "walking the random graph with BQL", chanSize, bulkSize, batteries.BQLRandomGraphWalking) return out } // runBattery executes all the canned benchmarks and prints out the stats. -func runBattery(ctx context.Context, st storage.Store, name string, chanSize int, f func(context.Context, storage.Store, int) ([]*runtime.BenchEntry, error)) int { +func runBattery(ctx context.Context, st storage.Store, name string, chanSize, bulkSize int, f func(context.Context, storage.Store, int, int) ([]*runtime.BenchEntry, error)) int { // Add triples. fmt.Printf("Creating %s triples benchmark... ", name) - bes, err := f(ctx, st, chanSize) + bes, err := f(ctx, st, chanSize, bulkSize) if err != nil { log.Printf("[ERROR] %v\n", err) return 2 diff --git a/tools/vcli/bw/common/common.go b/tools/vcli/bw/common/common.go index 58b435b2..d03430de 100644 --- a/tools/vcli/bw/common/common.go +++ b/tools/vcli/bw/common/common.go @@ -105,13 +105,13 @@ func InitializeDriver(driverName string, drivers map[string]StoreGenerator) (sto // instance. func InitializeCommands(driver storage.Store, chanSize, bulkTripleOpSize, builderSize int, rl repl.ReadLiner, done chan bool) []*command.Command { return []*command.Command{ - assert.New(driver, literal.DefaultBuilder(), chanSize), - benchmark.New(driver, chanSize), + assert.New(driver, literal.DefaultBuilder(), chanSize, bulkTripleOpSize), + benchmark.New(driver, chanSize, bulkTripleOpSize), export.New(driver, bulkTripleOpSize), load.New(driver, bulkTripleOpSize, builderSize), - run.New(driver, chanSize), + run.New(driver, chanSize, bulkTripleOpSize), repl.New(driver, chanSize, bulkTripleOpSize, builderSize, rl, done), - server.New(driver, chanSize), + server.New(driver, chanSize, bulkTripleOpSize), version.New(), } } diff --git a/tools/vcli/bw/repl/repl.go b/tools/vcli/bw/repl/repl.go index bf01bd35..65abf9aa 100644 --- a/tools/vcli/bw/repl/repl.go +++ b/tools/vcli/bw/repl/repl.go @@ -168,7 +168,7 @@ func REPL(driver storage.Store, input *os.File, rl ReadLiner, chanSize, bulkSize continue } if strings.HasPrefix(l, "desc") { - pln, err := planBQL(ctx, l[4:], driver, chanSize, nil) + pln, err := planBQL(ctx, l[4:], driver, chanSize, bulkSize, nil) if err != nil { fmt.Printf("[ERROR] %s\n\n", err) } else { @@ -180,7 +180,7 @@ func REPL(driver storage.Store, input *os.File, rl ReadLiner, chanSize, bulkSize } if strings.HasPrefix(l, "run") { now := time.Now() - path, cmds, err := runBQLFromFile(ctx, driver, chanSize, strings.TrimSpace(l[:len(l)-1]), tracer) + path, cmds, err := runBQLFromFile(ctx, driver, chanSize, bulkSize, strings.TrimSpace(l[:len(l)-1]), tracer) if err != nil { fmt.Printf("[ERROR] %s\n\n", err) } else { @@ -192,7 +192,7 @@ func REPL(driver storage.Store, input *os.File, rl ReadLiner, chanSize, bulkSize } now := time.Now() - table, err := runBQL(ctx, l, driver, chanSize, tracer) + table, err := runBQL(ctx, l, driver, chanSize, bulkSize, tracer) if err != nil { fmt.Printf("[ERROR] %s\n", err) fmt.Println("Time spent: ", time.Now().Sub(now)) @@ -222,7 +222,7 @@ func printHelp() { } // runBQLFromFile loads all the statements in the file and runs them. -func runBQLFromFile(ctx context.Context, driver storage.Store, chanSize int, line string, w io.Writer) (string, int, error) { +func runBQLFromFile(ctx context.Context, driver storage.Store, chanSize, bulkSize int, line string, w io.Writer) (string, int, error) { ss := strings.Split(strings.TrimSpace(line), " ") if len(ss) != 2 { return "", 0, fmt.Errorf("wrong syntax: run ") @@ -234,7 +234,7 @@ func runBQLFromFile(ctx context.Context, driver storage.Store, chanSize int, lin } for idx, stm := range lines { fmt.Printf("Processing statement (%d/%d)\n", idx+1, len(lines)) - _, err := runBQL(ctx, stm, driver, chanSize, w) + _, err := runBQL(ctx, stm, driver, chanSize, bulkSize, w) if err != nil { return "", 0, fmt.Errorf("%v on\n%s\n", err, stm) } @@ -244,8 +244,8 @@ func runBQLFromFile(ctx context.Context, driver storage.Store, chanSize int, lin } // runBQL attempts to execute the provided query against the given store. -func runBQL(ctx context.Context, bql string, s storage.Store, chanSize int, w io.Writer) (*table.Table, error) { - pln, err := planBQL(ctx, bql, s, chanSize, w) +func runBQL(ctx context.Context, bql string, s storage.Store, chanSize, bulkSize int, w io.Writer) (*table.Table, error) { + pln, err := planBQL(ctx, bql, s, chanSize, bulkSize, w) if err != nil { return nil, err } @@ -257,7 +257,7 @@ func runBQL(ctx context.Context, bql string, s storage.Store, chanSize int, w io } // planBQL attempts to create the execution plan for the provided query against the given store. -func planBQL(ctx context.Context, bql string, s storage.Store, chanSize int, w io.Writer) (planner.Executor, error) { +func planBQL(ctx context.Context, bql string, s storage.Store, chanSize, bulkSize int, w io.Writer) (planner.Executor, error) { p, err := grammar.NewParser(grammar.SemanticBQL()) if err != nil { return nil, fmt.Errorf("failed to initilize a valid BQL parser") @@ -266,7 +266,7 @@ func planBQL(ctx context.Context, bql string, s storage.Store, chanSize int, w i if err := p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { return nil, fmt.Errorf("failed to parse BQL statement with error %v", err) } - pln, err := planner.New(ctx, s, stm, chanSize, w) + pln, err := planner.New(ctx, s, stm, chanSize, bulkSize, w) if err != nil { return nil, fmt.Errorf("should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } diff --git a/tools/vcli/bw/run/run.go b/tools/vcli/bw/run/run.go index 2f05c16b..49f2ab3a 100644 --- a/tools/vcli/bw/run/run.go +++ b/tools/vcli/bw/run/run.go @@ -33,7 +33,7 @@ import ( ) // New creates the help command. -func New(store storage.Store, chanSize int) *command.Command { +func New(store storage.Store, chanSize, bulkSize int) *command.Command { cmd := &command.Command{ UsageLine: "run file_path", Short: "runs BQL statements.", @@ -43,13 +43,13 @@ sequentially. `, } cmd.Run = func(ctx context.Context, args []string) int { - return runCommand(ctx, cmd, args, store, chanSize) + return runCommand(ctx, cmd, args, store, chanSize, bulkSize) } return cmd } // runCommand runs all the BQL statements available in the file. -func runCommand(ctx context.Context, cmd *command.Command, args []string, store storage.Store, chanSize int) int { +func runCommand(ctx context.Context, cmd *command.Command, args []string, store storage.Store, chanSize, bulkSize int) int { if len(args) < 3 { log.Printf("[ERROR] Missing required file path. ") cmd.Usage() @@ -64,7 +64,7 @@ func runCommand(ctx context.Context, cmd *command.Command, args []string, store fmt.Printf("Processing file %s\n\n", args[len(args)-1]) for idx, stm := range lines { fmt.Printf("Processing statement (%d/%d):\n%s\n\n", idx+1, len(lines), stm) - tbl, err := BQL(ctx, stm, store, chanSize) + tbl, err := BQL(ctx, stm, store, chanSize, bulkSize) if err != nil { fmt.Printf("[FAIL] %v\n\n", err) continue @@ -79,7 +79,7 @@ func runCommand(ctx context.Context, cmd *command.Command, args []string, store } // BQL attempts to execute the provided query against the given store. -func BQL(ctx context.Context, bql string, s storage.Store, chanSize int) (*table.Table, error) { +func BQL(ctx context.Context, bql string, s storage.Store, chanSize, bulkSize int) (*table.Table, error) { p, err := grammar.NewParser(grammar.SemanticBQL()) if err != nil { return nil, fmt.Errorf("[ERROR] Failed to initilize a valid BQL parser") @@ -88,7 +88,7 @@ func BQL(ctx context.Context, bql string, s storage.Store, chanSize int) (*table if err := p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { return nil, fmt.Errorf("[ERROR] Failed to parse BQL statement with error %v", err) } - pln, err := planner.New(ctx, s, stm, chanSize, nil) + pln, err := planner.New(ctx, s, stm, chanSize, bulkSize, nil) if err != nil { return nil, fmt.Errorf("[ERROR] Should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) } diff --git a/tools/vcli/bw/server/server.go b/tools/vcli/bw/server/server.go index e980b2a5..a4e537b0 100644 --- a/tools/vcli/bw/server/server.go +++ b/tools/vcli/bw/server/server.go @@ -37,7 +37,7 @@ import ( ) // New creates the help command. -func New(store storage.Store, chanSize int) *command.Command { +func New(store storage.Store, chanSize, bulkSize int) *command.Command { cmd := &command.Command{ UsageLine: "server port", Short: "runs a BQL endoint.", @@ -45,7 +45,7 @@ func New(store storage.Store, chanSize int) *command.Command { all BQL queries and returns a JSON table with the results.`, } cmd.Run = func(ctx context.Context, args []string) int { - return runServer(ctx, cmd, args, store, chanSize) + return runServer(ctx, cmd, args, store, chanSize, bulkSize) } return cmd } @@ -54,10 +54,11 @@ all BQL queries and returns a JSON table with the results.`, type serverConfig struct { store storage.Store chanSize int + bulkSize int } // runServer runs the simple BQL endpoint. -func runServer(ctx context.Context, cmd *command.Command, args []string, store storage.Store, chanSize int) int { +func runServer(ctx context.Context, cmd *command.Command, args []string, store storage.Store, chanSize, bulkSize int) int { // Check parameters. if len(args) < 3 { log.Printf("[%v] Missing required port number. ", time.Now()) @@ -78,6 +79,7 @@ func runServer(ctx context.Context, cmd *command.Command, args []string, store s s := &serverConfig{ store: store, chanSize: chanSize, + bulkSize: bulkSize, } http.HandleFunc("/bql", s.bqlHandler) http.HandleFunc("/", defaultHandler) @@ -121,7 +123,7 @@ func (s *serverConfig) bqlHandler(w http.ResponseWriter, r *http.Request) { if nq, err := url.QueryUnescape(q); err == nil { q = strings.Replace(strings.Replace(nq, "\n", " ", -1), "\r", " ", -1) } - t, err := BQL(ctx, q, s.store, s.chanSize) + t, err := BQL(ctx, q, s.store, s.chanSize, s.bulkSize) r := &result{ Q: q, T: t, @@ -182,7 +184,7 @@ func getQueries(raw []string) []string { } // BQL attempts to execute the provided query against the given store. -func BQL(ctx context.Context, bql string, s storage.Store, chanSize int) (*table.Table, error) { +func BQL(ctx context.Context, bql string, s storage.Store, chanSize, bulkSize int) (*table.Table, error) { p, err := grammar.NewParser(grammar.SemanticBQL()) if err != nil { return nil, fmt.Errorf("[ERROR] Failed to initilize a valid BQL parser") @@ -191,7 +193,7 @@ func BQL(ctx context.Context, bql string, s storage.Store, chanSize int) (*table if err := p.Parse(grammar.NewLLk(bql, 1), stm); err != nil { return nil, fmt.Errorf("[ERROR] Failed to parse BQL statement with error %v", err) } - pln, err := planner.New(ctx, s, stm, chanSize, nil) + pln, err := planner.New(ctx, s, stm, chanSize, bulkSize, nil) if err != nil { return nil, fmt.Errorf("[ERROR] Should have not failed to create a plan using memory.DefaultStorage for statement %v with error %v", stm, err) }