Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 250 additions & 9 deletions bql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/google/badwolf/storage"
"github.com/google/badwolf/triple"
"github.com/google/badwolf/triple/literal"
"github.com/google/badwolf/triple/predicate"
)

// Executor interface unifies the execution of statements.
Expand Down Expand Up @@ -140,7 +141,7 @@ type insertPlan struct {

type updater func(storage.Graph, []*triple.Triple) error

func update(ctx context.Context, stm *semantic.Statement, gbs []string, store storage.Store, f updater) error {
func update(ctx context.Context, ts []*triple.Triple, gbs []string, store storage.Store, f updater) error {
var (
mu sync.Mutex
wg sync.WaitGroup
Expand All @@ -161,7 +162,7 @@ func update(ctx context.Context, stm *semantic.Statement, gbs []string, store st
appendError(err)
return
}
err = f(g, stm.Data())
err = f(g, ts)
if err != nil {
appendError(err)
}
Expand All @@ -180,7 +181,7 @@ func (p *insertPlan) Execute(ctx context.Context) (*table.Table, error) {
if err != nil {
return nil, err
}
return t, update(ctx, p.stm, p.stm.OutputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error {
return t, update(ctx, p.stm.Data(), p.stm.OutputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error {
trace(p.tracer, func() []string {
return []string{"Inserting triples to graph \"" + g.ID(ctx) + "\""}
})
Expand Down Expand Up @@ -217,7 +218,7 @@ func (p *deletePlan) Execute(ctx context.Context) (*table.Table, error) {
if err != nil {
return nil, err
}
return t, update(ctx, p.stm, p.stm.InputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error {
return t, update(ctx, p.stm.Data(), p.stm.InputGraphNames(), p.store, func(g storage.Graph, d []*triple.Triple) error {
trace(p.tracer, func() []string {
return []string{"Removing triples from graph \"" + g.ID(ctx) + "\""}
})
Expand Down Expand Up @@ -587,7 +588,7 @@ func (p *queryPlan) projectAndGroupBy() error {
return p.tbl.ProjectBindings(p.stm.OutputBindings())
}
trace(p.tracer, func() []string {
return []string{"Starting roup reduce and projection"}
return []string{"Starting group reduce and projection"}
})
// The table needs to be group reduced.
// Project only binding involved in the group operation.
Expand Down Expand Up @@ -632,15 +633,15 @@ func (p *queryPlan) projectAndGroupBy() error {
case lexer.ItemSum:
cell := p.tbl.Rows()[0][prj.Binding]
if cell.L == nil {
return fmt.Errorf("cannot only sum int64 and float64 literals; found %s instead for binding %q", cell, prj.Binding)
return fmt.Errorf("can only sum int64 and float64 literals; found %s instead for binding %q", cell, prj.Binding)
}
switch cell.L.Type() {
case literal.Int64:
aap.Acc = table.NewSumInt64LiteralAccumulator(0)
case literal.Float64:
aap.Acc = table.NewSumFloat64LiteralAccumulator(0)
default:
return fmt.Errorf("cannot only sum int64 and float64 literals; found literal type %s instead for binding %q", cell.L.Type(), prj.Binding)
return fmt.Errorf("can only sum int64 and float64 literals; found literal type %s instead for binding %q", cell.L.Type(), prj.Binding)
}
}
aaps = append(aaps, aap)
Expand Down Expand Up @@ -706,7 +707,7 @@ func (p *queryPlan) limit() {

// Execute queries the indicated graphs.
func (p *queryPlan) Execute(ctx context.Context) (*table.Table, error) {
// Fetch and catch graph instances.
// Fetch and cache graph instances.
trace(p.tracer, func() []string {
return []string{fmt.Sprintf("Caching graph instances for graphs %v", p.stm.InputGraphNames())}
})
Expand Down Expand Up @@ -788,8 +789,239 @@ func (p *queryPlan) String() string {
return b.String()
}

// constructPlan encapsulates the sequence of instructions that need to be
// executed in order to satisfy the execution of a valid construct BQL statement.
type constructPlan struct {
stm *semantic.Statement
store storage.Store
tracer io.Writer
bulkSize int
queryPlan *queryPlan
}

func (p *constructPlan) processConstructClause(cc *semantic.ConstructClause, tbl *table.Table, r table.Row) (*triple.Triple, error) {
var err error
sbj, prd, obj := cc.S, cc.P, cc.O
if sbj == nil && tbl.HasBinding(cc.SBinding) {
v, ok := r[cc.SBinding]
if !ok {
return nil, fmt.Errorf("row %+v misses binding %q", r, cc.SBinding)
}
if v.N == nil {
return nil, fmt.Errorf("binding %q requires a node, got %+v instead", cc.SBinding, v)
}
sbj = v.N
}
if prd == nil {
if tbl.HasBinding(cc.PBinding) {
// Try to bind the predicate.
v, ok := r[cc.PBinding]
if !ok {
return nil, fmt.Errorf("row %+v misses binding %q", r, cc.PBinding)
}
if v.P == nil {
return nil, fmt.Errorf("binding %q requires a predicate, got %+v instead", cc.PBinding, v)
}
prd = v.P
} else if cc.PTemporal && cc.PAnchorBinding != "" {
// Try to bind the predicate anchor.
v, ok := r[cc.PAnchorBinding]
if !ok {
return nil, fmt.Errorf("row %+v misses binding %q", r, cc.PAnchorBinding)
}
if v.T == nil {
return nil, fmt.Errorf("binding %q requires a time, got %+v instead", cc.PAnchorBinding, v)
}
prd, err = predicate.NewTemporal(cc.PID, *v.T)
if err != nil {
return nil, err
}
}
}
if obj == nil {
if tbl.HasBinding(cc.OBinding) {
// Try to bind the object
v, ok := r[cc.OBinding]
if !ok {
return nil, fmt.Errorf("row %+v misses binding %q", r, cc.OBinding)
}
co, err := cellToObject(v)
if err != nil {
return nil, err
}
obj = co
} else if cc.OTemporal && cc.OAnchorBinding != "" {
// Try to bind the object anchor.
v, ok := r[cc.OAnchorBinding]
if !ok {
return nil, fmt.Errorf("row %+v misses binding %q", r, cc.OAnchorBinding)
}
if v.T == nil {
return nil, fmt.Errorf("binding %q requires a time, got %+v instead", cc.OAnchorBinding, v)
}
op, err := predicate.NewTemporal(cc.OID, *v.T)
if err != nil {
return nil, err
}
obj = triple.NewPredicateObject(op)
}
}
t, err := triple.New(sbj, prd, obj)
return t, err
}

func (p *constructPlan) processReificationClause(rc *semantic.ReificationClause, tbl *table.Table, r table.Row) (*predicate.Predicate, *triple.Object, error) {
var err error
rprd, robj := rc.P, rc.O
if rprd == nil {
if tbl.HasBinding(rc.PBinding) {
// Try to bind the predicate.
v, ok := r[rc.PBinding]
if !ok {
return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.PBinding)
}
if v.P == nil {
return nil, nil, fmt.Errorf("binding %q requires a predicate, got %+v instead", rc.PBinding, v)
}
rprd = v.P
} else if rc.PTemporal && rc.PAnchorBinding != "" {
// Try to bind the predicate anchor.
v, ok := r[rc.PAnchorBinding]
if !ok {
return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.PAnchorBinding)
}
if v.T == nil {
return nil, nil, fmt.Errorf("binding %q requires a time, got %+v instead", rc.PAnchorBinding, v)
}
rprd, err = predicate.NewTemporal(rc.PID, *v.T)
if err != nil {
return nil, nil, err
}
}
}
if robj == nil {
if tbl.HasBinding(rc.OBinding) {
// Try to bind the object
v, ok := r[rc.OBinding]
if !ok {
return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.OBinding)
}
co, err := cellToObject(v)
if err != nil {
return nil, nil, err
}
robj = co
} else if rc.OTemporal && rc.OAnchorBinding != "" {
// Try to bind the object anchor.
v, ok := r[rc.OAnchorBinding]
if !ok {
return nil, nil, fmt.Errorf("row %+v misses binding %q", r, rc.OAnchorBinding)
}
if v.T == nil {
return nil, nil, fmt.Errorf("binding %q requires a time, got %+v instead", rc.OAnchorBinding, v)
}
rop, err := predicate.NewTemporal(rc.OID, *v.T)
if err != nil {
return nil, nil, err
}
robj = triple.NewPredicateObject(rop)
}

}
return rprd, robj, nil
}

func (p *constructPlan) Execute(ctx context.Context) (*table.Table, error) {
tbl, err := p.queryPlan.Execute(ctx)
if err != nil {
return nil, err
}
// The buffered channel has capacity to accommodate twice the amount of triples stored in a single call.
tripChan := make(chan *triple.Triple, 2*p.bulkSize)
done := make(chan bool)

go func() {
var ts []*triple.Triple
updateFunc := func(g storage.Graph, d []*triple.Triple) error {
trace(p.tracer, func() []string {
return []string{"Inserting triples to graph \"" + g.ID(ctx) + "\""}
})
return g.AddTriples(ctx, d)
}
for elem := range tripChan {
ts = append(ts, elem)
if len(ts) >= p.bulkSize {
update(ctx, ts, p.stm.OutputGraphNames(), p.store, updateFunc)
ts = []*triple.Triple{}
}
}
if len(ts) > 0 {
update(ctx, ts, p.stm.OutputGraphNames(), p.store, updateFunc)
}
done <- true
}()

for _, cc := range p.stm.ConstructClauses() {
for _, r := range tbl.Rows() {
t, err := p.processConstructClause(cc, tbl, r)
if err != nil {
return nil, err
}
if len(cc.ReificationClauses()) > 0 {
// We need to reify a blank node.
rts, bn, err := t.Reify()
if err != nil {
fmt.Errorf("triple.Reify failed to reify %v with error %v", t, err)
return nil, err
}
for _, trpl := range rts[1:] {
tripChan <- trpl
}
for _, rc := range cc.ReificationClauses() {
rprd, robj, err := p.processReificationClause(rc, tbl, r)
if err != nil {
return nil, err
}
rt, err := triple.New(bn, rprd, robj)
if err != nil {
return nil, err
}
tripChan <- rt
}

} else {
tripChan <- t
}
}
}
close(tripChan)
// Wait until all triples are added to the store.
<-done

return tbl, nil
}

// String returns a readable description of the execution plan.
func (p *constructPlan) String() string {
b := bytes.NewBufferString("CONSTRUCT plan:\n\n")
b.WriteString("Input graphs:\n")
for _, gn := range p.stm.InputGraphNames() {
b.WriteString(fmt.Sprintf("\t%v\n", gn))
}
b.WriteString("Output graphs:\n")
for _, gn := range p.stm.OutputGraphNames() {
b.WriteString(fmt.Sprintf("\t%v\n", gn))
}
b.WriteString("Construct clauses:\n")
for _, cc := range p.stm.ConstructClauses() {
b.WriteString(fmt.Sprintf("\t%v\n", cc))
}
b.WriteString(fmt.Sprintf("\n%v", p.queryPlan.String()))
return b.String()
}

// New create a new executable plan given a semantic BQL statement.
func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize int, w io.Writer) (Executor, error) {
func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chanSize, bulkSize int, w io.Writer) (Executor, error) {
switch stm.Type() {
case semantic.Query:
return newQueryPlan(ctx, store, stm, chanSize, w)
Expand Down Expand Up @@ -817,6 +1049,15 @@ func New(ctx context.Context, store storage.Store, stm *semantic.Statement, chan
store: store,
tracer: w,
}, nil
case semantic.Construct:
qp, _ := newQueryPlan(ctx, store, stm, chanSize, w)
return &constructPlan{
stm: stm,
store: store,
tracer: w,
bulkSize: bulkSize,
queryPlan: qp,
}, nil
default:
return nil, fmt.Errorf("planner.New: unknown statement type in statement %v", stm)
}
Expand Down
Loading