Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Store struct {
exporter *AuditExporter
cfg *wire.BlueprintAuditExport

// storePath is the path to the registry snapshot file (e.g. /data/registry.json).
// When set, the audit exporter derives its WAL path from it.
storePath string

// unsub stops the bus subscriber goroutine started by Subscribe.
unsub func()
}
Expand Down Expand Up @@ -150,14 +154,20 @@ func (st *Store) RestoreLog(entries []Entry) {

// SetExporter replaces the current exporter with a new one built from cfg.
// The old exporter (if any) is drained and closed. Pass nil cfg to disable.
// If storePath was previously set via SetStorePath, the new exporter gets
// a write-ahead log at "{storePath}.audit-export-wal".
func (st *Store) SetExporter(cfg *wire.BlueprintAuditExport) {
st.mu.Lock()
old := st.exporter
if cfg == nil {
st.exporter = nil
st.cfg = nil
} else {
st.exporter = newAuditExporter(cfg)
walPath := ""
if st.storePath != "" {
walPath = st.storePath + ".audit-export-wal"
}
st.exporter = newAuditExporter(cfg, walPath)
st.cfg = cfg
}
st.mu.Unlock()
Expand All @@ -167,6 +177,15 @@ func (st *Store) SetExporter(cfg *wire.BlueprintAuditExport) {
}
}

// SetStorePath stores p, the path of the registry snapshot file, on the
// Store. SetExporter reads this value to build the exporter's WAL path
// ("{storePath}.audit-export-wal"), so call it once during server init,
// before SetExporter. An empty p leaves the WAL disabled.
func (st *Store) SetStorePath(p string) {
	st.mu.Lock()
	defer st.mu.Unlock()

	st.storePath = p
}

// ExporterConfig returns the active export configuration (nil = disabled).
func (st *Store) ExporterConfig() *wire.BlueprintAuditExport {
st.mu.Lock()
Expand Down
69 changes: 66 additions & 3 deletions audit/audit_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,65 @@ type AuditExporter struct {
closed chan struct{}
exported atomic.Uint64
dropped atomic.Uint64
wal *AuditWAL
}

// NewAuditExporter creates and starts a new AuditExporter for the given config.
// It is exported so that the server package shim (audit_export.go) can
// delegate to it without the sub-package re-implementing the constructor.
func NewAuditExporter(cfg *wire.BlueprintAuditExport) *AuditExporter {
return newAuditExporter(cfg)
return newAuditExporter(cfg, "")
}

func newAuditExporter(cfg *wire.BlueprintAuditExport) *AuditExporter {
// NewAuditExporterWithWAL builds and starts an AuditExporter for cfg whose
// entries are persisted to an on-disk write-ahead log at walPath.
// An empty walPath disables the WAL.
func NewAuditExporterWithWAL(cfg *wire.BlueprintAuditExport, walPath string) *AuditExporter {
	exporter := newAuditExporter(cfg, walPath)
	return exporter
}

// newAuditExporter builds an AuditExporter for cfg, replays any entries left
// in the write-ahead log at walPath, and starts the background goroutine.
//
// An empty walPath disables the WAL. A WAL that cannot be opened is logged
// and the exporter runs without persistence instead of failing construction.
func newAuditExporter(cfg *wire.BlueprintAuditExport, walPath string) *AuditExporter {
	w, err := NewAuditWAL(walPath)
	if err != nil {
		slog.Warn("audit exporter: failed to open WAL, continuing without persistence", "path", walPath, "error", err)
	}

	ae := &AuditExporter{
		config: cfg,
		ch:     make(chan *Entry, 1024),
		client: &http.Client{Timeout: 10 * time.Second},
		done:   make(chan struct{}),
		closed: make(chan struct{}),
		wal:    w,
	}

	// Replay any WAL entries from a previous run into the channel.
	// Non-blocking: once the channel is full the remaining entries are
	// dropped from this replay. They stay in the WAL file, so they are
	// offered again on the next start unless the WAL was truncated first.
	if w != nil {
		pending, err := w.Pending()
		if err != nil {
			// Pending may still return the entries it could read; replay those.
			slog.Error("audit exporter: WAL replay failed", "error", err)
		}
		queued := 0
		for i := range pending {
			entry := pending[i] // copy so each queued pointer is independent of the slice
			select {
			case ae.ch <- &entry:
				queued++
			default:
				// Count replay drops like Export does, so the loss is visible.
				ae.dropped.Add(1)
				slog.Warn("audit exporter: dropping replayed WAL entry (channel full)", "action", entry.Action)
			}
		}
		if len(pending) > 0 {
			// Report what was actually queued, not just what was read.
			slog.Info("audit exporter: replayed WAL entries", "count", queued, "dropped", len(pending)-queued)
		}
	}

	go ae.run()
	return ae
}

// Export queues an audit entry for export. Non-blocking; drops if buffer full.
// Export queues an audit entry for export. The entry is persisted to the
// write-ahead log before entering the channel. Non-blocking; drops if the
// channel buffer is full, but the WAL copy survives a crash restart.
func (ae *AuditExporter) Export(entry *Entry) {
if ae == nil {
return
Expand All @@ -59,15 +96,30 @@ func (ae *AuditExporter) Export(entry *Entry) {
return
default:
}

// Persist to WAL before attempting channel send. On a crash restart,
// WAL entries are replayed into the channel so no event is lost.
if ae.wal != nil {
if err := ae.wal.Append(entry); err != nil {
slog.Error("audit exporter: WAL append failed", "action", entry.Action, "error", err)
}
}

select {
case ae.ch <- entry:
case <-ae.closed:
default:
ae.dropped.Add(1)
slog.Warn("audit exporter: dropping entry (channel full)",
"action", entry.Action,
"dropped_total", ae.dropped.Load(),
)
}
}

// Close signals the background goroutine to stop and waits for it to drain.
// After a clean drain, the WAL is truncated — all pending entries have been
// sent to the external system.
func (ae *AuditExporter) Close() {
if ae == nil {
return
Expand All @@ -84,6 +136,17 @@ func (ae *AuditExporter) Close() {
case <-time.After(5 * time.Second):
slog.Warn("audit exporter drain timeout")
}

// Truncate WAL after clean drain. On crash, the WAL is preserved and
// replayed on next startup.
if ae.wal != nil {
if err := ae.wal.Truncate(); err != nil {
slog.Error("audit exporter: WAL truncate failed", "error", err)
}
if err := ae.wal.Close(); err != nil {
slog.Error("audit exporter: WAL close failed", "error", err)
}
}
}

func (ae *AuditExporter) run() {
Expand Down
105 changes: 105 additions & 0 deletions audit/audit_wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

package audit

import (
"encoding/json"
"fmt"
"io"
"os"
"sync"
)

// AuditWAL persists audit entries to disk in JSON-lines format before they
// enter the export channel. On clean shutdown the WAL is truncated; on crash
// restart all entries are replayed for re-export.
//
// A nil *AuditWAL is valid: every method treats it as "no persistence
// configured" and returns without doing anything.
type AuditWAL struct {
// mu serializes all access to f (append, replay read, truncate, close).
mu sync.Mutex
// f is the open WAL file handle.
f *os.File
// path is the file path the WAL was opened with.
path string
}

// NewAuditWAL opens the WAL file at path for reading and appending, creating
// it with mode 0600 when it does not exist yet.
//
// An empty path means no persistence is configured: the result is a nil
// *AuditWAL and a nil error. A failure to open the file is returned wrapped.
func NewAuditWAL(path string) (*AuditWAL, error) {
	if path == "" {
		return nil, nil
	}

	const openFlags = os.O_CREATE | os.O_RDWR | os.O_APPEND
	file, openErr := os.OpenFile(path, openFlags, 0600)
	if openErr != nil {
		return nil, fmt.Errorf("open audit WAL: %w", openErr)
	}

	wal := &AuditWAL{path: path, f: file}
	return wal, nil
}

// Append writes entry to the WAL as a single JSON line and fsyncs the file
// before returning.
//
// A nil receiver (WAL disabled) or a nil entry is a no-op that returns nil;
// a nil entry would otherwise be stored as the JSON literal "null" and come
// back from Pending as an empty Entry. Marshal, write and sync failures are
// returned wrapped with the step that failed.
func (w *AuditWAL) Append(entry *Entry) error {
	if w == nil || entry == nil {
		return nil
	}
	// Marshal outside the lock; only the file access needs serializing.
	data, err := json.Marshal(entry)
	if err != nil {
		return fmt.Errorf("marshal audit WAL: %w", err)
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	// Record and terminator go out in one Write call.
	if _, err := w.f.Write(append(data, '\n')); err != nil {
		return fmt.Errorf("write audit WAL: %w", err)
	}
	if err := w.f.Sync(); err != nil {
		return fmt.Errorf("sync audit WAL: %w", err)
	}
	return nil
}

// Pending reads all entries from the WAL for replay, oldest first.
//
// The file is parsed one line at a time so that a single damaged record
// (for example a torn write that later appends were written after) does not
// hide the records that follow it. Malformed lines are skipped:
//
//   - an unterminated, undecodable final line is treated as a torn write and
//     ignored silently;
//   - any other malformed line is skipped and reported through the returned
//     error, which wraps the first decode failure.
//
// The returned slice holds every entry that could be decoded even when the
// error is non-nil, so callers may log the error and still replay the slice.
// A nil receiver returns (nil, nil).
func (w *AuditWAL) Pending() ([]Entry, error) {
	if w == nil {
		return nil, nil
	}
	w.mu.Lock()
	defer w.mu.Unlock()

	if _, err := w.f.Seek(0, io.SeekStart); err != nil {
		return nil, fmt.Errorf("seek audit WAL start: %w", err)
	}
	data, err := io.ReadAll(w.f)
	if err != nil {
		return nil, fmt.Errorf("read audit WAL: %w", err)
	}

	var (
		entries  []Entry
		skipped  int
		firstErr error
	)
	for start := 0; start < len(data); {
		// Find the end of the current line.
		end := start
		for end < len(data) && data[end] != '\n' {
			end++
		}
		line := data[start:end]
		terminated := end < len(data) // false only for a final line without '\n'
		start = end + 1

		if len(line) == 0 {
			continue
		}
		var e Entry
		if err := json.Unmarshal(line, &e); err != nil {
			if terminated {
				skipped++
				if firstErr == nil {
					firstErr = err
				}
			}
			continue
		}
		entries = append(entries, e)
	}

	// Leave the file offset at the end, as before the read.
	if _, err := w.f.Seek(0, io.SeekEnd); err != nil {
		return entries, fmt.Errorf("seek audit WAL end: %w", err)
	}
	if skipped > 0 {
		return entries, fmt.Errorf("decode audit WAL entry: skipped %d malformed line(s), first error: %w", skipped, firstErr)
	}
	return entries, nil
}

// Truncate discards every record in the WAL: it cuts the file to zero
// length, rewinds the file offset to the start, and fsyncs so the empty
// state is durable. A nil receiver is a no-op.
//
// Truncate, seek and sync failures are returned wrapped with the step that
// failed.
func (w *AuditWAL) Truncate() error {
	if w == nil {
		return nil
	}
	w.mu.Lock()
	defer w.mu.Unlock()

	if err := w.f.Truncate(0); err != nil {
		return fmt.Errorf("truncate audit WAL: %w", err)
	}
	if _, err := w.f.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("seek audit WAL after truncate: %w", err)
	}
	if err := w.f.Sync(); err != nil {
		return fmt.Errorf("sync audit WAL after truncate: %w", err)
	}
	return nil
}

// Close releases the WAL's file handle and returns whatever error the
// underlying file Close reports. A nil receiver is a no-op returning nil.
func (w *AuditWAL) Close() error {
	if w != nil {
		w.mu.Lock()
		closeErr := w.f.Close()
		w.mu.Unlock()
		return closeErr
	}
	return nil
}
1 change: 1 addition & 0 deletions server_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func NewWithStore(beaconAddr, storePath string) *Server {
s.webhook = webhookpkg.NewStore() // R1.3: webhook sub-package
s.webhook.Subscribe(s.bus) // fan-out audit.entry events → webhook HTTP POST
s.auditStore = auditpkg.NewStore() // R1.2: audit sub-package
s.auditStore.SetStorePath(s.storePath) // enable audit-export WAL when persistence is configured
s.auditStore.Subscribe(s.bus) // async ring-buffer + exporter fan-out
s.trust = trustpkg.NewStore(s, trustpkg.Callbacks{ // R2.1: trust sub-package
Save: s.save,
Expand Down
Loading