Skip to content

Commit c3274b5

Browse files
authored
fix: manage resources properly (#17)
* fix(simulation): remove client from map when sess closes * fix(simulation): ensure returns when sess closes * refactor(simulation): pass ctx from outside * fix(mcp): propagate ctx from readLoop * fix(game): do not spawn yet another goroutine for sending inputs * feat(mcp): implement Session.Closed() * fix(game): terminate if sess closed * refactor(simulation): accept context as a param * fix(simulation): break on context cancellation or timeout * refactor(simulation)!: remove unused ctx
1 parent 5455a1b commit c3274b5

File tree

4 files changed

+51
-43
lines changed

4 files changed

+51
-43
lines changed

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func main() {
2222
ctx, cancel := cli.NewSignalContext()
2323
defer cancel()
2424

25-
sim, err := simulation.New(ctx, localAddr)
25+
sim, err := simulation.New(localAddr)
2626
if err != nil {
2727
slog.Error("failed to instantiate simulation", "error", err)
2828
return

internal/game/game.go

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,32 +58,14 @@ func New(ctx context.Context, raddr string) (*Game, error) {
5858
nextSnapshot: snapshot{},
5959
snapshotLock: sync.Mutex{},
6060
}
61-
go g.sendLoop()
62-
go g.receiveLoop()
61+
go g.receiveLoop(context.Background())
6362
return g, nil
6463
}
6564

66-
func (g *Game) sendLoop() {
67-
ticker := time.NewTicker(time.Second / time.Duration(ebiten.TPS()))
68-
defer ticker.Stop()
69-
for ; ; <-ticker.C {
70-
g.inputBufferLock.Lock()
71-
data, err := g.inputBuffer.MarshalBinary()
72-
g.inputBufferLock.Unlock()
73-
if err != nil {
74-
slog.Warn("failed to marshal input buffer", "error", err)
75-
continue
76-
}
77-
78-
_ = g.sess.TrySend(data)
79-
}
80-
}
81-
82-
func (g *Game) receiveLoop() {
83-
ctx := context.Background()
65+
func (g *Game) receiveLoop(ctx context.Context) {
8466
for {
8567
data, err := g.sess.Receive(ctx)
86-
if errors.Is(err, mcp.ErrClosed) {
68+
if errors.Is(err, mcp.ErrClosed) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
8769
break
8870
}
8971
if err != nil {
@@ -164,7 +146,16 @@ func (g *Game) Update() error {
164146

165147
g.inputBufferLock.Lock()
166148
g.inputBuffer.Append(input)
149+
data, err := g.inputBuffer.MarshalBinary()
167150
g.inputBufferLock.Unlock()
151+
if err != nil {
152+
slog.Warn("failed to marshal input buffer", "error", err)
153+
return nil
154+
}
155+
if g.sess.Closed() {
156+
return ebiten.Termination
157+
}
158+
_ = g.sess.TrySend(data)
168159

169160
g.snapshotLock.Lock()
170161
if !g.nextSnapshot.t.IsZero() {

internal/mcp/listener.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (ln *Listener) readLoop() {
334334
continue
335335
}
336336

337-
err = ln.handleDatagram(remote, datagram)
337+
err = ln.handleDatagram(context.Background(), remote, datagram)
338338
if err != nil {
339339
ln.logger.Warn("failed to handle datagram", "error", err)
340340
continue
@@ -347,7 +347,7 @@ func (ln *Listener) readLoop() {
347347
}
348348
}
349349

350-
func (ln *Listener) handleDatagram(remote net.Addr, datagram Datagram) error {
350+
func (ln *Listener) handleDatagram(ctx context.Context, remote net.Addr, datagram Datagram) error {
351351
if datagram.Version != version {
352352
return fmt.Errorf("version %d: version is not supported", datagram.Version)
353353
}
@@ -380,7 +380,7 @@ func (ln *Listener) handleDatagram(remote net.Addr, datagram Datagram) error {
380380
sess := ln.sessions[remote.String()]
381381
var err error
382382
sess.dieOnce.Do(func() {
383-
err = sess.partialUncheckedClose(context.TODO())
383+
err = sess.partialUncheckedClose(ctx)
384384
})
385385
if err != nil {
386386
ln.sessionCond.L.Unlock()
@@ -559,6 +559,15 @@ func (sess *Session) Close(ctx context.Context) error {
559559
return err
560560
}
561561

562+
func (sess *Session) Closed() bool {
563+
select {
564+
case _, open := <-sess.die:
565+
return !open
566+
default:
567+
return false
568+
}
569+
}
570+
562571
func (sess *Session) LocalAddr() net.Addr {
563572
return sess.local
564573
}

internal/simulation/simulation.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Simulation struct {
2525
lastStateIndex uint32
2626
}
2727

28-
func New(ctx context.Context, laddr string) (*Simulation, error) {
28+
func New(laddr string) (*Simulation, error) {
2929
houseImg, err := openImage("./assets/house.png")
3030
if err != nil {
3131
return nil, err
@@ -45,7 +45,7 @@ func New(ctx context.Context, laddr string) (*Simulation, error) {
4545
state: state.State{},
4646
lastStateIndex: 0,
4747
}
48-
go sim.acceptLoop()
48+
go sim.acceptLoop(context.Background())
4949
return sim, nil
5050
}
5151

@@ -54,21 +54,21 @@ type clientType struct {
5454
inputc chan state.Input
5555
}
5656

57-
func (c clientType) start() {
57+
func (c clientType) start(ctx context.Context) {
5858
logger := slog.With("remote", c.sess.RemoteAddr())
5959

60-
// TODO: this is a temporary fix for the busy-loop performance issue
61-
ticker := time.NewTicker(time.Second / time.Duration(ebiten.TPS()))
62-
defer ticker.Stop()
63-
// TODO: plan for killing this infinite loop
64-
for ; ; <-ticker.C {
65-
data, succeeded := c.sess.TryReceive()
66-
if !succeeded {
60+
for {
61+
data, err := c.sess.Receive(ctx)
62+
if errors.Is(err, mcp.ErrClosed) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
63+
break
64+
}
65+
if err != nil {
66+
logger.Warn("failed to receive inputs from session", "error", err)
6767
continue
6868
}
6969

7070
var buf jitter.Buffer
71-
err := buf.UnmarshalBinary(data)
71+
err = buf.UnmarshalBinary(data)
7272
if err != nil {
7373
logger.Warn("failed to unmarshal inputs", "error", err)
7474
continue
@@ -78,6 +78,7 @@ func (c clientType) start() {
7878
b := make([]byte, 2+4)
7979
binary.BigEndian.PutUint16(b, 0 /* type = input ack */)
8080
binary.BigEndian.PutUint32(b[2:], indices[len(indices)-1])
81+
// i refuse to spawn a new goroutine just to do this
8182
_ = c.sess.TrySend(b)
8283
}
8384

@@ -90,28 +91,35 @@ func (c clientType) start() {
9091
}
9192
}
9293

93-
func (sim *Simulation) acceptLoop() {
94-
ctx := context.Background()
94+
func (sim *Simulation) acceptLoop(ctx context.Context) {
9595
for {
9696
sess, err := sim.ln.Accept(ctx)
97-
if errors.Is(err, mcp.ErrClosed) {
97+
if errors.Is(err, mcp.ErrClosed) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
9898
break
9999
}
100100
if err != nil {
101101
slog.Warn("failed to accept session", "error", err)
102102
continue
103103
}
104104

105-
// TODO: remove sess from sessions whenever closed
106-
107105
client := clientType{
108106
sess: sess,
109107
inputc: make(chan state.Input, 1),
110108
}
111-
go client.start()
109+
raddr := sess.RemoteAddr().String()
110+
go func() {
111+
client.start(context.Background())
112+
113+
// should not sess.Close() since the only reason client.start()
114+
// returns is because sess has closed.
115+
116+
sim.clientLock.Lock()
117+
delete(sim.clients, raddr)
118+
sim.clientLock.Unlock()
119+
}()
112120

113121
sim.clientLock.Lock()
114-
sim.clients[sess.RemoteAddr().String()] = client
122+
sim.clients[raddr] = client
115123
sim.clientLock.Unlock()
116124
}
117125
}

0 commit comments

Comments
 (0)