Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,15 +722,13 @@ func (n *network) cancelDriverWatches() {
}
}

func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
func (c *controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
for {
select {
case ev, ok := <-ch:
if !ok {
return
}

case ev := <-ch.C:
fn(ev)
case <-ch.Done():
return
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,17 @@ func TestNetworkDBWatch(t *testing.T) {
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NoError(t, err)

testWatch(t, ch, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")

err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
assert.NoError(t, err)

testWatch(t, ch, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")

err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
assert.NoError(t, err)

testWatch(t, ch, DeleteEvent{}, "test_table", "network1", "test_key", "")
testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")

cancel()
closeNetworkDBInstances(dbs)
Expand Down
4 changes: 2 additions & 2 deletions networkdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type DeleteEvent event
// filter is an empty string it acts as a wildcard for that
// field. Watch returns a channel of events, where the events will be
// sent.
func (nDB *NetworkDB) Watch(tname, nid, key string) (chan events.Event, func()) {
func (nDB *NetworkDB) Watch(tname, nid, key string) (*events.Channel, func()) {
var matcher events.Matcher

if tname != "" || nid != "" || key != "" {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (nDB *NetworkDB) Watch(tname, nid, key string) (chan events.Event, func())
}

nDB.broadcaster.Add(sink)
return ch.C, func() {
return ch, func() {
nDB.broadcaster.Remove(sink)
ch.Close()
sink.Close()
Expand Down