Skip to content

Commit 8f94ebc

Browse files
bd06b and claude authored and committed
refactor(storage): remove fire-and-forget auto-publish from content POST
The drain loop is now the sole publisher of EPR Heads to Kademlia. The previous tokio::spawn auto-publish bypassed the peer gate and silently 'succeeded' into empty Kad on writes before peers connected — creating phantom published state. It also didn't set p2p_published_at, which created a visibility inconsistency after Phase A: new content was DHT-published but stayed gated from external reads until the drain tick. After this change: - POST /db/content and POST /db/content/bulk only write to SQLite; DHT publish happens via the next drain_interval tick (within ~15s). - Content is visible to external reads once the drain has marked it. - The seeder should poll /p2p/publish-state (Task E1) until pending=0 to confirm drain completion before treating seeding as done. P2PHandle::publish_epr_head and P2PCommand::PublishEprHead are now unused but retained with #[allow(dead_code)] as part of the P2PHandle abstraction for potential future use. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b07686c commit 8f94ebc

File tree

2 files changed

+19
-120
lines changed

2 files changed

+19
-120
lines changed

elohim/elohim-storage/src/http.rs

Lines changed: 7 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,23 +2174,6 @@ impl HttpServer {
21742174
let input_view: CreateContentInputView = serde_json::from_slice(&body_bytes)
21752175
.map_err(|e| StorageError::Parse(format!("Invalid JSON: {}", e)))?;
21762176

2177-
// Capture EPR-relevant data before consuming input_view
2178-
#[cfg(feature = "p2p")]
2179-
let epr_data = (
2180-
input_view.id.clone(),
2181-
input_view.title.clone(),
2182-
input_view
2183-
.content_type
2184-
.clone()
2185-
.unwrap_or_else(|| "concept".to_string()),
2186-
input_view.description.clone(),
2187-
input_view.content_format.clone(),
2188-
input_view.blob_cid.clone(),
2189-
input_view.reach.clone(),
2190-
input_view.created_by.clone(),
2191-
input_view.tags.clone(),
2192-
);
2193-
21942177
// Capture manifest-relevant data before consuming input_view
21952178
let manifest_data = (
21962179
input_view.id.clone(),
@@ -2206,44 +2189,9 @@ impl HttpServer {
22062189
let input: db::content_diesel::CreateContentInput = input_view.into();
22072190
let result = services.content.create(input);
22082191

2209-
// Auto-publish EPR Head on successful create
2210-
#[cfg(feature = "p2p")]
2211-
if result.is_ok() {
2212-
if let Some(ref handle) = self.p2p_handle {
2213-
let handle = handle.clone();
2214-
let (id, title, content_type, desc, fmt, cid, reach, author, tags) =
2215-
epr_data;
2216-
tokio::spawn(async move {
2217-
let head = crate::epr_codec::EprHead {
2218-
version: 1,
2219-
id: id.clone(),
2220-
content: cid.unwrap_or_default(),
2221-
lamad: crate::epr_codec::EprLamadContext {
2222-
title,
2223-
content_type,
2224-
description: desc,
2225-
content_format: fmt,
2226-
tags,
2227-
},
2228-
shefa: crate::epr_codec::EprShefaContext {
2229-
stewards: vec![],
2230-
allocations: vec![],
2231-
},
2232-
qahal: crate::epr_codec::EprQahalContext {
2233-
reach,
2234-
layer: None,
2235-
attestation_requirements: vec![],
2236-
},
2237-
relationships: vec![],
2238-
author,
2239-
updated: Some(chrono::Utc::now().to_rfc3339()),
2240-
};
2241-
if let Ok(bytes) = rmp_serde::to_vec(&head) {
2242-
handle.publish_epr_head(id, bytes).await;
2243-
}
2244-
});
2245-
}
2246-
}
2192+
// EPR Head publishing is handled by the drain loop (p2p/mod.rs::drain_publish_queue),
2193+
// which is the sole writer of p2p_published_at. New content becomes visible to
2194+
// external reads within ~drain_interval (default 15s).
22472195

22482196
// Capture distribution data before manifest recording consumes manifest_data
22492197
#[cfg(feature = "p2p")]
@@ -2377,27 +2325,6 @@ impl HttpServer {
23772325
return Ok(response::error_response(StorageError::InvalidInput(msg)));
23782326
}
23792327

2380-
// Capture EPR-relevant data before input_views are consumed
2381-
#[cfg(feature = "p2p")]
2382-
let epr_inputs: Vec<_> = input_views
2383-
.iter()
2384-
.map(|v| {
2385-
(
2386-
v.id.clone(),
2387-
v.title.clone(),
2388-
v.content_type
2389-
.clone()
2390-
.unwrap_or_else(|| "concept".to_string()),
2391-
v.description.clone(),
2392-
v.content_format.clone(),
2393-
v.blob_cid.clone(),
2394-
v.reach.clone(),
2395-
v.created_by.clone(),
2396-
v.tags.clone(),
2397-
)
2398-
})
2399-
.collect();
2400-
24012328
// Capture manifest-relevant data before input_views are consumed
24022329
let manifest_inputs: Vec<_> = input_views
24032330
.iter()
@@ -2420,49 +2347,10 @@ impl HttpServer {
24202347

24212348
match services.content.bulk_create(items) {
24222349
Ok(result) => {
2423-
// Auto-publish EPR Heads to DHT for cross-peer discovery
2424-
#[cfg(feature = "p2p")]
2425-
if let Some(ref handle) = self.p2p_handle {
2426-
let handle = handle.clone();
2427-
let inserted = result.inserted;
2428-
if inserted > 0 {
2429-
let epr_data = epr_inputs;
2430-
tokio::spawn(async move {
2431-
for (id, title, content_type, desc, fmt, cid, reach, author, tags) in
2432-
epr_data
2433-
{
2434-
let head = crate::epr_codec::EprHead {
2435-
version: 1,
2436-
id: id.clone(),
2437-
content: cid.unwrap_or_default(),
2438-
lamad: crate::epr_codec::EprLamadContext {
2439-
title,
2440-
content_type,
2441-
description: desc,
2442-
content_format: fmt,
2443-
tags,
2444-
},
2445-
shefa: crate::epr_codec::EprShefaContext {
2446-
stewards: vec![],
2447-
allocations: vec![],
2448-
},
2449-
qahal: crate::epr_codec::EprQahalContext {
2450-
reach,
2451-
layer: None,
2452-
attestation_requirements: vec![],
2453-
},
2454-
relationships: vec![],
2455-
author,
2456-
updated: Some(chrono::Utc::now().to_rfc3339()),
2457-
};
2458-
if let Ok(bytes) = rmp_serde::to_vec(&head) {
2459-
handle.publish_epr_head(id, bytes).await;
2460-
}
2461-
}
2462-
info!(count = inserted, "Published EPR Heads to DHT");
2463-
});
2464-
}
2465-
}
2350+
// EPR Head publishing is handled by the drain loop
2351+
// (p2p/mod.rs::drain_publish_queue), which is the sole writer of
2352+
// p2p_published_at. New content becomes visible to external reads
2353+
// within ~drain_interval (default 15s).
24662354

24672355
// Capture distribution data before manifest_inputs is consumed
24682356
#[cfg(feature = "p2p")]

elohim/elohim-storage/src/p2p/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,12 @@ pub struct P2PStatusInfo {
238238

239239
/// Commands sent from HTTP handlers to the P2P event loop.
240240
pub enum P2PCommand {
241-
/// Publish an EPR Head to Kademlia DHT
241+
/// Publish an EPR Head to Kademlia DHT.
242+
///
243+
/// Currently unused: the drain loop (`drain_publish_queue`) is the sole
244+
/// publisher of EPR Heads and calls `put_record` directly on the swarm.
245+
/// Retained as part of the P2PHandle abstraction for future use.
246+
#[allow(dead_code)]
242247
PublishEprHead { id: String, head_bytes: Vec<u8> },
243248
/// Resolve an EPR Head via Kademlia DHT lookup
244249
ResolveEpr {
@@ -287,6 +292,12 @@ impl P2PHandle {
287292
}
288293

289294
/// Publish an EPR Head to the DHT. Fire-and-forget.
295+
///
296+
/// Currently unused: the drain loop (`drain_publish_queue`) is the sole
297+
/// publisher of EPR Heads to Kademlia and calls `put_record` directly,
298+
/// also setting `p2p_published_at` via `mark_published`. This method is
299+
/// retained as part of the P2PHandle abstraction for future use.
300+
#[allow(dead_code)]
290301
pub async fn publish_epr_head(&self, id: String, head_bytes: Vec<u8>) {
291302
if let Err(e) = self
292303
.command_tx

0 commit comments

Comments (0)