Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions cmd/root/run_event_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
tea "charm.land/bubbletea/v2"

"github.com/docker/docker-agent/pkg/app"
"github.com/docker/docker-agent/pkg/concurrent"
"github.com/docker/docker-agent/pkg/runtime"
"github.com/docker/docker-agent/pkg/shellpath"
)
Expand Down Expand Up @@ -80,33 +81,39 @@ func runEventHook(command string, payload []byte) {
// it, and exits; the spawning goroutine ends with the subprocess.
cmd := exec.CommandContext(context.Background(), shell, append(argsPrefix, command)...)
cmd.Stdin = bytes.NewReader(payload)
var out boundedBuffer
var out boundedWriter
cmd.Stdout = &out
cmd.Stderr = &out
if err := cmd.Run(); err != nil {
slog.Warn("on-event hook failed", "command", command, "error", err, "output", strings.TrimSpace(out.String()))
}
}

// boundedBuffer captures up to maxHookOutput bytes from a hook subprocess
// and silently discards the rest. It implements only io.Writer so it can be
// assigned to exec.Cmd's Stdout/Stderr without forcing exec to buffer the
// full output internally.
type boundedBuffer struct {
buf bytes.Buffer
// boundedWriter captures up to maxHookOutput bytes from a hook subprocess
// and silently discards the rest. It satisfies io.Writer so it can be
// assigned to exec.Cmd's Stdout/Stderr.
//
// exec.Cmd spawns separate copy goroutines for Stdout and Stderr, so the
// underlying buffer must be safe for concurrent writes; that's what
// [concurrent.Buffer] gives us. The cap is enforced softly: between
// Len() and Write() another goroutine may slip in a chunk, so we may
// over-shoot maxHookOutput by at most one Write per concurrent stream
// (a few KB) — acceptable for diagnostic output.
Comment thread
dgageot marked this conversation as resolved.
type boundedWriter struct {
buf concurrent.Buffer
}

func (b *boundedBuffer) Write(p []byte) (int, error) {
func (b *boundedWriter) Write(p []byte) (int, error) {
if remaining := maxHookOutput - b.buf.Len(); remaining > 0 {
if len(p) > remaining {
b.buf.Write(p[:remaining])
} else {
b.buf.Write(p)
chunk := p
if len(chunk) > remaining {
chunk = chunk[:remaining]
}
_, _ = b.buf.Write(chunk)
}
return len(p), nil
}

func (b *boundedBuffer) String() string {
func (b *boundedWriter) String() string {
return b.buf.String()
}
4 changes: 2 additions & 2 deletions cmd/root/run_event_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestParseOnEventFlags_BadFormat(t *testing.T) {
}
}

func TestBoundedBuffer_CapsAtMaxHookOutput(t *testing.T) {
var b boundedBuffer
func TestBoundedWriter_CapsAtMaxHookOutput(t *testing.T) {
var b boundedWriter

n, err := b.Write(bytes.Repeat([]byte("a"), maxHookOutput-3))
require.NoError(t, err)
Expand Down
20 changes: 6 additions & 14 deletions pkg/chatserver/conversation_lock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package chatserver

import "sync"
import "github.com/docker/docker-agent/pkg/concurrent"

// conversationLockSet ensures only one in-flight request at a time per
// conversation id. Concurrent requests sharing an id would otherwise share
Expand All @@ -12,12 +12,11 @@ import "sync"
// for two reasons: it surfaces the misuse to the client immediately, and it
// keeps the handler's resource cost bounded (no queue, no waiting goroutines).
type conversationLockSet struct {
mu sync.Mutex
active map[string]struct{}
active concurrent.Map[string, struct{}]
}

func newConversationLockSet() *conversationLockSet {
return &conversationLockSet{active: make(map[string]struct{})}
return &conversationLockSet{}
}

// tryAcquire returns true when id was not already in flight. The caller
Expand All @@ -27,13 +26,8 @@ func (l *conversationLockSet) tryAcquire(id string) bool {
if l == nil || id == "" {
return true
}
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.active[id]; ok {
return false
}
l.active[id] = struct{}{}
return true
_, loaded := l.active.LoadOrStore(id, struct{}{})
return !loaded
}

// release marks id as no longer in flight. Safe to call when id is the
Expand All @@ -42,7 +36,5 @@ func (l *conversationLockSet) release(id string) {
if l == nil || id == "" {
return
}
l.mu.Lock()
delete(l.active, id)
l.mu.Unlock()
l.active.Delete(id)
}
16 changes: 16 additions & 0 deletions pkg/concurrent/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package concurrent

import (
"bytes"
"slices"
"sync"
)

Expand All @@ -27,6 +28,21 @@ func (b *Buffer) String() string {
return b.buf.String()
}

// Bytes returns a copy of the buffered content as a byte slice.
// The returned slice is safe to retain and modify.
func (b *Buffer) Bytes() []byte {
b.mu.Lock()
defer b.mu.Unlock()
return slices.Clone(b.buf.Bytes())
}

// Len returns the number of bytes currently buffered.
func (b *Buffer) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Len()
}

// Reset clears the buffer.
func (b *Buffer) Reset() {
b.mu.Lock()
Expand Down
90 changes: 90 additions & 0 deletions pkg/concurrent/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package concurrent

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBuffer_Write(t *testing.T) {
var b Buffer

n, err := b.Write([]byte("hello"))
require.NoError(t, err)
assert.Equal(t, 5, n)

n, err = b.Write([]byte(" world"))
require.NoError(t, err)
assert.Equal(t, 6, n)

assert.Equal(t, "hello world", b.String())
}

func TestBuffer_Bytes(t *testing.T) {
var b Buffer
_, _ = b.Write([]byte("hello"))

got := b.Bytes()
assert.Equal(t, []byte("hello"), got)

// Mutating the returned slice must not affect the buffer.
got[0] = 'H'
assert.Equal(t, "hello", b.String())
}

func TestBuffer_Len(t *testing.T) {
var b Buffer
assert.Equal(t, 0, b.Len())

_, _ = b.Write([]byte("abc"))
assert.Equal(t, 3, b.Len())

_, _ = b.Write([]byte("de"))
assert.Equal(t, 5, b.Len())
}

func TestBuffer_Reset(t *testing.T) {
var b Buffer
_, _ = b.Write([]byte("hello"))

b.Reset()
assert.Equal(t, 0, b.Len())
assert.Empty(t, b.String())
}

func TestBuffer_Drain(t *testing.T) {
var b Buffer
_, _ = b.Write([]byte("hello"))

got := b.Drain()
assert.Equal(t, "hello", got)
assert.Equal(t, 0, b.Len())
assert.Empty(t, b.String())
}

func TestBuffer_Concurrent(t *testing.T) {
var b Buffer
var wg sync.WaitGroup

const writers = 100
for i := range writers {
wg.Go(func() {
_, _ = b.Write(fmt.Appendf(nil, "%03d", i))
})
}

// Concurrent readers should not race with writers.
for range 50 {
wg.Go(func() {
_ = b.String()
_ = b.Len()
_ = b.Bytes()
})
}

wg.Wait()
assert.Equal(t, writers*3, b.Len())
}
55 changes: 52 additions & 3 deletions pkg/concurrent/map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package concurrent

import "sync"
import (
"maps"
"sync"
)

type Map[K comparable, V any] struct {
mu sync.RWMutex
Expand All @@ -25,6 +28,9 @@ func (m *Map[K, V]) Store(key K, value V) {
m.mu.Lock()
defer m.mu.Unlock()

if m.values == nil {
m.values = make(map[K]V)
}
m.values[key] = value
}

Expand All @@ -42,11 +48,54 @@ func (m *Map[K, V]) Length() int {
return len(m.values)
}

// LoadOrStore returns the existing value for key if present; otherwise it
// stores and returns value. The loaded result is true if the value was
// loaded, false if stored.
func (m *Map[K, V]) LoadOrStore(key K, value V) (V, bool) {
m.mu.RLock()
if existing, ok := m.values[key]; ok {
m.mu.RUnlock()
return existing, true
}
m.mu.RUnlock()

m.mu.Lock()
defer m.mu.Unlock()

// Re-check under the write lock: another goroutine may have stored
// the key between releasing the read lock and acquiring the write lock.
if existing, ok := m.values[key]; ok {
return existing, true
}
if m.values == nil {
m.values = make(map[K]V)
}
m.values[key] = value
return value, false
}

// Clear removes all entries from the map.
func (m *Map[K, V]) Clear() {
m.mu.Lock()
defer m.mu.Unlock()

m.values = make(map[K]V)
}

// Range calls f for every key/value pair in the map. Iteration stops early if
// f returns false.
//
// Range iterates over a snapshot of the map taken under a read lock; f is
// invoked without holding any lock, which means callbacks may safely call
// other methods on the same Map (including Store and Delete) without
// deadlocking. As a consequence, mutations performed during iteration are not
// reflected in the values seen by f.
func (m *Map[K, V]) Range(f func(key K, value V) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
snapshot := maps.Clone(m.values)
m.mu.RUnlock()

for k, v := range m.values {
for k, v := range snapshot {
if !f(k, v) {
break
}
Expand Down
Loading
Loading