From c1264e0b3baa2f9fed7304dc3f050cad960fecba Mon Sep 17 00:00:00 2001 From: lightsing Date: Wed, 24 Dec 2025 13:56:53 +0800 Subject: [PATCH] link journal --- Cargo.lock | 35 +++++++++++++++++++++++++ Cargo.toml | 2 +- crates/calendar/Cargo.toml | 9 ++++++- crates/calendar/src/lib.rs | 5 ++++ crates/calendar/src/main.rs | 9 ++++++- crates/calendar/src/routes/ots.rs | 25 +++++++++++++----- crates/core/src/codec/v1/attestation.rs | 11 +++++++- crates/journal/src/lib.rs | 2 +- crates/journal/src/reader.rs | 2 +- 9 files changed, 87 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbaf6fc..3bf4232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2103,6 +2103,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.12.1", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.7.1" @@ -2243,6 +2262,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http", "http-body", "httparse", @@ -4243,6 +4263,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.7.3" @@ -4538,6 +4571,7 @@ dependencies = [ "bump-scope", "bytes", "criterion 0.8.1", + "digest 0.11.0-rc.4", "eyre", "itoa", "sha3 0.11.0-rc.3", @@ -4545,6 +4579,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uts-core", + "uts-journal", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index df1f79f..d92e56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ alloy-primitives = "1.5" alloy-signer = "1.1" alloy-signer-local = "1.1" auto_impl = "1.3" -axum = "0.8" +axum = { version = "0.8", default-features = false } axum-extra = "0.12" bump-scope = { version = "1.5", features = ["nightly"] } bytes = "1.11" diff --git a/crates/calendar/Cargo.toml b/crates/calendar/Cargo.toml index 4b3891b..b5c4aff 100644 --- a/crates/calendar/Cargo.toml +++ b/crates/calendar/Cargo.toml @@ -12,9 +12,14 @@ version.workspace = true alloy-primitives = { workspace = true } alloy-signer = { workspace = true } alloy-signer-local = { workspace = true } -axum = { workspace = true, features = ["macros"] } +axum = { workspace = true, default-features = false, features = [ + "macros", + "http2", + "tokio", +] } # http2 only bump-scope.workspace = true bytes = { workspace = true } +digest = { workspace = true } eyre = { workspace = true } itoa = { workspace = true } sha3 = { workspace = true } @@ -22,6 +27,7 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } uts-core = { workspace = true, features = ["bytes"] } +uts-journal = { workspace = true } [dev-dependencies] criterion.workspace = true @@ -34,4 +40,5 @@ name = "submit_digest" workspace = true [features] +dev = ["axum/http1"] # for easier testing performance = ["tracing/release_max_level_info"] diff --git a/crates/calendar/src/lib.rs b/crates/calendar/src/lib.rs index 7ed561b..6202b7a 100644 --- a/crates/calendar/src/lib.rs +++ b/crates/calendar/src/lib.rs @@ -8,6 +8,9 @@ extern crate tracing; use alloy_signer::k256::ecdsa::SigningKey; use alloy_signer_local::LocalSigner; +use digest::{OutputSizeUser, typenum::Unsigned}; +use sha3::Keccak256; +use uts_journal::Journal; /// Calendar server routes and handlers. pub mod routes; @@ -19,6 +22,8 @@ pub mod time; pub struct AppState { /// Local signer for signing OTS timestamps. pub signer: LocalSigner, + /// Journal + pub journal: Journal<{ ::OutputSize::USIZE }>, } /// Signal for graceful shutdown. diff --git a/crates/calendar/src/main.rs b/crates/calendar/src/main.rs index 8653571..2ca5709 100644 --- a/crates/calendar/src/main.rs +++ b/crates/calendar/src/main.rs @@ -9,6 +9,9 @@ use axum::{ }; use std::sync::Arc; use uts_calendar::{AppState, routes, shutdown_signal, time}; +use uts_journal::Journal; + +const RING_BUFFER_CAPACITY: usize = 1 << 20; // 1 million entries #[tokio::main] async fn main() -> eyre::Result<()> { @@ -19,6 +22,10 @@ async fn main() -> eyre::Result<()> { let signer = LocalSigner::from_bytes(&b256!( "9ba9926331eb5f4995f1e358f57ba1faab8b005b51928d2fdaea16e69a6ad225" ))?; + let journal = Journal::with_capacity(RING_BUFFER_CAPACITY); + + let _reader = journal.reader(); + // TODO: spawn stamper task let app = Router::new() .route( @@ -30,7 +37,7 @@ async fn main() -> eyre::Result<()> { "/timestamp/{hex_commitment}", get(routes::ots::get_timestamp), ) - .with_state(Arc::new(AppState { signer })); + .with_state(Arc::new(AppState { signer, journal })); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; diff --git a/crates/calendar/src/routes/ots.rs b/crates/calendar/src/routes/ots.rs index 5135586..d787186 100644 --- a/crates/calendar/src/routes/ots.rs +++ b/crates/calendar/src/routes/ots.rs @@ -1,9 +1,15 @@ use crate::{AppState, time::current_time_sec}; use alloy_signer::SignerSync; -use axum::{body::Bytes, extract::State}; +use axum::{ + body::Bytes, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; use bump_scope::Bump; use bytes::BytesMut; -use sha3::{Digest, Keccak256}; +use digest::Digest; +use sha3::Keccak256; use std::{cell::RefCell, sync::Arc}; use uts_core::{ codec::{ @@ -36,10 +42,15 @@ pub const MAX_DIGEST_SIZE: usize = 64; // e.g., SHA3-512 // result attested by Pending: update URI https://localhost:3000 // ``` /// Submit digest to calendar server and get pending timestamp in response. -pub async fn submit_digest(State(state): State>, digest: Bytes) -> Bytes { - let (output, _commitment) = submit_digest_inner(digest, &state.signer); - // TODO: submit commitment to journal - output +pub async fn submit_digest(State(state): State>, digest: Bytes) -> Response { + let (output, commitment) = submit_digest_inner(digest, &state.signer); + match state.journal.try_commit(&commitment) { + Err(_) => { + return (StatusCode::SERVICE_UNAVAILABLE, r#"{"err":"server busy"}"#).into_response(); + } // journal is full + Ok(fut) => fut.await, + } + output.into_response() } // TODO: We need to benchmark this. @@ -114,7 +125,7 @@ pub fn submit_digest_inner(digest: Bytes, signer: impl SignerSync) -> (Bytes, [u timestamp.encode(&mut buf).unwrap(); #[cfg(any(debug_assertions, not(feature = "performance")))] - trace!(timestamp = ?timestamp, encoded_length = buf.len()); + trace!(encoded_length = buf.len(), timestamp = ?timestamp); (buf.freeze(), commitment) }) diff --git a/crates/core/src/codec/v1/attestation.rs b/crates/core/src/codec/v1/attestation.rs index ce86e19..cf519b3 100644 --- a/crates/core/src/codec/v1/attestation.rs +++ b/crates/core/src/codec/v1/attestation.rs @@ -27,7 +27,7 @@ const PENDING_TAG: &[u8; 8] = b"\x83\xdf\xe3\x0d\x2e\xf9\x0c\x8e"; pub type AttestationTag = [u8; TAG_SIZE]; /// Raw Proof that some data existed at a given time. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct RawAttestation { pub tag: AttestationTag, pub data: Vec, @@ -35,6 +35,15 @@ pub struct RawAttestation { pub(crate) value: OnceLock>, } +impl fmt::Debug for RawAttestation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawAttestation") + .field("tag", &Hexed(&self.tag)) + .field("data", &Hexed(&self.data)) + .finish() + } +} + impl DecodeIn for RawAttestation { fn decode_in(decoder: &mut impl Decoder, alloc: A) -> Result { let mut tag = [0u8; TAG_SIZE]; diff --git a/crates/journal/src/lib.rs b/crates/journal/src/lib.rs index 107aef6..be4f093 100644 --- a/crates/journal/src/lib.rs +++ b/crates/journal/src/lib.rs @@ -227,7 +227,7 @@ pub struct CommitFuture<'a, const ENTRY_SIZE: usize> { active_waker: Option, } -impl<'a, const ENTRY_SIZE: usize> Future for CommitFuture<'a, ENTRY_SIZE> { +impl Future for CommitFuture<'_, ENTRY_SIZE> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { diff --git a/crates/journal/src/reader.rs b/crates/journal/src/reader.rs index 70e77d5..cea3f36 100644 --- a/crates/journal/src/reader.rs +++ b/crates/journal/src/reader.rs @@ -70,7 +70,7 @@ impl JournalReader { target_index: u64, } - impl<'a, const ENTRY_SIZE: usize> Future for WaitForBatch<'a, ENTRY_SIZE> { + impl Future for WaitForBatch<'_, ENTRY_SIZE> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if self.reader.journal.persisted_index.load(Ordering::Acquire) >= self.target_index