Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Commit 4b98fbb

Browse files
jcooklinIzabellaRaulin
authored andcommitted
Adds event listener to avoid sleep statements
1 parent 4f681cb commit 4b98fbb

3 files changed

Lines changed: 87 additions & 29 deletions

File tree

scheduler/distributed_task_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/intelsdi-x/snap/grpc/controlproxy"
3838
"github.com/intelsdi-x/snap/pkg/schedule"
3939
"github.com/intelsdi-x/snap/plugin/helper"
40+
"github.com/intelsdi-x/snap/scheduler/fixtures"
4041
"github.com/intelsdi-x/snap/scheduler/wmap"
4142
. "github.com/smartystreets/goconvey/convey"
4243
)
@@ -302,6 +303,8 @@ func TestDistributedSubscriptions(t *testing.T) {
302303
})
303304
})
304305
Convey("Single run task", func() {
306+
lse := fixtures.NewListenToSchedulerEvent()
307+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
305308
count := uint(1)
306309
interval := time.Millisecond * 100
307310
sch := schedule.NewWindowedSchedule(interval, nil, nil, count)
@@ -323,9 +326,11 @@ func TestDistributedSubscriptions(t *testing.T) {
323326
So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
324327
})
325328
Convey("Task should be ended after an interval", func() {
326-
// wait for the end of the task
327-
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
328-
time.Sleep(interval + time.Millisecond*100)
329+
// wait for the end of the task (or timeout)
330+
select {
331+
case <-lse.Ended:
332+
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
333+
}
329334
So(t.State(), ShouldEqual, core.TaskEnded)
330335

331336
Convey("So all dependencies should have been usubscribed", func() {

scheduler/fixtures/fixtures.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// + build medium legacy
2+
3+
/*
4+
http://www.apache.org/licenses/LICENSE-2.0.txt
5+
6+
7+
Copyright 2017 Intel Corporation
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package fixtures
23+
24+
import "github.com/intelsdi-x/gomit"
25+
import "github.com/intelsdi-x/snap/core/scheduler_event"
26+
27+
type listenToSchedulerEvent struct {
28+
Ended chan struct{}
29+
}
30+
31+
// NewListenToSchedulerEvent
32+
func NewListenToSchedulerEvent() *listenToSchedulerEvent {
33+
return &listenToSchedulerEvent{
34+
Ended: make(chan struct{}),
35+
}
36+
}
37+
38+
func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) {
39+
switch e.Body.(type) {
40+
case *scheduler_event.TaskEndedEvent:
41+
l.Ended <- struct{}{}
42+
}
43+
}

scheduler/scheduler_medium_test.go

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/intelsdi-x/snap/core/ctypes"
3535
"github.com/intelsdi-x/snap/core/serror"
3636
"github.com/intelsdi-x/snap/pkg/schedule"
37+
"github.com/intelsdi-x/snap/scheduler/fixtures"
3738
"github.com/intelsdi-x/snap/scheduler/wmap"
3839
)
3940

@@ -250,6 +251,8 @@ func TestCreateTask(t *testing.T) {
250251
})
251252
})
252253
Convey("should not error when the schedule is valid", func() {
254+
lse := fixtures.NewListenToSchedulerEvent()
255+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
253256
start := time.Now().Add(startWait)
254257
stop := time.Now().Add(startWait + windowSize)
255258
sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0)
@@ -260,11 +263,11 @@ func TestCreateTask(t *testing.T) {
260263
task := s.tasks.Get(tsk.ID())
261264
task.Spin()
262265
Convey("the task should be ended after reaching the end of window", func() {
263-
// wait for the end of determined window
264-
time.Sleep(startWait + windowSize)
265-
// wait an interval to be sure that the task state has been updated
266-
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
267-
time.Sleep(interval + time.Millisecond*100)
266+
// wait for task ended event (or timeout)
267+
select {
268+
case <-lse.Ended:
269+
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
270+
}
268271
// check if the task is ended
269272
So(tsk.State(), ShouldEqual, core.TaskEnded)
270273
})
@@ -290,6 +293,8 @@ func TestCreateTask(t *testing.T) {
290293
})
291294
})
292295
Convey("Single run task firing on defined start time", func() {
296+
lse := fixtures.NewListenToSchedulerEvent()
297+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
293298
count := uint(1)
294299
start := time.Now().Add(startWait)
295300
sch := schedule.NewWindowedSchedule(interval, &start, nil, count)
@@ -300,11 +305,11 @@ func TestCreateTask(t *testing.T) {
300305
task := s.tasks.Get(tsk.ID())
301306
task.Spin()
302307
Convey("the task should be ended after reaching the end of window", func() {
303-
// wait for the end of determined window
304-
time.Sleep(startWait)
305-
// wait an interval to be sure that the task state has been updated
306-
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
307-
time.Sleep(interval + time.Millisecond*100)
308+
// wait for task ended event (or timeout)
309+
select {
310+
case <-lse.Ended:
311+
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
312+
}
308313
// check if the task is ended
309314
So(tsk.State(), ShouldEqual, core.TaskEnded)
310315
})
@@ -400,6 +405,8 @@ func TestStopTask(t *testing.T) {
400405
})
401406
})
402407
Convey("Calling StopTask on an ended task", t, func() {
408+
lse := fixtures.NewListenToSchedulerEvent()
409+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
403410
start := time.Now().Add(startWait)
404411
stop := time.Now().Add(startWait + windowSize)
405412

@@ -412,11 +419,11 @@ func TestStopTask(t *testing.T) {
412419
task := s.tasks.Get(tsk.ID())
413420
task.Spin()
414421

415-
// wait for the end of determined window
416-
time.Sleep(startWait + windowSize)
417-
// wait an interval to be sure that the task state has been updated
418-
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
419-
time.Sleep(interval + time.Millisecond*100)
422+
// wait for task ended event (or timeout)
423+
select {
424+
case <-lse.Ended:
425+
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
426+
}
420427
// check if the task is ended
421428
So(tsk.State(), ShouldEqual, core.TaskEnded)
422429

@@ -486,6 +493,8 @@ func TestStartTask(t *testing.T) {
486493
})
487494
})
488495
Convey("Calling StartTask on an ended windowed task", t, func() {
496+
lse := fixtures.NewListenToSchedulerEvent()
497+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
489498
start := time.Now().Add(startWait)
490499
stop := time.Now().Add(startWait + windowSize)
491500

@@ -498,11 +507,11 @@ func TestStartTask(t *testing.T) {
498507
task := s.tasks.Get(tsk.ID())
499508
task.Spin()
500509

501-
// wait for the end of determined window
502-
time.Sleep(startWait + windowSize)
503-
// wait an interval to be sure that the task state has been updated
504-
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
505-
time.Sleep(interval + time.Millisecond*100)
510+
// wait for task ended event (or timeout)
511+
select {
512+
case <-lse.Ended:
513+
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
514+
}
506515

507516
// check if the task is ended
508517
So(tsk.State(), ShouldEqual, core.TaskEnded)
@@ -583,6 +592,8 @@ func TestEnableTask(t *testing.T) {
583592
})
584593
})
585594
Convey("Calling EnableTask on an ended task", t, func() {
595+
lse := fixtures.NewListenToSchedulerEvent()
596+
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
586597
start := time.Now().Add(startWait)
587598
stop := time.Now().Add(startWait + windowSize)
588599

@@ -595,12 +606,11 @@ func TestEnableTask(t *testing.T) {
595606
task := s.tasks.Get(tsk.ID())
596607
task.Spin()
597608

598-
// wait for the end of determined window
599-
time.Sleep(startWait + windowSize)
600-
// wait an interval to be sure that the task state has been updated
601-
/// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
602-
time.Sleep(interval + time.Millisecond*100)
603-
609+
// wait for task ended event (or timeout)
610+
select {
611+
case <-lse.Ended:
612+
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
613+
}
604614
// check if the task is ended
605615
So(tsk.State(), ShouldEqual, core.TaskEnded)
606616

0 commit comments

Comments
 (0)