From 5b6150e3b8c8fe90e247dcf2ad2ed213f6b809e4 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 12 Jun 2026 21:22:31 +0800 Subject: [PATCH] fix: less locking in ExportStatus --- src/db/car/forest.rs | 23 +++++- src/ipld/util.rs | 76 +++++++++++++------ src/rpc/methods/chain.rs | 17 +++-- .../forest__rpc__tests__rpc__v0.snap | 8 ++ .../forest__rpc__tests__rpc__v1.snap | 8 ++ .../forest__rpc__tests__rpc__v2.snap | 8 ++ src/rpc/types/mod.rs | 2 + 7 files changed, 107 insertions(+), 35 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index adaa7f3cb99c..651044f2596a 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -66,6 +66,7 @@ use std::io::{self, Read, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::OnceLock; use std::task::Poll; +use std::time::Duration; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{Decoder, Encoder as _}; @@ -312,6 +313,9 @@ impl Encoder { roots: NonEmpty, mut stream: impl Stream> + Unpin, ) -> anyhow::Result<()> { + // For troubleshooting stuck-ness issue + const ASYNC_OPS_TIMEOUT: Duration = Duration::from_mins(5); + let mut offset = 0; // Write CARv1 header @@ -334,9 +338,17 @@ impl Encoder { // Write seekable zstd and collect a mapping of CIDs to frame_offset+data_offset. let mut builder = index::Builder::new(); let mut n_frames = 0; - while let Some((cids, zstd_frame)) = stream.try_next().await? { + while let Some((cids, zstd_frame)) = + tokio::time::timeout(ASYNC_OPS_TIMEOUT, stream.try_next()) + .await + .with_context(|| { + format!("`stream.try_next` timed out, offset={offset}, n_frames={n_frames}") + })?? + { builder.extend(cids.into_iter().map(|cid| (cid, offset as u64))); - sink.write_all(&zstd_frame).await?; + tokio::time::timeout(ASYNC_OPS_TIMEOUT, sink.write_all(&zstd_frame)) + .await + .with_context(|| format!("`sink.write_all` timed out, offset={offset}, n_frames={n_frames}, zstd_frame_len={}", zstd_frame.len()))??; offset += zstd_frame.len(); n_frames += 1; } @@ -345,7 +357,12 @@ impl Encoder { // Create index let writer = builder.into_writer(); - writer.write_zstd_skip_frames_into(&mut sink).await?; + tokio::time::timeout( + ASYNC_OPS_TIMEOUT, + writer.write_zstd_skip_frames_into(&mut sink), + ) + .await + .context("`writer.write_zstd_skip_frames_into` timed out")??; tracing::info!("Finished writing zstd CAR index frames"); // Write ForestCAR.zst footer, it's a valid ZSTD skip-frame let footer = ForestCarFooter { diff --git a/src/ipld/util.rs b/src/ipld/util.rs index ebaa12a15a13..e9c9066dc737 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -14,52 +14,78 @@ use chrono::{DateTime, Utc}; use cid::Cid; use futures::Stream; use fvm_ipld_blockstore::Blockstore; -use parking_lot::Mutex; +use parking_lot::RwLock; use pin_project_lite::pin_project; use std::borrow::Borrow; use std::collections::VecDeque; use std::pin::Pin; -use std::sync::LazyLock; +use std::sync::atomic::{AtomicBool, AtomicI64}; +use std::sync::{LazyLock, atomic}; use std::task::{Context, Poll}; #[derive(Default)] pub struct ExportStatus { - pub epoch: i64, - pub initial_epoch: i64, - pub exporting: bool, - pub cancelled: bool, - pub start_time: Option>, + pub epoch: AtomicI64, + pub initial_epoch: AtomicI64, + pub exporting: AtomicBool, + pub cancelled: AtomicBool, + pub start_time: RwLock>>, } -pub static CHAIN_EXPORT_STATUS: LazyLock> = - LazyLock::new(|| ExportStatus::default().into()); +impl ExportStatus { + pub fn epoch(&self) -> i64 { + self.epoch.load(atomic::Ordering::Relaxed) + } -fn update_epoch(new_value: i64) { - let mut mutex = CHAIN_EXPORT_STATUS.lock(); - mutex.epoch = new_value; - if mutex.initial_epoch == 0 { - mutex.initial_epoch = new_value; + pub fn initial_epoch(&self) -> i64 { + self.initial_epoch.load(atomic::Ordering::Relaxed) + } + + pub fn exporting(&self) -> bool { + self.exporting.load(atomic::Ordering::Relaxed) + } + + pub fn cancelled(&self) -> bool { + self.cancelled.load(atomic::Ordering::Relaxed) } + + pub fn start_time(&self) -> Option> { + *self.start_time.read() + } +} + +pub static CHAIN_EXPORT_STATUS: LazyLock = LazyLock::new(ExportStatus::default); + +fn update_epoch(new_value: i64) { + let status = &*CHAIN_EXPORT_STATUS; + status.epoch.store(new_value, atomic::Ordering::Relaxed); + _ = status.initial_epoch.compare_exchange( + 0, + new_value, + atomic::Ordering::Relaxed, + atomic::Ordering::Relaxed, + ); } pub fn start_export() { - let mut mutex = CHAIN_EXPORT_STATUS.lock(); - mutex.epoch = 0; - mutex.initial_epoch = 0; - mutex.exporting = true; - mutex.cancelled = false; - mutex.start_time = Some(Utc::now()); + let status = &*CHAIN_EXPORT_STATUS; + status.epoch.store(0, atomic::Ordering::Relaxed); + status.initial_epoch.store(0, atomic::Ordering::Relaxed); + status.exporting.store(true, atomic::Ordering::Relaxed); + status.cancelled.store(false, atomic::Ordering::Relaxed); + *status.start_time.write() = Some(Utc::now()); } pub fn end_export() { - let mut mutex = CHAIN_EXPORT_STATUS.lock(); - mutex.exporting = false; + CHAIN_EXPORT_STATUS + .exporting + .store(false, atomic::Ordering::Relaxed); } pub fn cancel_export() { - let mut mutex = CHAIN_EXPORT_STATUS.lock(); - mutex.exporting = false; - mutex.cancelled = true; + let status = &*CHAIN_EXPORT_STATUS; + status.exporting.store(false, atomic::Ordering::Relaxed); + status.cancelled.store(true, atomic::Ordering::Relaxed); } fn should_save_block_to_snapshot(cid: Cid) -> bool { diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 370ce0084853..790af72cb649 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -424,12 +424,13 @@ impl RpcMethod<0> for ForestChainExportStatus { (): Self::Params, _: &http::Extensions, ) -> Result { - let mutex = CHAIN_EXPORT_STATUS.lock(); - - let progress = if mutex.initial_epoch == 0 { + let status = &*CHAIN_EXPORT_STATUS; + let initial_epoch = status.initial_epoch(); + let epoch = status.epoch(); + let progress = if initial_epoch == 0 { 0.0 } else { - let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64)); + let p = 1.0 - ((epoch as f64) / (initial_epoch as f64)); if p.is_finite() { p.clamp(0.0, 1.0) } else { @@ -441,9 +442,11 @@ impl RpcMethod<0> for ForestChainExportStatus { let status = ApiExportStatus { progress, - exporting: mutex.exporting, - cancelled: mutex.cancelled, - start_time: mutex.start_time, + exporting: status.exporting(), + cancelled: status.cancelled(), + start_time: status.start_time(), + current_epoch: epoch, + start_epoch: initial_epoch, }; Ok(status) diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap index ae8bde5fcb6e..bc160d5af550 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap @@ -4761,11 +4761,17 @@ components: properties: cancelled: type: boolean + current_epoch: + type: integer + format: int64 exporting: type: boolean progress: type: number format: double + start_epoch: + type: integer + format: int64 start_time: type: - string @@ -4775,6 +4781,8 @@ components: - progress - exporting - cancelled + - start_epoch + - current_epoch ApiF3ParticipationLease: description: "defines the lease granted to a storage provider for\nparticipating in F3 consensus, detailing the session identifier, issuer,\nsubject, and the expiration instance." type: object diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap index d69ca5921eb3..f597aa227c69 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap @@ -4834,11 +4834,17 @@ components: properties: cancelled: type: boolean + current_epoch: + type: integer + format: int64 exporting: type: boolean progress: type: number format: double + start_epoch: + type: integer + format: int64 start_time: type: - string @@ -4848,6 +4854,8 @@ components: - progress - exporting - cancelled + - start_epoch + - current_epoch ApiF3ParticipationLease: description: "defines the lease granted to a storage provider for\nparticipating in F3 consensus, detailing the session identifier, issuer,\nsubject, and the expiration instance." type: object diff --git a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap index df3a185f000f..f44b22800655 100644 --- a/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap +++ b/src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap @@ -1705,11 +1705,17 @@ components: properties: cancelled: type: boolean + current_epoch: + type: integer + format: int64 exporting: type: boolean progress: type: number format: double + start_epoch: + type: integer + format: int64 start_time: type: - string @@ -1719,6 +1725,8 @@ components: - progress - exporting - cancelled + - start_epoch + - current_epoch Base64String: type: - string diff --git a/src/rpc/types/mod.rs b/src/rpc/types/mod.rs index a498a77691af..a2e44ea2d1b5 100644 --- a/src/rpc/types/mod.rs +++ b/src/rpc/types/mod.rs @@ -570,6 +570,8 @@ pub struct ApiExportStatus { pub progress: f64, pub exporting: bool, pub cancelled: bool, + pub start_epoch: i64, + pub current_epoch: i64, pub start_time: Option>, }