diff --git a/replication.go b/replication.go index a44f618..5fdb9b2 100644 --- a/replication.go +++ b/replication.go @@ -175,6 +175,9 @@ func (s *Server) snapshotJSON() []byte { } } + // Replication term for fencing (PILOT-328). + snap.Term = s.term + // Enterprise config — pointers; not mutated post-init in practice but we // still snapshot the pointer under RLock and Marshal will read whatever // they point to. Acceptable: these are config blobs whose mutation @@ -239,6 +242,16 @@ func (s *Server) applySnapshot(data []byte) error { return fmt.Errorf("unmarshal: %w", err) } + // PILOT-328: reject snapshots whose term is older than ours (equal terms pass). + s.mu.RLock() + currentTerm := s.term + s.mu.RUnlock() + if snap.Term < currentTerm { + slog.Warn("replication: rejecting snapshot from stale primary", + "snapshot_term", snap.Term, "current_term", currentTerm) + return nil // not an error — we just ignore the stale push + } + // --- Phase 1: build all the new maps OUTSIDE any lock --- newNodes := make(map[uint32]*NodeInfo, len(snap.Nodes)) @@ -417,6 +430,11 @@ func (s *Server) applySnapshot(data []byte) error { s.inviteInbox = newInviteInbox s.nextNode = snap.NextNode s.nextNet = snap.NextNet + // PILOT-328: track the replication epoch, but never move it backwards: + // PromoteToPrimary may have raised s.term since the stale-term pre-check. + if snap.Term > s.term { + s.term = snap.Term + } if newRBACPreAssign != nil { s.rbacPreAssign = newRBACPreAssign } diff --git a/server.go b/server.go index 519a1d5..06bbbab 100644 --- a/server.go +++ b/server.go @@ -139,6 +139,10 @@ type Server struct { // Extracted to pkg/registry/server/replication (R7.1). replMgr *replpkg.Manager + // term is the monotonic replication epoch, incremented on primary promotion. + // Standbys reject snapshots from a primary whose term is stale. + term uint64 + // authz holds the authorization checker (admin/dashboard tokens, role gates, // enterprise gates, signature verification). Extracted to pkg/registry/server/authz (R3.1).
authz *authzpkg.Checker diff --git a/server_api.go b/server_api.go index cd8827b..57689f5 100644 --- a/server_api.go +++ b/server_api.go @@ -27,6 +27,26 @@ func (s *Server) Reap() { s.reapStaleBeacons() } +// PromoteToPrimary increments the replication term under the write lock, logs +// the new term, and returns the result of flushSave, which persists it. The +// in-memory term is not rolled back if flushSave fails. Fences off a stale former primary. (PILOT-328) +func (s *Server) PromoteToPrimary() error { + s.mu.Lock() + s.term++ + newTerm := s.term + s.mu.Unlock() // released before logging and flushSave + slog.Warn("replication: server promoted to primary", "term", newTerm) + return s.flushSave() +} + +// Term returns the current replication epoch, read under the read lock. +func (s *Server) Term() uint64 { + s.mu.RLock() + t := s.term + s.mu.RUnlock() + return t +} + // ── Dispatcher interface implementation ─────────────────────────────────────── // // These methods satisfy accept.Dispatcher so that Acceptor can delegate all diff --git a/server_persist.go b/server_persist.go index b8ad324..01c0417 100644 --- a/server_persist.go +++ b/server_persist.go @@ -509,6 +509,7 @@ func (s *Server) load() error { s.nextNode = snap.NextNode s.nextNet = snap.NextNet + s.term = snap.Term // PILOT-328: restore replication epoch // Restore dashboard stats if snap.TotalRequests > 0 { diff --git a/wal/snapshot.go b/wal/snapshot.go index ebbe3df..9f823c8 100644 --- a/wal/snapshot.go +++ b/wal/snapshot.go @@ -102,6 +102,10 @@ type Snapshot struct { IDPConfig *wire.BlueprintIdentityProvider `json:"idp_config,omitempty"` AuditExportCfg *wire.BlueprintAuditExport `json:"audit_export_config,omitempty"` RBACPreAssign map[string][]wire.BlueprintRole `json:"rbac_pre_assign,omitempty"` // networkID -> roles + // Term is the monotonic replication epoch, incremented on primary promotion. + // Standbys reject snapshots from a primary whose term is stale. Omitted from the JSON when zero. + Term uint64 `json:"term,omitempty"` + // Integrity: SHA256 hex digest of all fields except Checksum Checksum string `json:"checksum,omitempty"` }