From 7a21ae49b9b6bb2251d9b66ed0150117afde02b4 Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 18:15:43 +0000 Subject: [PATCH] fix: persist audit-export entries to disk WAL before channel send (PILOT-302) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add AuditWAL — a simple JSON-lines write-ahead log that persists audit entries before they enter the export channel. On clean shutdown the WAL is truncated after drain; on crash restart all entries are replayed for re-export. Also log a warning (slog.Warn) when the export channel is full and an entry is dropped — entries were previously dropped silently with only an atomic counter increment. Changes: - audit/audit_wal.go: new AuditWAL type (append, replay, truncate) - audit/audit_export.go: integrate WAL into Export/Close, log drops - audit/audit.go: add SetStorePath, derive WAL path from storePath - server_lifecycle.go: wire auditStore.SetStorePath at init The WAL path is {storePath}.audit-export-wal (empty storePath = no persistence, preserving backward compat). --- audit/audit.go | 21 ++++++++- audit/audit_export.go | 69 +++++++++++++++++++++++++-- audit/audit_wal.go | 105 ++++++++++++++++++++++++++++++++++++++++++ server_lifecycle.go | 1 + 4 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 audit/audit_wal.go diff --git a/audit/audit.go b/audit/audit.go index eeba718..731e1cd 100644 --- a/audit/audit.go +++ b/audit/audit.go @@ -42,6 +42,10 @@ type Store struct { exporter *AuditExporter cfg *wire.BlueprintAuditExport + // storePath is the path to the registry snapshot file (e.g. /data/registry.json). + // When set, the audit exporter derives its WAL path from it. + storePath string + // unsub stops the bus subscriber goroutine started by Subscribe. unsub func() } @@ -150,6 +154,8 @@ func (st *Store) RestoreLog(entries []Entry) { // SetExporter replaces the current exporter with a new one built from cfg. // The old exporter (if any) is drained and closed. Pass nil cfg to disable. +// If storePath was previously set via SetStorePath, the new exporter gets +// a write-ahead log at "{storePath}.audit-export-wal". func (st *Store) SetExporter(cfg *wire.BlueprintAuditExport) { st.mu.Lock() old := st.exporter @@ -157,7 +163,11 @@ func (st *Store) SetExporter(cfg *wire.BlueprintAuditExport) { st.exporter = nil st.cfg = nil } else { - st.exporter = newAuditExporter(cfg) + walPath := "" + if st.storePath != "" { + walPath = st.storePath + ".audit-export-wal" + } + st.exporter = newAuditExporter(cfg, walPath) st.cfg = cfg } st.mu.Unlock() @@ -167,6 +177,15 @@ func (st *Store) SetExporter(cfg *wire.BlueprintAuditExport) { } } +// SetStorePath records the registry snapshot file path so the audit +// exporter can derive its WAL path ({storePath}.audit-export-wal). +// Call once during server init, before SetExporter. +func (st *Store) SetStorePath(p string) { + st.mu.Lock() + st.storePath = p + st.mu.Unlock() +} + // ExporterConfig returns the active export configuration (nil = disabled). func (st *Store) ExporterConfig() *wire.BlueprintAuditExport { st.mu.Lock() diff --git a/audit/audit_export.go b/audit/audit_export.go index 076cc15..fd42336 100644 --- a/audit/audit_export.go +++ b/audit/audit_export.go @@ -28,28 +28,65 @@ type AuditExporter struct { closed chan struct{} exported atomic.Uint64 dropped atomic.Uint64 + wal *AuditWAL } // NewAuditExporter creates and starts a new AuditExporter for the given config. // It is exported so that the server package shim (audit_export.go) can // delegate to it without the sub-package re-implementing the constructor. func NewAuditExporter(cfg *wire.BlueprintAuditExport) *AuditExporter { - return newAuditExporter(cfg) + return newAuditExporter(cfg, "") } -func newAuditExporter(cfg *wire.BlueprintAuditExport) *AuditExporter { +// NewAuditExporterWithWAL creates and starts a new AuditExporter with an +// on-disk write-ahead log at walPath. Use empty walPath to disable the WAL. +func NewAuditExporterWithWAL(cfg *wire.BlueprintAuditExport, walPath string) *AuditExporter { + return newAuditExporter(cfg, walPath) +} + +func newAuditExporter(cfg *wire.BlueprintAuditExport, walPath string) *AuditExporter { + w, err := NewAuditWAL(walPath) + if err != nil { + slog.Warn("audit exporter: failed to open WAL, continuing without persistence", "path", walPath, "error", err) + } + ae := &AuditExporter{ config: cfg, ch: make(chan *Entry, 1024), client: &http.Client{Timeout: 10 * time.Second}, done: make(chan struct{}), closed: make(chan struct{}), + wal: w, } + + // Replay any WAL entries from a previous crash into the channel. + // Non-blocking: if the channel fills, remaining entries are dropped + // (they will be replayed again on next restart until exported). + if w != nil { + pending, err := w.Pending() + if err != nil { + slog.Error("audit exporter: WAL replay failed", "error", err) + } + for i := range pending { + entry := pending[i] // capture + select { + case ae.ch <- &entry: + default: + slog.Warn("audit exporter: dropping replayed WAL entry (channel full)", "action", entry.Action) + } + } + if len(pending) > 0 { + slog.Info("audit exporter: replayed WAL entries", "count", len(pending)) + } + } + go ae.run() return ae } -// Export queues an audit entry for export. Non-blocking; drops if buffer full. +// Export queues an audit entry for export. The entry is persisted to the +// write-ahead log before entering the channel. Non-blocking; drops if the +// channel buffer is full, but the WAL copy survives a crash restart. func (ae *AuditExporter) Export(entry *Entry) { if ae == nil { return @@ -59,15 +96,30 @@ func (ae *AuditExporter) Export(entry *Entry) { return default: } + + // Persist to WAL before attempting channel send. On a crash restart, + // WAL entries are replayed into the channel so no event is lost. + if ae.wal != nil { + if err := ae.wal.Append(entry); err != nil { + slog.Error("audit exporter: WAL append failed", "action", entry.Action, "error", err) + } + } + select { case ae.ch <- entry: case <-ae.closed: default: ae.dropped.Add(1) + slog.Warn("audit exporter: dropping entry (channel full)", + "action", entry.Action, + "dropped_total", ae.dropped.Load(), + ) } } // Close signals the background goroutine to stop and waits for it to drain. +// After a clean drain, the WAL is truncated — all pending entries have been +// sent to the external system. func (ae *AuditExporter) Close() { if ae == nil { return @@ -84,6 +136,17 @@ func (ae *AuditExporter) Close() { case <-time.After(5 * time.Second): slog.Warn("audit exporter drain timeout") } + + // Truncate WAL after clean drain. On crash, the WAL is preserved and + // replayed on next startup. + if ae.wal != nil { + if err := ae.wal.Truncate(); err != nil { + slog.Error("audit exporter: WAL truncate failed", "error", err) + } + if err := ae.wal.Close(); err != nil { + slog.Error("audit exporter: WAL close failed", "error", err) + } + } } func (ae *AuditExporter) run() { diff --git a/audit/audit_wal.go b/audit/audit_wal.go new file mode 100644 index 0000000..d6ca71e --- /dev/null +++ b/audit/audit_wal.go @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package audit + +import ( + "encoding/json" + "fmt" + "io" + "os" + "sync" +) + +// AuditWAL persists audit entries to disk in JSON-lines format before they +// enter the export channel. On clean shutdown the WAL is truncated; on crash +// restart all entries are replayed for re-export. +type AuditWAL struct { + mu sync.Mutex + f *os.File + path string +} + +// NewAuditWAL opens or creates the WAL file at path. Returns nil when path +// is empty (no persistence configured). +func NewAuditWAL(path string) (*AuditWAL, error) { + if path == "" { + return nil, nil + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0600) + if err != nil { + return nil, fmt.Errorf("open audit WAL: %w", err) + } + return &AuditWAL{f: f, path: path}, nil +} + +// Append writes an audit entry as a JSON line to the WAL and fsyncs. +func (w *AuditWAL) Append(entry *Entry) error { + if w == nil { + return nil + } + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("marshal audit WAL: %w", err) + } + w.mu.Lock() + defer w.mu.Unlock() + line := append(data, '\n') + if _, err := w.f.Write(line); err != nil { + return fmt.Errorf("write audit WAL: %w", err) + } + return w.f.Sync() +} + +// Pending reads all entries from the WAL for replay, oldest first. +func (w *AuditWAL) Pending() ([]Entry, error) { + if w == nil { + return nil, nil + } + w.mu.Lock() + defer w.mu.Unlock() + if _, err := w.f.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("seek audit WAL start: %w", err) + } + var entries []Entry + dec := json.NewDecoder(w.f) + for { + var e Entry + if err := dec.Decode(&e); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + return entries, fmt.Errorf("decode audit WAL entry: %w", err) + } + entries = append(entries, e) + } + if _, err := w.f.Seek(0, io.SeekEnd); err != nil { + return entries, fmt.Errorf("seek audit WAL end: %w", err) + } + return entries, nil +} + +// Truncate clears the WAL. Called after clean drain on shutdown. +func (w *AuditWAL) Truncate() error { + if w == nil { + return nil + } + w.mu.Lock() + defer w.mu.Unlock() + if err := w.f.Truncate(0); err != nil { + return fmt.Errorf("truncate audit WAL: %w", err) + } + if _, err := w.f.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek audit WAL after truncate: %w", err) + } + return w.f.Sync() +} + +// Close closes the underlying file. +func (w *AuditWAL) Close() error { + if w == nil { + return nil + } + w.mu.Lock() + defer w.mu.Unlock() + return w.f.Close() +} diff --git a/server_lifecycle.go b/server_lifecycle.go index 918ae09..0cef0a9 100644 --- a/server_lifecycle.go +++ b/server_lifecycle.go @@ -320,6 +320,7 @@ func NewWithStore(beaconAddr, storePath string) *Server { s.webhook = webhookpkg.NewStore() // R1.3: webhook sub-package s.webhook.Subscribe(s.bus) // fan-out audit.entry events → webhook HTTP POST s.auditStore = auditpkg.NewStore() // R1.2: audit sub-package + s.auditStore.SetStorePath(s.storePath) // enable audit-export WAL when persistence is configured s.auditStore.Subscribe(s.bus) // async ring-buffer + exporter fan-out s.trust = trustpkg.NewStore(s, trustpkg.Callbacks{ // R2.1: trust sub-package Save: s.save,