Skip to content

Commit 3aa3b43

Browse files
authored
task: Implement task scheduling with async and sync support (#91)
* task: Implement task scheduling with async and sync support Signed-off-by: Xuanwo <github@xuanwo.io> * Remove hardcoded host Signed-off-by: Xuanwo <github@xuanwo.io> * Fix nats not set logger Signed-off-by: Xuanwo <github@xuanwo.io> * format import Signed-off-by: Xuanwo <github@xuanwo.io> * Rename conn to queue Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 9979922 commit 3aa3b43

File tree

12 files changed

+529
-353
lines changed

12 files changed

+529
-353
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.15
55
require (
66
github.com/aos-dev/go-service-fs/v2 v2.0.1-0.20210304102732-f57fecf4e68a
77
github.com/aos-dev/go-storage/v3 v3.4.1
8-
github.com/aos-dev/go-toolbox v0.0.0-20210309062625-6031b9981b43
8+
github.com/aos-dev/go-toolbox v0.0.0-20210310073023-db4ad0026279
99
github.com/golang/protobuf v1.4.2
1010
github.com/google/uuid v1.2.0
1111
github.com/nats-io/nats-server/v2 v2.1.9

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ github.com/aos-dev/go-service-fs/v2 v2.0.1-0.20210304102732-f57fecf4e68a/go.mod
1010
github.com/aos-dev/go-storage/v3 v3.0.1-0.20210205074802-e8a5b22166c2/go.mod h1:JznhvyhPDnETfZ3RyWm3mT/S5ic+uuYFWagSIkUZujc=
1111
github.com/aos-dev/go-storage/v3 v3.4.1 h1:D8xrwx64w50E89POJy3xTc/sUM6o8zs8PYHKuIi/cL0=
1212
github.com/aos-dev/go-storage/v3 v3.4.1/go.mod h1:TjjOLA8NhJISnvYyi3aDxUDRXzAoypM17uvfcLMsfUw=
13-
github.com/aos-dev/go-toolbox v0.0.0-20210309062625-6031b9981b43 h1:uool91dh3joqYa9bRpIFt78UCGtIDLhBdJkMpOJSKwo=
14-
github.com/aos-dev/go-toolbox v0.0.0-20210309062625-6031b9981b43/go.mod h1:z9stQnEScgkLExuPYa2x8YosLzqFJm2W947a6QztsM8=
13+
github.com/aos-dev/go-toolbox v0.0.0-20210310073023-db4ad0026279 h1:wYwWyQ3FyFHW27WCyv8tqmxOk0S9sT2SYL+LJAcjGOk=
14+
github.com/aos-dev/go-toolbox v0.0.0-20210310073023-db4ad0026279/go.mod h1:z9stQnEScgkLExuPYa2x8YosLzqFJm2W947a6QztsM8=
1515
github.com/aos-dev/specs/go v0.0.0-20210205073047-af8ef94af73d/go.mod h1:XTNlLZtPA1inITyDH5hNnQXVjvvKUvo+lurs5GYB8NA=
1616
github.com/aos-dev/specs/go v0.0.0-20210304082749-d044cb2d3362 h1:QqfusB82p+7kHhzXMW/z9V+QHob3CIRxLwUY+L9Qk84=
1717
github.com/aos-dev/specs/go v0.0.0-20210304082749-d044cb2d3362/go.mod h1:XTNlLZtPA1inITyDH5hNnQXVjvvKUvo+lurs5GYB8NA=

proto/job.pb.go

Lines changed: 52 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/job.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ message Job {
1212
message JobReply {
1313
string id = 1;
1414
uint32 status = 2;
15+
string message = 3;
1516
}
1617

1718
message CopyDir {

task/agent.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package task
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
fs "github.com/aos-dev/go-service-fs/v2"
8+
"github.com/aos-dev/go-storage/v3/types"
9+
"github.com/aos-dev/go-toolbox/zapcontext"
10+
"github.com/nats-io/nats.go"
11+
natsproto "github.com/nats-io/nats.go/encoders/protobuf"
12+
"go.uber.org/zap"
13+
14+
"github.com/aos-dev/noah/proto"
15+
)
16+
17+
type Agent struct {
18+
w *Worker
19+
t *proto.Task
20+
21+
queue *nats.EncodedConn
22+
subject string
23+
storages []types.Storager
24+
25+
log *zap.Logger
26+
}
27+
28+
func NewAgent(w *Worker, t *proto.Task) *Agent {
29+
return &Agent{
30+
w: w,
31+
t: t,
32+
33+
log: w.log,
34+
}
35+
}
36+
37+
func (a *Agent) Handle() (err error) {
38+
ctx := context.Background()
39+
40+
reply, err := a.w.node.Upgrade(ctx, &proto.UpgradeRequest{
41+
NodeId: a.w.id,
42+
TaskId: a.t.Id,
43+
})
44+
if err != nil {
45+
return fmt.Errorf("node upgrade: %v", err)
46+
}
47+
a.log.Info("receive upgrade", zap.String("reply", reply.String()))
48+
49+
a.subject = reply.Subject
50+
51+
err = a.parseStorage(ctx)
52+
if err != nil {
53+
return
54+
}
55+
56+
if reply.NodeId == a.w.id {
57+
return a.handleServer(ctx, reply.Addr)
58+
} else {
59+
return a.handleClient(ctx, reply.Addr)
60+
}
61+
}
62+
63+
func (a *Agent) parseStorage(ctx context.Context) (err error) {
64+
for range a.t.Endpoints {
65+
a.storages = append(a.storages, &fs.Storage{})
66+
}
67+
return
68+
}
69+
70+
func (a *Agent) handleServer(ctx context.Context, addr string) (err error) {
71+
conn, err := nats.Connect(addr)
72+
if err != nil {
73+
return fmt.Errorf("nats connect: %w", err)
74+
}
75+
queue, err := nats.NewEncodedConn(conn, natsproto.PROTOBUF_ENCODER)
76+
if err != nil {
77+
return fmt.Errorf("nats encoded connect: %w", err)
78+
}
79+
a.queue = queue
80+
81+
// FIXME: we need to maintain task running status instead of job's
82+
rn := NewRunner(a, a.t.Job)
83+
84+
return rn.Async(ctx, a.t.Job)
85+
}
86+
87+
func (a *Agent) handleClient(ctx context.Context, addr string) (err error) {
88+
log := zapcontext.From(ctx)
89+
90+
log.Info("agent connect to job queue", zap.String("addr", addr))
91+
92+
conn, err := nats.Connect(addr)
93+
if err != nil {
94+
return
95+
}
96+
queue, err := nats.NewEncodedConn(conn, natsproto.PROTOBUF_ENCODER)
97+
if err != nil {
98+
return
99+
}
100+
a.queue = queue
101+
102+
// FIXME: we need to handle the returning subscription.
103+
_, err = a.queue.QueueSubscribe(a.subject, a.subject, a.handleJob)
104+
if err != nil {
105+
return fmt.Errorf("nats subscribe: %w", err)
106+
}
107+
return
108+
}
109+
110+
func (a *Agent) handleJob(subject, reply string, job *proto.Job) {
111+
NewRunner(a, job).Handle(subject, reply)
112+
}
Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,25 @@ package task
22

33
import (
44
"context"
5-
"net"
65
"testing"
76
"time"
87

98
protobuf "github.com/golang/protobuf/proto"
109
"github.com/google/uuid"
11-
"google.golang.org/grpc"
1210

1311
"github.com/aos-dev/noah/proto"
1412
)
1513

16-
const testGRPC = "localhost:7010"
17-
1814
func setupPortal(t *testing.T) *Portal {
19-
l, err := net.Listen("tcp", testGRPC)
20-
if err != nil {
21-
t.Error(err)
22-
}
23-
24-
srv := grpc.NewServer()
25-
26-
p, err := NewPortal()
15+
p, err := NewPortal(context.Background(), PortalConfig{
16+
Host: "localhost",
17+
GrpcPort: 7000,
18+
QueuePort: 7010,
19+
})
2720
if err != nil {
2821
t.Error(err)
2922
}
3023

31-
proto.RegisterNodeServer(srv, p)
32-
go func() {
33-
err = srv.Serve(l)
34-
if err != nil {
35-
t.Error(err)
36-
}
37-
}()
38-
3924
return p
4025
}
4126

@@ -46,7 +31,10 @@ func testWorker(t *testing.T) {
4631
ctx := context.Background()
4732

4833
for i := 0; i < 10; i++ {
49-
w, err := NewWorker(ctx, testGRPC)
34+
w, err := NewWorker(ctx, WorkerConfig{
35+
Host: "localhost",
36+
PortalAddr: "localhost:7000",
37+
})
5038
if err != nil {
5139
t.Error(err)
5240
}

task/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,8 @@ const (
77
TypeCopyMultipartFile
88
TypeCopyMultipart
99
)
10+
11+
const (
12+
JobStatusSucceed uint32 = iota
13+
JobStatusFailed
14+
)

0 commit comments

Comments
 (0)