Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ package webhook

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"log/slog"
"net/http"
Expand Down Expand Up @@ -45,6 +48,7 @@ type dispatcher struct {
failed atomic.Uint64
delivered atomic.Uint64
initialBackoff time.Duration
secret string // HMAC-SHA256 pre-shared secret (empty = no sig)

// Dead letter queue: stores last N failed events for retry/inspection
dlqMu sync.Mutex
Expand Down Expand Up @@ -141,14 +145,32 @@ func (d *dispatcher) post(ev *Event) {
return
}

// HMAC-SHA256 signature: if a secret is configured, sign the body
// so the receiver can verify authenticity+integrity (PILOT-239).
var sigHeader string
if d.secret != "" {
mac := hmac.New(sha256.New, []byte(d.secret))
mac.Write(body)
sigHeader = hex.EncodeToString(mac.Sum(nil))
}

backoff := d.initialBackoff
for attempt := 0; attempt < MaxRetries; attempt++ {
if attempt > 0 {
time.Sleep(backoff)
backoff *= 2
}

resp, err := d.client.Post(d.url, "application/json", bytes.NewReader(body))
req, err := http.NewRequest(http.MethodPost, d.url, bytes.NewReader(body))
if err != nil {
slog.Warn("registry webhook POST request build failed", "action", ev.Action, "error", err)
continue
}
req.Header.Set("Content-Type", "application/json")
if sigHeader != "" {
req.Header.Set("X-Pilot-Signature-256", sigHeader)
}
resp, err := d.client.Do(req)
if err != nil {
slog.Warn("registry webhook POST failed", "action", ev.Action, "attempt", attempt+1, "error", err)
continue
Expand Down Expand Up @@ -240,6 +262,18 @@ func (st *Store) SetURL(url string) {
}
}

// SetSecret sets the HMAC-SHA256 pre-shared secret for outbound webhook
// signatures. When non-empty, every outbound POST includes an
// X-Pilot-Signature-256 header with the hex-encoded HMAC-SHA256 of the
// request body (PILOT-239). No-op when no dispatcher is active.
func (st *Store) SetSecret(secret string) {
st.mu.Lock()
defer st.mu.Unlock()
if st.disp != nil {
st.disp.secret = secret
}
}

// SetInitialBackoff sets the retry backoff. Tests set a short value to avoid
// waiting on retry exhaustion. No-op when no dispatcher is active.
func (st *Store) SetInitialBackoff(d time.Duration) {
Expand Down
97 changes: 97 additions & 0 deletions webhook/zz_more_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
package webhook

import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"io"
"net/http"
"net/http/httptest"
"sync"
Expand Down Expand Up @@ -150,6 +154,99 @@ func TestDispatcher_EmitAfterCloseIsNoop(t *testing.T) {
}

// TestDispatcher_RaceyCloseEmit ensures concurrent Emit+Close doesn't panic.
// TestStore_SetSecret_SignsWebhook verifies that when a secret is set,
// outbound webhook POSTs carry an X-Pilot-Signature-256 header with the
// hex-encoded HMAC-SHA256 of the request body (PILOT-239).
func TestStore_SetSecret_SignsWebhook(t *testing.T) {
t.Parallel()
secret := "test-secret-pilot"
var (
mu sync.Mutex
sigHeader string
bodyBytes []byte
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
sigHeader = r.Header.Get("X-Pilot-Signature-256")
bodyBytes, _ = io.ReadAll(r.Body)
mu.Unlock()
w.WriteHeader(200)
}))
defer srv.Close()

st := NewStore()
defer st.Close()
st.SetInitialBackoff(time.Millisecond)
st.SetURL(srv.URL)
st.SetSecret(secret)

st.Emit("test.secret", map[string]interface{}{"k": "v"})

getSig := func() string { mu.Lock(); defer mu.Unlock(); return sigHeader }
getBody := func() []byte { mu.Lock(); defer mu.Unlock(); return bodyBytes }

deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if getSig() != "" {
break
}
time.Sleep(10 * time.Millisecond)
}
gotSig := getSig()
if gotSig == "" {
t.Fatal("X-Pilot-Signature-256 header not set when secret is configured")
}

// Verify the HMAC ourselves.
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(getBody())
expected := hex.EncodeToString(mac.Sum(nil))
if gotSig != expected {
t.Fatalf("HMAC mismatch: got %s, want %s", gotSig, expected)
}
}

// TestStore_SetSecret_NoSignatureWhenNoSecret verifies that no signature
// header is added when the secret is empty (backward-compatible).
func TestStore_SetSecret_NoSignatureWhenNoSecret(t *testing.T) {
t.Parallel()
var (
mu sync.Mutex
sigHeader string
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
sigHeader = r.Header.Get("X-Pilot-Signature-256")
mu.Unlock()
w.WriteHeader(200)
}))
defer srv.Close()

st := NewStore()
defer st.Close()
st.SetInitialBackoff(time.Millisecond)
st.SetURL(srv.URL)
// Do NOT call SetSecret — default is empty.

st.Emit("test.nosecret", map[string]interface{}{"k": "v"})

deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
_, delivered, _ := st.disp.stats()
if delivered > 0 {
break
}
time.Sleep(10 * time.Millisecond)
}

mu.Lock()
got := sigHeader
mu.Unlock()
if got != "" {
t.Fatal("X-Pilot-Signature-256 should NOT be set when no secret configured")
}
}

func TestDispatcher_RaceyCloseEmit(t *testing.T) {
t.Parallel()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
Expand Down
Loading