Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::OnceLock;
use std::task::Poll;
use std::time::Duration;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Encoder as _};

Expand Down Expand Up @@ -312,6 +313,9 @@ impl Encoder {
roots: NonEmpty<Cid>,
mut stream: impl Stream<Item = anyhow::Result<ForestCarFrame>> + Unpin,
) -> anyhow::Result<()> {
// For troubleshooting stuck-ness issue
const ASYNC_OPS_TIMEOUT: Duration = Duration::from_mins(5);

let mut offset = 0;

// Write CARv1 header
Expand All @@ -334,9 +338,17 @@ impl Encoder {
// Write seekable zstd and collect a mapping of CIDs to frame_offset+data_offset.
let mut builder = index::Builder::new();
let mut n_frames = 0;
while let Some((cids, zstd_frame)) = stream.try_next().await? {
while let Some((cids, zstd_frame)) =
tokio::time::timeout(ASYNC_OPS_TIMEOUT, stream.try_next())
.await
.with_context(|| {
format!("`stream.try_next` timed out, offset={offset}, n_frames={n_frames}")
})??
{
builder.extend(cids.into_iter().map(|cid| (cid, offset as u64)));
sink.write_all(&zstd_frame).await?;
tokio::time::timeout(ASYNC_OPS_TIMEOUT, sink.write_all(&zstd_frame))
.await
.with_context(|| format!("`sink.write_all` timed out, offset={offset}, n_frames={n_frames}, zstd_frame_len={}", zstd_frame.len()))??;
offset += zstd_frame.len();
n_frames += 1;
}
Expand All @@ -345,7 +357,12 @@ impl Encoder {

// Create index
let writer = builder.into_writer();
writer.write_zstd_skip_frames_into(&mut sink).await?;
tokio::time::timeout(
ASYNC_OPS_TIMEOUT,
writer.write_zstd_skip_frames_into(&mut sink),
)
.await
.context("`writer.write_zstd_skip_frames_into` timed out")??;
tracing::info!("Finished writing zstd CAR index frames");
// Write ForestCAR.zst footer, it's a valid ZSTD skip-frame
let footer = ForestCarFooter {
Expand Down
76 changes: 51 additions & 25 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,78 @@ use chrono::{DateTime, Utc};
use cid::Cid;
use futures::Stream;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::Mutex;
use parking_lot::RwLock;
use pin_project_lite::pin_project;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::{LazyLock, atomic};
use std::task::{Context, Poll};

#[derive(Default)]
pub struct ExportStatus {
pub epoch: i64,
pub initial_epoch: i64,
pub exporting: bool,
pub cancelled: bool,
pub start_time: Option<DateTime<Utc>>,
pub epoch: AtomicI64,
pub initial_epoch: AtomicI64,
pub exporting: AtomicBool,
pub cancelled: AtomicBool,
pub start_time: RwLock<Option<DateTime<Utc>>>,
}

pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
LazyLock::new(|| ExportStatus::default().into());
impl ExportStatus {
pub fn epoch(&self) -> i64 {
self.epoch.load(atomic::Ordering::Relaxed)
}

fn update_epoch(new_value: i64) {
let mut mutex = CHAIN_EXPORT_STATUS.lock();
mutex.epoch = new_value;
if mutex.initial_epoch == 0 {
mutex.initial_epoch = new_value;
pub fn initial_epoch(&self) -> i64 {
self.initial_epoch.load(atomic::Ordering::Relaxed)
}

pub fn exporting(&self) -> bool {
self.exporting.load(atomic::Ordering::Relaxed)
}

pub fn cancelled(&self) -> bool {
self.cancelled.load(atomic::Ordering::Relaxed)
}

pub fn start_time(&self) -> Option<DateTime<Utc>> {
*self.start_time.read()
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

pub static CHAIN_EXPORT_STATUS: LazyLock<ExportStatus> = LazyLock::new(ExportStatus::default);

fn update_epoch(new_value: i64) {
let status = &*CHAIN_EXPORT_STATUS;
status.epoch.store(new_value, atomic::Ordering::Relaxed);
_ = status.initial_epoch.compare_exchange(
0,
new_value,
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
);
}

pub fn start_export() {
let mut mutex = CHAIN_EXPORT_STATUS.lock();
mutex.epoch = 0;
mutex.initial_epoch = 0;
mutex.exporting = true;
mutex.cancelled = false;
mutex.start_time = Some(Utc::now());
let status = &*CHAIN_EXPORT_STATUS;
status.epoch.store(0, atomic::Ordering::Relaxed);
status.initial_epoch.store(0, atomic::Ordering::Relaxed);
status.exporting.store(true, atomic::Ordering::Relaxed);
status.cancelled.store(false, atomic::Ordering::Relaxed);
*status.start_time.write() = Some(Utc::now());
}

pub fn end_export() {
let mut mutex = CHAIN_EXPORT_STATUS.lock();
mutex.exporting = false;
CHAIN_EXPORT_STATUS
.exporting
.store(false, atomic::Ordering::Relaxed);
}

pub fn cancel_export() {
let mut mutex = CHAIN_EXPORT_STATUS.lock();
mutex.exporting = false;
mutex.cancelled = true;
let status = &*CHAIN_EXPORT_STATUS;
status.exporting.store(false, atomic::Ordering::Relaxed);
status.cancelled.store(true, atomic::Ordering::Relaxed);
}

fn should_save_block_to_snapshot(cid: Cid) -> bool {
Expand Down
17 changes: 10 additions & 7 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,13 @@ impl RpcMethod<0> for ForestChainExportStatus {
(): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
let mutex = CHAIN_EXPORT_STATUS.lock();

let progress = if mutex.initial_epoch == 0 {
let status = &*CHAIN_EXPORT_STATUS;
let initial_epoch = status.initial_epoch();
let epoch = status.epoch();
let progress = if initial_epoch == 0 {
0.0
} else {
let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
let p = 1.0 - ((epoch as f64) / (initial_epoch as f64));
if p.is_finite() {
p.clamp(0.0, 1.0)
} else {
Expand All @@ -441,9 +442,11 @@ impl RpcMethod<0> for ForestChainExportStatus {

let status = ApiExportStatus {
progress,
exporting: mutex.exporting,
cancelled: mutex.cancelled,
start_time: mutex.start_time,
exporting: status.exporting(),
cancelled: status.cancelled(),
start_time: status.start_time(),
current_epoch: epoch,
start_epoch: initial_epoch,
};

Ok(status)
Expand Down
8 changes: 8 additions & 0 deletions src/rpc/snapshots/forest__rpc__tests__rpc__v0.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/rpc/snapshots/forest__rpc__tests__rpc__v1.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/rpc/snapshots/forest__rpc__tests__rpc__v2.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/rpc/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ pub struct ApiExportStatus {
pub progress: f64,
pub exporting: bool,
pub cancelled: bool,
pub start_epoch: i64,
pub current_epoch: i64,
pub start_time: Option<chrono::DateTime<Utc>>,
}

Expand Down
Loading