From edf13ab9f82c6dbf08f7d6e98346afd1d6234c1a Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 3 Oct 2025 11:00:22 +0800 Subject: [PATCH 01/15] feat: new version and async appender Signed-off-by: tison --- Cargo.toml | 24 ++++++++++++------------ appenders/fastrace/Cargo.toml | 2 +- appenders/file/Cargo.toml | 2 +- appenders/journald/Cargo.toml | 2 +- appenders/opentelemetry/Cargo.toml | 2 +- appenders/syslog/Cargo.toml | 2 +- bridges/log/Cargo.toml | 2 +- core/Cargo.toml | 2 +- diagnostics/fastrace/Cargo.toml | 2 +- layouts/google-cloud-logging/Cargo.toml | 2 +- layouts/json/Cargo.toml | 2 +- layouts/logfmt/Cargo.toml | 2 +- layouts/text/Cargo.toml | 2 +- 13 files changed, 24 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dacffa7..bafc32e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,18 +35,18 @@ rust-version = "1.85.0" [workspace.dependencies] # Workspace dependencies -logforth-append-fastrace = { version = "0.1.0", path = "appenders/fastrace" } -logforth-append-file = { version = "0.1.0", path = "appenders/file" } -logforth-append-journald = { version = "0.1.0", path = "appenders/journald" } -logforth-append-opentelemetry = { version = "0.1.0", path = "appenders/opentelemetry" } -logforth-append-syslog = { version = "0.1.0", path = "appenders/syslog" } -logforth-bridge-log = { version = "0.1.0", path = "bridges/log" } -logforth-core = { version = "0.1.0", path = "core" } -logforth-diagnostic-fastrace = { version = "0.1.0", path = "diagnostics/fastrace" } -logforth-layout-google-cloud-logging = { version = "0.1.0", path = "layouts/google-cloud-logging" } -logforth-layout-json = { version = "0.1.0", path = "layouts/json" } -logforth-layout-logfmt = { version = "0.1.0", path = "layouts/logfmt" } -logforth-layout-text = { version = "0.1.0", path = "layouts/text" } +logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" } +logforth-append-file = { version = "0.2.0", path = "appenders/file" } +logforth-append-journald = { version = "0.2.0", path = "appenders/journald" } +logforth-append-opentelemetry = { version = "0.2.0", path = "appenders/opentelemetry" } +logforth-append-syslog = { version = "0.2.0", path = "appenders/syslog" } +logforth-bridge-log = { version = "0.2.0", path = "bridges/log" } +logforth-core = { version = "0.2.0", path = "core" } +logforth-diagnostic-fastrace = { version = "0.2.0", path = "diagnostics/fastrace" } +logforth-layout-google-cloud-logging = { version = "0.2.0", path = "layouts/google-cloud-logging" } +logforth-layout-json = { version = "0.2.0", path = "layouts/json" } +logforth-layout-logfmt = { version = "0.2.0", path = "layouts/logfmt" } +logforth-layout-text = { version = "0.2.0", path = "layouts/text" } # Crates.io dependencies anyhow = { version = "1.0" } diff --git a/appenders/fastrace/Cargo.toml b/appenders/fastrace/Cargo.toml index a674fca..80a4777 100644 --- a/appenders/fastrace/Cargo.toml +++ b/appenders/fastrace/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-fastrace" -version = "0.1.0" +version = "0.2.0" description = "Fastrace appender for Logforth." keywords = ["logging", "log", "fastrace"] diff --git a/appenders/file/Cargo.toml b/appenders/file/Cargo.toml index eb60497..49c78e2 100644 --- a/appenders/file/Cargo.toml +++ b/appenders/file/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-file" -version = "0.1.0" +version = "0.2.0" description = "File appender for Logforth with optional rollover strategy." keywords = ["logging", "log", "file-appender"] diff --git a/appenders/journald/Cargo.toml b/appenders/journald/Cargo.toml index cb00767..06b4fb0 100644 --- a/appenders/journald/Cargo.toml +++ b/appenders/journald/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-journald" -version = "0.1.0" +version = "0.2.0" description = "Journald appender for Logforth." keywords = ["logging", "log", "journald"] diff --git a/appenders/opentelemetry/Cargo.toml b/appenders/opentelemetry/Cargo.toml index 42dde9b..23851dd 100644 --- a/appenders/opentelemetry/Cargo.toml +++ b/appenders/opentelemetry/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-opentelemetry" -version = "0.1.0" +version = "0.2.0" description = "Opemtelemetry appender for Logforth." keywords = ["logging", "log", "opentelemtry"] diff --git a/appenders/syslog/Cargo.toml b/appenders/syslog/Cargo.toml index c99c881..53fc892 100644 --- a/appenders/syslog/Cargo.toml +++ b/appenders/syslog/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-syslog" -version = "0.1.0" +version = "0.2.0" description = "Syslog appender for Logforth." keywords = ["logging", "log", "syslog"] diff --git a/bridges/log/Cargo.toml b/bridges/log/Cargo.toml index 43f8c9b..c84da1e 100644 --- a/bridges/log/Cargo.toml +++ b/bridges/log/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-bridge-log" -version = "0.1.0" +version = "0.2.0" description = "Bridge Logforth with the log crate." keywords = ["logging", "log"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 6dd2501..64fb973 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-core" -version = "0.1.0" +version = "0.2.0" description = "Core structs and functions for Logforth." keywords = ["logging", "log"] diff --git a/diagnostics/fastrace/Cargo.toml b/diagnostics/fastrace/Cargo.toml index 9de0ff2..4b14d19 100644 --- a/diagnostics/fastrace/Cargo.toml +++ b/diagnostics/fastrace/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-diagnostic-fastrace" -version = "0.1.0" +version = "0.2.0" description = "Fastrace diagnostic for Logforth." keywords = ["logging", "log", "fastrace"] diff --git a/layouts/google-cloud-logging/Cargo.toml b/layouts/google-cloud-logging/Cargo.toml index df26490..b39ab20 100644 --- a/layouts/google-cloud-logging/Cargo.toml +++ b/layouts/google-cloud-logging/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-google-cloud-logging" -version = "0.1.0" +version = "0.2.0" description = "Google Cloud Structured Logging layout for Logforth." keywords = ["logging", "log", "google-cloud-logging"] diff --git a/layouts/json/Cargo.toml b/layouts/json/Cargo.toml index eadd6e6..f4d42e5 100644 --- a/layouts/json/Cargo.toml +++ b/layouts/json/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-json" -version = "0.1.0" +version = "0.2.0" description = "JSON layout for Logforth." keywords = ["logging", "log", "json"] diff --git a/layouts/logfmt/Cargo.toml b/layouts/logfmt/Cargo.toml index d2f8474..32a067b 100644 --- a/layouts/logfmt/Cargo.toml +++ b/layouts/logfmt/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-logfmt" -version = "0.1.0" +version = "0.2.0" description = "Logfmt layout for Logforth." keywords = ["logging", "log", "logfmt"] diff --git a/layouts/text/Cargo.toml b/layouts/text/Cargo.toml index 8539339..9811e8d 100644 --- a/layouts/text/Cargo.toml +++ b/layouts/text/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-text" -version = "0.1.0" +version = "0.2.0" description = "Optionally colored text layout for Logforth." keywords = ["logging", "log", "colored", "terminal"] From a03bab817435c83f2d596e6d23f5b5466457ca6f Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 3 Oct 2025 11:22:27 +0800 Subject: [PATCH 02/15] add async appender stub Signed-off-by: tison --- appenders/async/Cargo.toml | 34 ++++++++++++++++++++++++++++++++++ appenders/async/src/lib.rs | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 appenders/async/Cargo.toml create mode 100644 appenders/async/src/lib.rs diff --git a/appenders/async/Cargo.toml b/appenders/async/Cargo.toml new file mode 100644 index 0000000..8156c4f --- /dev/null +++ b/appenders/async/Cargo.toml @@ -0,0 +1,34 @@ +# Copyright 2024 FastLabs Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "logforth-append-async" +version = "0.2.0" + +description = "Async appender for Logforth." +keywords = ["logging", "log", "async"] + +categories.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +logforth-core = { workspace = true } + +[lints] +workspace = true diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs new file mode 100644 index 0000000..659563d --- /dev/null +++ b/appenders/async/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A composable appender, logging and flushing asynchronously. + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::record::Record; + +/// A composable appender, logging and flushing asynchronously. +#[derive(Debug)] +pub struct Async { + appender: Arc, +} + +impl Append for Async { + fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { + self.appender.append(record, diags) + } +} From 3835ebfa08fb9a597fc2fa84453ccd07dd8df1d4 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 3 Oct 2025 11:32:44 +0800 Subject: [PATCH 03/15] Append flush has no default now Signed-off-by: tison --- appenders/async/src/lib.rs | 5 +++++ appenders/journald/src/lib.rs | 5 +++++ core/src/append/mod.rs | 6 +----- core/src/append/testing.rs | 5 +++++ 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index 659563d..db3452c 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -33,4 +33,9 @@ impl Append for Async { fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { self.appender.append(record, diags) } + + fn flush(&self) -> Result<(), Error> { + // TODO(@tisonkun): implement actual async flushing. + Ok(()) + } } diff --git a/appenders/journald/src/lib.rs b/appenders/journald/src/lib.rs index 61d54b8..addd38b 100644 --- a/appenders/journald/src/lib.rs +++ b/appenders/journald/src/lib.rs @@ -312,4 +312,9 @@ impl Append for Journald { self.send_payload(&buffer)?; Ok(()) } + + fn flush(&self) -> Result<(), Error> { + // UnixDatagram doesn't buffer anything, so nothing to do here. + Ok(()) + } } diff --git a/core/src/append/mod.rs b/core/src/append/mod.rs index 16c7089..573e80a 100644 --- a/core/src/append/mod.rs +++ b/core/src/append/mod.rs @@ -33,11 +33,7 @@ pub trait Append: fmt::Debug + Send + Sync + 'static { fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error>; /// Flush any buffered records. - /// - /// Default to a no-op. - fn flush(&self) -> Result<(), Error> { - Ok(()) - } + fn flush(&self) -> Result<(), Error>; } impl From for Box { diff --git a/core/src/append/testing.rs b/core/src/append/testing.rs index ae5d555..d595ef9 100644 --- a/core/src/append/testing.rs +++ b/core/src/append/testing.rs @@ -67,4 +67,9 @@ impl Append for Testing { eprintln!("{}", String::from_utf8_lossy(&bytes)); Ok(()) } + + fn flush(&self) -> Result<(), Error> { + // nothing to flush for eprintln + Ok(()) + } } From 0b65ef896a05a7d9d58fcd385d7f3d307aed867c Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Oct 2025 20:48:28 +0800 Subject: [PATCH 04/15] impl Drop for appenders when useful Signed-off-by: tison --- appenders/fastrace/src/lib.rs | 6 ++++++ appenders/file/src/append.rs | 7 +++++++ appenders/opentelemetry/src/lib.rs | 6 ++++++ appenders/syslog/src/lib.rs | 7 +++++++ 4 files changed, 26 insertions(+) diff --git a/appenders/fastrace/src/lib.rs b/appenders/fastrace/src/lib.rs index 5e42070..14bd22c 100644 --- a/appenders/fastrace/src/lib.rs +++ b/appenders/fastrace/src/lib.rs @@ -75,6 +75,12 @@ impl Append for FastraceEvent { } } +impl Drop for FastraceEvent { + fn drop(&mut self) { + fastrace::flush(); + } +} + struct KvCollector { kv: Vec<(String, String)>, } diff --git a/appenders/file/src/append.rs b/appenders/file/src/append.rs index 4bd795a..3aa1e8c 100644 --- a/appenders/file/src/append.rs +++ b/appenders/file/src/append.rs @@ -151,3 +151,10 @@ impl Append for File { Ok(()) } } + +impl Drop for File { + fn drop(&mut self) { + let writer = self.writer.get_mut().unwrap_or_else(|e| e.into_inner()); + let _ = writer.flush(); + } +} diff --git a/appenders/opentelemetry/src/lib.rs b/appenders/opentelemetry/src/lib.rs index 23c3c08..3c2710f 100644 --- a/appenders/opentelemetry/src/lib.rs +++ b/appenders/opentelemetry/src/lib.rs @@ -277,6 +277,12 @@ impl Append for OpentelemetryLog { } } +impl Drop for OpentelemetryLog { + fn drop(&mut self) { + let _ = self.provider.force_flush(); + } +} + fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity { match level { Level::Error => opentelemetry::logs::Severity::Error, diff --git a/appenders/syslog/src/lib.rs b/appenders/syslog/src/lib.rs index 43958de..7d98c66 100644 --- a/appenders/syslog/src/lib.rs +++ b/appenders/syslog/src/lib.rs @@ -308,6 +308,13 @@ impl Append for Syslog { } } +impl Drop for Syslog { + fn drop(&mut self) { + let sender = self.sender.get_mut().unwrap_or_else(|e| e.into_inner()); + let _ = sender.flush(); + } +} + #[derive(Debug)] struct SyslogFormatter { format: SyslogFormat, From ca345e9c1e2578da953d0adbe7f42463848b07c8 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Oct 2025 21:58:46 +0800 Subject: [PATCH 05/15] fixup links Signed-off-by: tison --- logforth/src/lib.rs | 2 ++ logforth/src/starter_log.rs | 14 +++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/logforth/src/lib.rs b/logforth/src/lib.rs index 55d6989..bdfbf5c 100644 --- a/logforth/src/lib.rs +++ b/logforth/src/lib.rs @@ -93,6 +93,8 @@ pub mod append { /// Bridge logforth with other logging frameworks. pub mod bridge { /// Bridge logforth with [`log`]. + /// + /// [`log`]: https://docs.rs/log/ #[cfg(feature = "bridge-log")] pub mod log { #[cfg(feature = "bridge-log")] diff --git a/logforth/src/starter_log.rs b/logforth/src/starter_log.rs index 374da53..d5f6068 100644 --- a/logforth/src/starter_log.rs +++ b/logforth/src/starter_log.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Starter configurations for quickly setting up logforth with the [`log`] crate +//! Starter configurations for quickly setting up logforth with the `log` crate use crate::Append; use crate::Error; @@ -23,7 +23,7 @@ use crate::core::DispatchBuilder; use crate::core::LoggerBuilder; use crate::filter::env_filter::EnvFilterBuilder; -/// A builder for setting up logforth with the [`log`] crate. +/// A builder for setting up logforth with the `log` crate. pub struct LogStarterBuilder { builder: LoggerBuilder, } @@ -70,7 +70,7 @@ impl LogStarterBuilder { /// before initialization will be ignored. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Errors /// @@ -103,7 +103,7 @@ impl LogStarterBuilder { /// initialized a global logger. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Panics /// @@ -125,7 +125,7 @@ enum StdStream { Stderr(append::Stderr), } -/// A builder for setting up logforth with the [`log`] crate, using standard output/error streams. +/// A builder for setting up logforth with the `log` crate, using standard output/error streams. pub struct LogStarterStdStreamBuilder { append: StdStream, filter: Box, @@ -207,7 +207,7 @@ impl LogStarterStdStreamBuilder { /// before initialization will be ignored. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Errors /// @@ -246,7 +246,7 @@ impl LogStarterStdStreamBuilder { /// initialized a global logger. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Panics /// From 9cd2c6034480070a0ea5b7a1677afe7d13819bd1 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Oct 2025 22:51:28 +0800 Subject: [PATCH 06/15] impl Async appender Signed-off-by: tison --- Cargo.toml | 1 + appenders/async/Cargo.toml | 1 + appenders/async/src/lib.rs | 39 +++++++++++++++++++++++++++++++++++--- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bafc32e..5eb1b01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ logforth-layout-text = { version = "0.2.0", path = "layouts/text" } # Crates.io dependencies anyhow = { version = "1.0" } colored = { version = "3.0" } +crossbeam-channel = { version = "0.5.15" } fastrace = { version = "0.7" } fasyslog = { version = "1.0.0" } insta = { version = "1.43.2" } diff --git a/appenders/async/Cargo.toml b/appenders/async/Cargo.toml index 8156c4f..4c7bff8 100644 --- a/appenders/async/Cargo.toml +++ b/appenders/async/Cargo.toml @@ -28,6 +28,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] +crossbeam-channel = { workspace = true } logforth-core = { workspace = true } [lints] diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index db3452c..d7d07f1 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -16,12 +16,12 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] -use std::sync::Arc; - +use crossbeam_channel::Receiver; use logforth_core::Append; use logforth_core::Diagnostic; use logforth_core::Error; -use logforth_core::record::Record; +use logforth_core::record::{Record, RecordOwned}; +use std::sync::Arc; /// A composable appender, logging and flushing asynchronously. #[derive(Debug)] @@ -39,3 +39,36 @@ impl Append for Async { Ok(()) } } + +enum Task { + Log { + appender: Arc, + record: RecordOwned, + }, + Flush { + appender: Arc, + }, +} + +struct Worker { + receiver: Receiver, +} + +impl Worker { + fn run(&self) { + while let Ok(task) = self.receiver.recv() { + match task { + Task::Log { appender, record } => { + if let Err(err) = appender.append(&record.as_record(), &[]) { + eprintln!("failed to append record asynchronously: {err:?}"); + } + } + Task::Flush { appender } => { + if let Err(err) = appender.flush() { + eprintln!("failed to flush asynchronously: {err:?}"); + } + } + } + } + } +} From 9316c787ed108692ad3dfd83b82697fed1d200f0 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Oct 2025 23:07:02 +0800 Subject: [PATCH 07/15] diags as record field Signed-off-by: tison --- core/src/record.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/core/src/record.rs b/core/src/record.rs index efe8899..91d25c3 100644 --- a/core/src/record.rs +++ b/core/src/record.rs @@ -73,6 +73,9 @@ pub struct Record<'a> { // structural logging kvs: KeyValues<'a>, + + // Mapped Diagnostic Context (MDC) + diags: KeyValues<'a>, } impl<'a> Record<'a> { @@ -164,6 +167,11 @@ impl<'a> Record<'a> { .iter() .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(), + diags: self + .diags + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect(), } } @@ -181,6 +189,7 @@ impl<'a> Record<'a> { line: self.line, payload: self.payload.clone(), kvs: self.kvs.clone(), + diags: self.diags.clone(), }, } } @@ -208,6 +217,7 @@ impl Default for RecordBuilder<'_> { line: None, payload: Default::default(), kvs: Default::default(), + diags: Default::default(), }, } } @@ -283,6 +293,15 @@ impl<'a> RecordBuilder<'a> { pub fn build(self) -> Record<'a> { self.record } + + /// Set [`diags`](struct.Record.html#method.diags) + /// + /// This is only called by the logging framework to set the current + /// diagnostic context. + pub(crate) fn diags(mut self, diags: impl Into>) -> Self { + self.record.diags = diags.into(); + self + } } /// Metadata about a log message. @@ -372,6 +391,9 @@ pub struct RecordOwned { // structural logging kvs: Vec<(kv::KeyOwned, kv::ValueOwned)>, + + // Mapped Diagnostic Context (MDC) + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, } /// Owned version of metadata about a log message. @@ -395,6 +417,7 @@ impl RecordOwned { line: self.line, payload: self.payload.clone(), kvs: KeyValues::from(self.kvs.as_slice()), + diags: KeyValues::from(self.diags.as_slice()), } } } From 960605abd0dc7c9c3b434b51e4e04ae4ec14d174 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 5 Oct 2025 23:24:00 +0800 Subject: [PATCH 08/15] Revert "diags as record field" This reverts commit 9316c787ed108692ad3dfd83b82697fed1d200f0. --- core/src/record.rs | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/core/src/record.rs b/core/src/record.rs index 91d25c3..efe8899 100644 --- a/core/src/record.rs +++ b/core/src/record.rs @@ -73,9 +73,6 @@ pub struct Record<'a> { // structural logging kvs: KeyValues<'a>, - - // Mapped Diagnostic Context (MDC) - diags: KeyValues<'a>, } impl<'a> Record<'a> { @@ -167,11 +164,6 @@ impl<'a> Record<'a> { .iter() .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(), - diags: self - .diags - .iter() - .map(|(k, v)| (k.to_owned(), v.to_owned())) - .collect(), } } @@ -189,7 +181,6 @@ impl<'a> Record<'a> { line: self.line, payload: self.payload.clone(), kvs: self.kvs.clone(), - diags: self.diags.clone(), }, } } @@ -217,7 +208,6 @@ impl Default for RecordBuilder<'_> { line: None, payload: Default::default(), kvs: Default::default(), - diags: Default::default(), }, } } @@ -293,15 +283,6 @@ impl<'a> RecordBuilder<'a> { pub fn build(self) -> Record<'a> { self.record } - - /// Set [`diags`](struct.Record.html#method.diags) - /// - /// This is only called by the logging framework to set the current - /// diagnostic context. - pub(crate) fn diags(mut self, diags: impl Into>) -> Self { - self.record.diags = diags.into(); - self - } } /// Metadata about a log message. @@ -391,9 +372,6 @@ pub struct RecordOwned { // structural logging kvs: Vec<(kv::KeyOwned, kv::ValueOwned)>, - - // Mapped Diagnostic Context (MDC) - diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, } /// Owned version of metadata about a log message. @@ -417,7 +395,6 @@ impl RecordOwned { line: self.line, payload: self.payload.clone(), kvs: KeyValues::from(self.kvs.as_slice()), - diags: KeyValues::from(self.diags.as_slice()), } } } From 64be2d2193a2c744f0f32f94d298e2b038bbf59a Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 11:45:09 +0800 Subject: [PATCH 09/15] impl async sink Signed-off-by: tison --- Cargo.toml | 2 +- appenders/async/Cargo.toml | 5 + appenders/async/src/lib.rs | 259 +++++++++++++++++++++++++++++++++---- core/Cargo.toml | 4 + core/src/error.rs | 14 ++ core/src/lib.rs | 2 +- 6 files changed, 261 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5eb1b01..6deba40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ members = [ "core", "logforth", - "appenders/*", "bridges/*", "diagnostics/*", @@ -50,6 +49,7 @@ logforth-layout-text = { version = "0.2.0", path = "layouts/text" } # Crates.io dependencies anyhow = { version = "1.0" } +arc-swap = { version = "1.7.1" } colored = { version = "3.0" } crossbeam-channel = { version = "0.5.15" } fastrace = { version = "0.7" } diff --git a/appenders/async/Cargo.toml b/appenders/async/Cargo.toml index 4c7bff8..593a52d 100644 --- a/appenders/async/Cargo.toml +++ b/appenders/async/Cargo.toml @@ -27,7 +27,12 @@ readme.workspace = true repository.workspace = true rust-version.workspace = true +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [dependencies] +arc-swap = { workspace = true } crossbeam-channel = { workspace = true } logforth-core = { workspace = true } diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index d7d07f1..8d77157 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -16,59 +16,272 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] -use crossbeam_channel::Receiver; -use logforth_core::Append; -use logforth_core::Diagnostic; +use arc_swap::ArcSwapOption; +use crossbeam_channel::{Receiver, Sender}; use logforth_core::Error; +use logforth_core::kv::Visitor; use logforth_core::record::{Record, RecordOwned}; +use logforth_core::{Append, kv}; +use logforth_core::{Diagnostic, ErrorSink}; use std::sync::Arc; +use std::thread::JoinHandle; /// A composable appender, logging and flushing asynchronously. #[derive(Debug)] pub struct Async { - appender: Arc, + appends: Arc<[Box]>, + overflow_policy: OverflowPolicy, + state: ArcSwapOption, +} + +#[derive(Debug)] +struct AsyncState { + sender: Sender, + thread_handle: JoinHandle<()>, } impl Append for Async { fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { - self.appender.append(record, diags) + let mut diagnostics = vec![]; + + struct DiagCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); + + impl<'a> Visitor for DiagCollector<'a> { + fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> { + self.0.push((key.to_owned(), value.to_owned())); + Ok(()) + } + } + + let mut collector = DiagCollector(&mut diagnostics); + for d in diags { + d.visit(&mut collector)?; + } + + self.send_task(Task::Log { + appends: self.appends.clone(), + record: record.to_owned(), + diags: diagnostics, + }) } fn flush(&self) -> Result<(), Error> { - // TODO(@tisonkun): implement actual async flushing. - Ok(()) + self.send_task(Task::Flush { + appends: self.appends.clone(), + }) } } -enum Task { - Log { - appender: Arc, - record: RecordOwned, - }, - Flush { - appender: Arc, - }, +impl Async { + fn send_task(&self, task: Task) -> Result<(), Error> { + let state = self.state.load(); + // SAFETY: state is always Some before dropped. + let state = state.as_ref().unwrap(); + let sender = &state.sender; + + match self.overflow_policy { + OverflowPolicy::Block => sender.send(task).map_err(|err| { + Error::new(match err.0 { + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", + }) + }), + OverflowPolicy::DropIncoming => match sender.try_send(task) { + Ok(()) => Ok(()), + Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()), + Err(crossbeam_channel::TrySendError::Disconnected(task)) => { + Err(Error::new(match task { + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", + })) + } + }, + } + } + + fn destroy(&self) { + if let Some(state) = self.state.swap(None) { + // SAFETY: state has always one strong count before swapped. + let AsyncState { + sender, + thread_handle, + } = Arc::into_inner(state).unwrap(); + + drop(sender); + thread_handle.join().unwrap(); + } + } +} + +impl Drop for Async { + fn drop(&mut self) { + self.destroy(); + } +} + +/// +pub struct AsyncBuilder { + thread_name: String, + appends: Vec>, + buffered_lines_limit: Option, + error_sink: Box, + overflow_policy: OverflowPolicy, +} + +impl AsyncBuilder { + /// Create a new async appender builder. + pub fn new(thread_name: impl Into) -> AsyncBuilder { + AsyncBuilder { + thread_name: thread_name.into(), + appends: vec![], + buffered_lines_limit: None, + error_sink: Box::new(PrintErrorSink), + overflow_policy: OverflowPolicy::Block, + } + } + + /// Set the buffer size of pending messages. + pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option) -> Self { + self.buffered_lines_limit = buffered_lines_limit; + self + } + + /// Set the overflow policy for this async appender. + pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self { + self.overflow_policy = overflow_policy; + self + } + + /// Set the error sink for this async appender. + pub fn error_sink(mut self, error_sink: impl Into>) -> Self { + self.error_sink = error_sink.into(); + self + } + + /// Add an appender to this async appender. + pub fn append(mut self, append: impl Into>) -> Self { + self.appends.push(append.into()); + self + } + + /// Build the async appender. + pub fn build(self) -> Async { + let Self { + thread_name, + appends, + buffered_lines_limit, + error_sink, + overflow_policy, + } = self; + + let (sender, receiver) = match buffered_lines_limit { + Some(limit) => crossbeam_channel::bounded(limit), + None => crossbeam_channel::unbounded(), + }; + + let worker = Worker { + receiver, + error_sink, + }; + + let thread_handle = std::thread::Builder::new() + .name(thread_name) + .spawn(move || worker.run()) + .expect("failed to spawn async appender thread"); + + Async { + appends: appends.into_boxed_slice().into(), + overflow_policy, + state: ArcSwapOption::from(Some(Arc::new(AsyncState { + sender, + thread_handle, + }))), + } + } +} + +struct PrintErrorSink; + +impl ErrorSink for PrintErrorSink { + fn sink(&self, err: &Error) { + eprintln!("{err}"); + } } struct Worker { receiver: Receiver, + error_sink: Box, } impl Worker { - fn run(&self) { - while let Ok(task) = self.receiver.recv() { + fn run(self) { + let Self { + receiver, + error_sink, + } = self; + + while let Ok(task) = receiver.recv() { match task { - Task::Log { appender, record } => { - if let Err(err) = appender.append(&record.as_record(), &[]) { - eprintln!("failed to append record asynchronously: {err:?}"); + Task::Log { + appends, + record, + diags, + } => { + let diags: Vec> = vec![Box::new(OwnedDiagnostic(diags))]; + let diags = diags.as_slice(); + let record = record.as_record(); + for append in appends.iter() { + if let Err(err) = append.append(&record, diags) { + let err = Error::new("failed to append record").set_source(err); + error_sink.sink(&err); + } } } - Task::Flush { appender } => { - if let Err(err) = appender.flush() { - eprintln!("failed to flush asynchronously: {err:?}"); + Task::Flush { appends } => { + for append in appends.iter() { + if let Err(err) = append.flush() { + let err = Error::new("failed to flush").set_source(err); + error_sink.sink(&err); + } } } } } } } + +/// Overflow policy for [`Async`]. +/// +/// When the channel is full, an incoming operation is handled according to the +/// specified policy. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +#[non_exhaustive] +pub enum OverflowPolicy { + /// Blocks until the channel is not full. + Block, + /// Drops the incoming operation. + DropIncoming, +} + +enum Task { + Log { + appends: Arc<[Box]>, + record: RecordOwned, + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, + }, + Flush { + appends: Arc<[Box]>, + }, +} + +#[derive(Debug)] +struct OwnedDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>); + +impl Diagnostic for OwnedDiagnostic { + fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> { + for (key, value) in &self.0 { + visitor.visit(key.by_ref(), value.by_ref())?; + } + Ok(()) + } +} diff --git a/core/Cargo.toml b/core/Cargo.toml index 64fb973..b808adf 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,6 +27,10 @@ readme.workspace = true repository.workspace = true rust-version.workspace = true +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [features] default = [] serde = ["value-bag/serde"] diff --git a/core/src/error.rs b/core/src/error.rs index c15aa9f..ea0d701 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -12,9 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Error type and error sink trait. + use std::fmt; use std::io; +/// An error sink to receive errors. +pub trait ErrorSink: Send + Sync + 'static { + /// Receive an error + fn sink(&self, err: &Error); +} + +impl From for Box { + fn from(value: T) -> Self { + Box::new(value) + } +} + /// The error struct of logforth. pub struct Error { message: String, diff --git a/core/src/lib.rs b/core/src/lib.rs index 0716ae3..f6ee5c1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -28,7 +28,7 @@ pub use self::diagnostic::Diagnostic; pub use self::filter::Filter; pub use self::layout::Layout; -mod error; +pub mod error; pub use self::error::*; mod logger; From 286d74c3922a6186e102fe9886d843797020238f Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 13:05:18 +0800 Subject: [PATCH 10/15] refactor out Signed-off-by: tison --- Cargo.toml | 1 + appenders/async/src/lib.rs | 287 ------------------- appenders/{async => asynchronous}/Cargo.toml | 4 +- appenders/asynchronous/README.md | 5 + appenders/asynchronous/src/append.rs | 169 +++++++++++ appenders/asynchronous/src/lib.rs | 42 +++ appenders/asynchronous/src/state.rs | 86 ++++++ appenders/asynchronous/src/worker.rs | 83 ++++++ appenders/file/{src => }/README.md | 0 appenders/journald/{src => }/README.md | 0 logforth/Cargo.toml | 2 + logforth/src/lib.rs | 4 + 12 files changed, 394 insertions(+), 289 deletions(-) delete mode 100644 appenders/async/src/lib.rs rename appenders/{async => asynchronous}/Cargo.toml (92%) create mode 100644 appenders/asynchronous/README.md create mode 100644 appenders/asynchronous/src/append.rs create mode 100644 appenders/asynchronous/src/lib.rs create mode 100644 appenders/asynchronous/src/state.rs create mode 100644 appenders/asynchronous/src/worker.rs rename appenders/file/{src => }/README.md (100%) rename appenders/journald/{src => }/README.md (100%) diff --git a/Cargo.toml b/Cargo.toml index 6deba40..389a225 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ rust-version = "1.85.0" [workspace.dependencies] # Workspace dependencies +logforth-append-asynchronous = { version = "0.2.0", path = "appenders/asynchronous" } logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" } logforth-append-file = { version = "0.2.0", path = "appenders/file" } logforth-append-journald = { version = "0.2.0", path = "appenders/journald" } diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs deleted file mode 100644 index 8d77157..0000000 --- a/appenders/async/src/lib.rs +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2024 FastLabs Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! A composable appender, logging and flushing asynchronously. - -#![cfg_attr(docsrs, feature(doc_auto_cfg))] - -use arc_swap::ArcSwapOption; -use crossbeam_channel::{Receiver, Sender}; -use logforth_core::Error; -use logforth_core::kv::Visitor; -use logforth_core::record::{Record, RecordOwned}; -use logforth_core::{Append, kv}; -use logforth_core::{Diagnostic, ErrorSink}; -use std::sync::Arc; -use std::thread::JoinHandle; - -/// A composable appender, logging and flushing asynchronously. -#[derive(Debug)] -pub struct Async { - appends: Arc<[Box]>, - overflow_policy: OverflowPolicy, - state: ArcSwapOption, -} - -#[derive(Debug)] -struct AsyncState { - sender: Sender, - thread_handle: JoinHandle<()>, -} - -impl Append for Async { - fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { - let mut diagnostics = vec![]; - - struct DiagCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); - - impl<'a> Visitor for DiagCollector<'a> { - fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> { - self.0.push((key.to_owned(), value.to_owned())); - Ok(()) - } - } - - let mut collector = DiagCollector(&mut diagnostics); - for d in diags { - d.visit(&mut collector)?; - } - - self.send_task(Task::Log { - appends: self.appends.clone(), - record: record.to_owned(), - diags: diagnostics, - }) - } - - fn flush(&self) -> Result<(), Error> { - self.send_task(Task::Flush { - appends: self.appends.clone(), - }) - } -} - -impl Async { - fn send_task(&self, task: Task) -> Result<(), Error> { - let state = self.state.load(); - // SAFETY: state is always Some before dropped. - let state = state.as_ref().unwrap(); - let sender = &state.sender; - - match self.overflow_policy { - OverflowPolicy::Block => sender.send(task).map_err(|err| { - Error::new(match err.0 { - Task::Log { .. } => "failed to send log task to async appender", - Task::Flush { .. } => "failed to send flush task to async appender", - }) - }), - OverflowPolicy::DropIncoming => match sender.try_send(task) { - Ok(()) => Ok(()), - Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()), - Err(crossbeam_channel::TrySendError::Disconnected(task)) => { - Err(Error::new(match task { - Task::Log { .. } => "failed to send log task to async appender", - Task::Flush { .. } => "failed to send flush task to async appender", - })) - } - }, - } - } - - fn destroy(&self) { - if let Some(state) = self.state.swap(None) { - // SAFETY: state has always one strong count before swapped. - let AsyncState { - sender, - thread_handle, - } = Arc::into_inner(state).unwrap(); - - drop(sender); - thread_handle.join().unwrap(); - } - } -} - -impl Drop for Async { - fn drop(&mut self) { - self.destroy(); - } -} - -/// -pub struct AsyncBuilder { - thread_name: String, - appends: Vec>, - buffered_lines_limit: Option, - error_sink: Box, - overflow_policy: OverflowPolicy, -} - -impl AsyncBuilder { - /// Create a new async appender builder. - pub fn new(thread_name: impl Into) -> AsyncBuilder { - AsyncBuilder { - thread_name: thread_name.into(), - appends: vec![], - buffered_lines_limit: None, - error_sink: Box::new(PrintErrorSink), - overflow_policy: OverflowPolicy::Block, - } - } - - /// Set the buffer size of pending messages. - pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option) -> Self { - self.buffered_lines_limit = buffered_lines_limit; - self - } - - /// Set the overflow policy for this async appender. - pub fn overflow_policy(mut self, overflow_policy: OverflowPolicy) -> Self { - self.overflow_policy = overflow_policy; - self - } - - /// Set the error sink for this async appender. - pub fn error_sink(mut self, error_sink: impl Into>) -> Self { - self.error_sink = error_sink.into(); - self - } - - /// Add an appender to this async appender. - pub fn append(mut self, append: impl Into>) -> Self { - self.appends.push(append.into()); - self - } - - /// Build the async appender. - pub fn build(self) -> Async { - let Self { - thread_name, - appends, - buffered_lines_limit, - error_sink, - overflow_policy, - } = self; - - let (sender, receiver) = match buffered_lines_limit { - Some(limit) => crossbeam_channel::bounded(limit), - None => crossbeam_channel::unbounded(), - }; - - let worker = Worker { - receiver, - error_sink, - }; - - let thread_handle = std::thread::Builder::new() - .name(thread_name) - .spawn(move || worker.run()) - .expect("failed to spawn async appender thread"); - - Async { - appends: appends.into_boxed_slice().into(), - overflow_policy, - state: ArcSwapOption::from(Some(Arc::new(AsyncState { - sender, - thread_handle, - }))), - } - } -} - -struct PrintErrorSink; - -impl ErrorSink for PrintErrorSink { - fn sink(&self, err: &Error) { - eprintln!("{err}"); - } -} - -struct Worker { - receiver: Receiver, - error_sink: Box, -} - -impl Worker { - fn run(self) { - let Self { - receiver, - error_sink, - } = self; - - while let Ok(task) = receiver.recv() { - match task { - Task::Log { - appends, - record, - diags, - } => { - let diags: Vec> = vec![Box::new(OwnedDiagnostic(diags))]; - let diags = diags.as_slice(); - let record = record.as_record(); - for append in appends.iter() { - if let Err(err) = append.append(&record, diags) { - let err = Error::new("failed to append record").set_source(err); - error_sink.sink(&err); - } - } - } - Task::Flush { appends } => { - for append in appends.iter() { - if let Err(err) = append.flush() { - let err = Error::new("failed to flush").set_source(err); - error_sink.sink(&err); - } - } - } - } - } - } -} - -/// Overflow policy for [`Async`]. -/// -/// When the channel is full, an incoming operation is handled according to the -/// specified policy. -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] -#[non_exhaustive] -pub enum OverflowPolicy { - /// Blocks until the channel is not full. - Block, - /// Drops the incoming operation. - DropIncoming, -} - -enum Task { - Log { - appends: Arc<[Box]>, - record: RecordOwned, - diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, - }, - Flush { - appends: Arc<[Box]>, - }, -} - -#[derive(Debug)] -struct OwnedDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>); - -impl Diagnostic for OwnedDiagnostic { - fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> { - for (key, value) in &self.0 { - visitor.visit(key.by_ref(), value.by_ref())?; - } - Ok(()) - } -} diff --git a/appenders/async/Cargo.toml b/appenders/asynchronous/Cargo.toml similarity index 92% rename from appenders/async/Cargo.toml rename to appenders/asynchronous/Cargo.toml index 593a52d..63b7ad8 100644 --- a/appenders/async/Cargo.toml +++ b/appenders/asynchronous/Cargo.toml @@ -13,10 +13,10 @@ # limitations under the License. [package] -name = "logforth-append-async" +name = "logforth-append-asynchronous" version = "0.2.0" -description = "Async appender for Logforth." +description = "Asynchronous appender for Logforth." keywords = ["logging", "log", "async"] categories.workspace = true diff --git a/appenders/asynchronous/README.md b/appenders/asynchronous/README.md new file mode 100644 index 0000000..93e6769 --- /dev/null +++ b/appenders/asynchronous/README.md @@ -0,0 +1,5 @@ +# Asynchronous Appender + +This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need: + +* Instead of a thread pool, it uses a single background thread to drain the log queue. diff --git a/appenders/asynchronous/src/append.rs b/appenders/asynchronous/src/append.rs new file mode 100644 index 0000000..0c56987 --- /dev/null +++ b/appenders/asynchronous/src/append.rs @@ -0,0 +1,169 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::ErrorSink; +use logforth_core::kv; +use logforth_core::kv::Visitor; +use logforth_core::record::Record; + +use crate::Task; +use crate::state::AppendState; +use crate::worker::Worker; + +/// A composable appender, logging and flushing asynchronously. +#[derive(Debug)] +pub struct Asynchronous { + appends: Arc<[Box]>, + overflow: Overflow, + state: AppendState, +} + +impl Append for Asynchronous { + fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { + let mut diagnostics = vec![]; + + let mut collector = DiagnosticCollector(&mut diagnostics); + for d in diags { + d.visit(&mut collector)?; + } + + let overflow = self.overflow; + let task = Task::Log { + appends: self.appends.clone(), + record: Box::new(record.to_owned()), + diags: diagnostics, + }; + self.state.send_task(task, overflow) + } + + fn flush(&self) -> Result<(), Error> { + let overflow = self.overflow; + let task = Task::Flush { + appends: self.appends.clone(), + }; + self.state.send_task(task, overflow) + } +} + +/// A builder for configuring an asynchronous appender. +pub struct AsyncBuilder { + thread_name: String, + appends: Vec>, + buffered_lines_limit: Option, + error_sink: Box, + overflow: Overflow, +} + +impl AsyncBuilder { + /// Create a new asynchronous appender builder. + pub fn new(thread_name: impl Into) -> AsyncBuilder { + AsyncBuilder { + thread_name: thread_name.into(), + appends: vec![], + buffered_lines_limit: None, + error_sink: Box::new(PrintErrorSink), + overflow: Overflow::Block, + } + } + + /// Set the buffer size of pending messages. + pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option) -> Self { + self.buffered_lines_limit = buffered_lines_limit; + self + } + + /// Set the overflow policy for this asynchronous appender. + pub fn overflow(mut self, overflow: Overflow) -> Self { + self.overflow = overflow; + self + } + + /// Set the error sink for this asynchronous appender. + pub fn error_sink(mut self, error_sink: impl Into>) -> Self { + self.error_sink = error_sink.into(); + self + } + + /// Add an appender to this asynchronous appender. + pub fn append(mut self, append: impl Into>) -> Self { + self.appends.push(append.into()); + self + } + + /// Build the asynchronous appender. + pub fn build(self) -> Asynchronous { + let Self { + thread_name, + appends, + buffered_lines_limit, + error_sink, + overflow, + } = self; + + let appends = appends.into_boxed_slice().into(); + + let (sender, receiver) = match buffered_lines_limit { + Some(limit) => crossbeam_channel::bounded(limit), + None => crossbeam_channel::unbounded(), + }; + + let worker = Worker::new(receiver, error_sink); + let thread_handle = std::thread::Builder::new() + .name(thread_name) + .spawn(move || worker.run()) + .expect("failed to spawn asynchronous appender thread"); + let state = AppendState::new(sender, thread_handle); + + Asynchronous { + appends, + overflow, + state, + } + } +} + +/// Overflow policy for [`Asynchronous`]. +/// +/// When the channel is full, an incoming operation is handled according to the +/// specified policy. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +#[non_exhaustive] +pub enum Overflow { + /// Blocks until the channel is not full. + Block, + /// Drops the incoming operation. + DropIncoming, +} + +struct PrintErrorSink; + +impl ErrorSink for PrintErrorSink { + fn sink(&self, err: &Error) { + eprintln!("{err}"); + } +} + +struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); + +impl<'a> Visitor for DiagnosticCollector<'a> { + fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> { + self.0.push((key.to_owned(), value.to_owned())); + Ok(()) + } +} diff --git a/appenders/asynchronous/src/lib.rs b/appenders/asynchronous/src/lib.rs new file mode 100644 index 0000000..cee601c --- /dev/null +++ b/appenders/asynchronous/src/lib.rs @@ -0,0 +1,42 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A composable appender, logging and flushing asynchronously. + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::kv; +use logforth_core::record::RecordOwned; + +mod append; +mod state; +mod worker; + +pub use self::append::AsyncBuilder; +pub use self::append::Asynchronous; +pub use self::append::Overflow; + +enum Task { + Log { + appends: Arc<[Box]>, + record: Box, + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, + }, + Flush { + appends: Arc<[Box]>, + }, +} diff --git a/appenders/asynchronous/src/state.rs b/appenders/asynchronous/src/state.rs new file mode 100644 index 0000000..5d9dba6 --- /dev/null +++ b/appenders/asynchronous/src/state.rs @@ -0,0 +1,86 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::thread::JoinHandle; + +use arc_swap::ArcSwapOption; +use crossbeam_channel::Sender; +use logforth_core::Error; + +use crate::Task; +use crate::append::Overflow; + +#[derive(Debug)] +pub(crate) struct AppendState(ArcSwapOption); + +#[derive(Debug)] +struct State { + sender: Sender, + handle: JoinHandle<()>, +} + +impl AppendState { + pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> Self { + let state = State { sender, handle }; + Self(ArcSwapOption::from(Some(Arc::new(state)))) + } + + pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> { + let state = self.0.load(); + // SAFETY: state is always Some before dropped. + let state = state.as_ref().unwrap(); + let sender = &state.sender; + + match overflow { + Overflow::Block => sender.send(task).map_err(|err| { + Error::new(match err.0 { + Task::Log { .. } => "failed to send log task to asynchronous appender", + Task::Flush { .. } => "failed to send flush task to asynchronous appender", + }) + }), + Overflow::DropIncoming => match sender.try_send(task) { + Ok(()) => Ok(()), + Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()), + Err(crossbeam_channel::TrySendError::Disconnected(task)) => { + Err(Error::new(match task { + Task::Log { .. } => "failed to send log task to asynchronous appender", + Task::Flush { .. } => "failed to send flush task to asynchronous appender", + })) + } + }, + } + } + + pub(crate) fn destroy(&self) { + if let Some(state) = self.0.swap(None) { + // SAFETY: state has always one strong count before swapped. + let State { sender, handle } = Arc::into_inner(state).unwrap(); + + // drop our sender, threads will break the loop after receiving and processing + drop(sender); + + // wait for the thread to finish + handle + .join() + .expect("failed to join asynchronous appender thread"); + } + } +} + +impl Drop for AppendState { + fn drop(&mut self) { + self.destroy(); + } +} diff --git a/appenders/asynchronous/src/worker.rs b/appenders/asynchronous/src/worker.rs new file mode 100644 index 0000000..4670fb9 --- /dev/null +++ b/appenders/asynchronous/src/worker.rs @@ -0,0 +1,83 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crossbeam_channel::Receiver; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::ErrorSink; +use logforth_core::kv; +use logforth_core::kv::Visitor; + +use crate::Task; + +pub(crate) struct Worker { + receiver: Receiver, + error_sink: Box, +} + +impl Worker { + pub(crate) fn new(receiver: Receiver, error_sink: Box) -> Self { + Self { + receiver, + error_sink, + } + } + + pub(crate) fn run(self) { + let Self { + receiver, + error_sink, + } = self; + + while let Ok(task) = receiver.recv() { + match task { + Task::Log { + appends, + record, + diags, + } => { + let diags: Vec> = vec![Box::new(OwnedDiagnostic(diags))]; + let diags = diags.as_slice(); + let record = record.as_record(); + for append in appends.iter() { + if let Err(err) = append.append(&record, diags) { + let err = Error::new("failed to append record").set_source(err); + error_sink.sink(&err); + } + } + } + Task::Flush { appends } => { + for append in appends.iter() { + if let Err(err) = append.flush() { + let err = Error::new("failed to flush").set_source(err); + error_sink.sink(&err); + } + } + } + } + } + } +} + +#[derive(Debug)] +struct OwnedDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>); + +impl Diagnostic for OwnedDiagnostic { + fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> { + for (key, value) in &self.0 { + visitor.visit(key.by_ref(), value.by_ref())?; + } + Ok(()) + } +} diff --git a/appenders/file/src/README.md b/appenders/file/README.md similarity index 100% rename from appenders/file/src/README.md rename to appenders/file/README.md diff --git a/appenders/journald/src/README.md b/appenders/journald/README.md similarity index 100% rename from appenders/journald/src/README.md rename to appenders/journald/README.md diff --git a/logforth/Cargo.toml b/logforth/Cargo.toml index b56eb36..a9bf650 100644 --- a/logforth/Cargo.toml +++ b/logforth/Cargo.toml @@ -41,6 +41,7 @@ starter-log = ["bridge-log", "append-file", "layout-text", "layout-json"] bridge-log = ["dep:logforth-bridge-log"] # Appenders +append-asynchronous = ["dep:logforth-append-asynchronous"] append-fastrace = ["dep:logforth-append-fastrace"] append-file = ["dep:logforth-append-file"] append-journald = ["dep:logforth-append-journald"] @@ -64,6 +65,7 @@ rustls = ["logforth-append-syslog?/rustls"] logforth-core = { workspace = true } # Optional dependencies +logforth-append-asynchronous = { workspace = true, optional = true } logforth-append-fastrace = { workspace = true, optional = true } logforth-append-file = { workspace = true, optional = true } logforth-append-journald = { workspace = true, optional = true } diff --git a/logforth/src/lib.rs b/logforth/src/lib.rs index bdfbf5c..57fda6a 100644 --- a/logforth/src/lib.rs +++ b/logforth/src/lib.rs @@ -71,6 +71,10 @@ pub use logforth_core::record; /// Dispatch log records to various targets. pub mod append { + #[cfg(feature = "append-asynchronous")] + pub use logforth_append_asynchronous as asynchronous; + #[cfg(feature = "append-asynchronous")] + pub use logforth_append_asynchronous::Asynchronous; #[cfg(feature = "append-fastrace")] pub use logforth_append_fastrace::FastraceEvent; #[cfg(feature = "append-file")] From 17ade1b2eeee932b87c73f9a48cf8119b3373e67 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 13:23:05 +0800 Subject: [PATCH 11/15] naming Signed-off-by: tison --- Cargo.toml | 2 +- appenders/{asynchronous => async}/Cargo.toml | 2 +- appenders/{asynchronous => async}/README.md | 2 +- .../{asynchronous => async}/src/append.rs | 57 ++++++++++++------- appenders/{asynchronous => async}/src/lib.rs | 20 +------ .../{asynchronous => async}/src/state.rs | 20 +++---- .../{asynchronous => async}/src/worker.rs | 2 +- core/src/lib.rs | 2 +- logforth/Cargo.toml | 4 +- logforth/src/lib.rs | 8 +-- 10 files changed, 56 insertions(+), 63 deletions(-) rename appenders/{asynchronous => async}/Cargo.toml (96%) rename appenders/{asynchronous => async}/README.md (91%) rename appenders/{asynchronous => async}/src/append.rs (77%) rename appenders/{asynchronous => async}/src/lib.rs (66%) rename appenders/{asynchronous => async}/src/state.rs (87%) rename appenders/{asynchronous => async}/src/worker.rs (99%) diff --git a/Cargo.toml b/Cargo.toml index 389a225..02f8401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ rust-version = "1.85.0" [workspace.dependencies] # Workspace dependencies -logforth-append-asynchronous = { version = "0.2.0", path = "appenders/asynchronous" } +logforth-append-async = { version = "0.2.0", path = "appenders/async" } logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" } logforth-append-file = { version = "0.2.0", path = "appenders/file" } logforth-append-journald = { version = "0.2.0", path = "appenders/journald" } diff --git a/appenders/asynchronous/Cargo.toml b/appenders/async/Cargo.toml similarity index 96% rename from appenders/asynchronous/Cargo.toml rename to appenders/async/Cargo.toml index 63b7ad8..4124d1b 100644 --- a/appenders/asynchronous/Cargo.toml +++ b/appenders/async/Cargo.toml @@ -13,7 +13,7 @@ # limitations under the License. [package] -name = "logforth-append-asynchronous" +name = "logforth-append-async" version = "0.2.0" description = "Asynchronous appender for Logforth." diff --git a/appenders/asynchronous/README.md b/appenders/async/README.md similarity index 91% rename from appenders/asynchronous/README.md rename to appenders/async/README.md index 93e6769..39fa161 100644 --- a/appenders/asynchronous/README.md +++ b/appenders/async/README.md @@ -1,4 +1,4 @@ -# Asynchronous Appender +# Async Appender This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need: diff --git a/appenders/asynchronous/src/append.rs b/appenders/async/src/append.rs similarity index 77% rename from appenders/asynchronous/src/append.rs rename to appenders/async/src/append.rs index 0c56987..5db8408 100644 --- a/appenders/asynchronous/src/append.rs +++ b/appenders/async/src/append.rs @@ -21,20 +21,20 @@ use logforth_core::ErrorSink; use logforth_core::kv; use logforth_core::kv::Visitor; use logforth_core::record::Record; +use logforth_core::record::RecordOwned; -use crate::Task; -use crate::state::AppendState; +use crate::state::AsyncState; use crate::worker::Worker; /// A composable appender, logging and flushing asynchronously. #[derive(Debug)] -pub struct Asynchronous { +pub struct Async { appends: Arc<[Box]>, overflow: Overflow, - state: AppendState, + state: AsyncState, } -impl Append for Asynchronous { +impl Append for Async { fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { let mut diagnostics = vec![]; @@ -61,7 +61,7 @@ impl Append for Asynchronous { } } -/// A builder for configuring an asynchronous appender. +/// A builder for configuring an async appender. pub struct AsyncBuilder { thread_name: String, appends: Vec>, @@ -71,7 +71,7 @@ pub struct AsyncBuilder { } impl AsyncBuilder { - /// Create a new asynchronous appender builder. + /// Create a new async appender builder. pub fn new(thread_name: impl Into) -> AsyncBuilder { AsyncBuilder { thread_name: thread_name.into(), @@ -88,26 +88,32 @@ impl AsyncBuilder { self } - /// Set the overflow policy for this asynchronous appender. - pub fn overflow(mut self, overflow: Overflow) -> Self { - self.overflow = overflow; + /// Set the overflow policy to block when the buffer is full. + pub fn overflow_block(mut self) -> Self { + self.overflow = Overflow::Block; self } - /// Set the error sink for this asynchronous appender. + /// Set the overflow policy to drop incoming messages when the buffer is full. + pub fn overflow_drop_incoming(mut self) -> Self { + self.overflow = Overflow::DropIncoming; + self + } + + /// Set the error sink for this async appender. pub fn error_sink(mut self, error_sink: impl Into>) -> Self { self.error_sink = error_sink.into(); self } - /// Add an appender to this asynchronous appender. + /// Add an appender to this async appender. pub fn append(mut self, append: impl Into>) -> Self { self.appends.push(append.into()); self } - /// Build the asynchronous appender. - pub fn build(self) -> Asynchronous { + /// Build the async appender. + pub fn build(self) -> Async { let Self { thread_name, appends, @@ -127,10 +133,10 @@ impl AsyncBuilder { let thread_handle = std::thread::Builder::new() .name(thread_name) .spawn(move || worker.run()) - .expect("failed to spawn asynchronous appender thread"); - let state = AppendState::new(sender, thread_handle); + .expect("failed to spawn async appender thread"); + let state = AsyncState::new(sender, thread_handle); - Asynchronous { + Async { appends, overflow, state, @@ -138,13 +144,20 @@ impl AsyncBuilder { } } -/// Overflow policy for [`Asynchronous`]. -/// -/// When the channel is full, an incoming operation is handled according to the -/// specified policy. +pub(crate) enum Task { + Log { + appends: Arc<[Box]>, + record: Box, + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, + }, + Flush { + appends: Arc<[Box]>, + }, +} + #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] #[non_exhaustive] -pub enum Overflow { +pub(crate) enum Overflow { /// Blocks until the channel is not full. Block, /// Drops the incoming operation. diff --git a/appenders/asynchronous/src/lib.rs b/appenders/async/src/lib.rs similarity index 66% rename from appenders/asynchronous/src/lib.rs rename to appenders/async/src/lib.rs index cee601c..cdd9715 100644 --- a/appenders/asynchronous/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -16,27 +16,9 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] -use std::sync::Arc; - -use logforth_core::Append; -use logforth_core::kv; -use logforth_core::record::RecordOwned; - mod append; mod state; mod worker; +pub use self::append::Async; pub use self::append::AsyncBuilder; -pub use self::append::Asynchronous; -pub use self::append::Overflow; - -enum Task { - Log { - appends: Arc<[Box]>, - record: Box, - diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, - }, - Flush { - appends: Arc<[Box]>, - }, -} diff --git a/appenders/asynchronous/src/state.rs b/appenders/async/src/state.rs similarity index 87% rename from appenders/asynchronous/src/state.rs rename to appenders/async/src/state.rs index 5d9dba6..7ef9146 100644 --- a/appenders/asynchronous/src/state.rs +++ b/appenders/async/src/state.rs @@ -19,11 +19,11 @@ use arc_swap::ArcSwapOption; use crossbeam_channel::Sender; use logforth_core::Error; -use crate::Task; use crate::append::Overflow; +use crate::append::Task; #[derive(Debug)] -pub(crate) struct AppendState(ArcSwapOption); +pub(crate) struct AsyncState(ArcSwapOption); #[derive(Debug)] struct State { @@ -31,7 +31,7 @@ struct State { handle: JoinHandle<()>, } -impl AppendState { +impl AsyncState { pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> Self { let state = State { sender, handle }; Self(ArcSwapOption::from(Some(Arc::new(state)))) @@ -46,8 +46,8 @@ impl AppendState { match overflow { Overflow::Block => sender.send(task).map_err(|err| { Error::new(match err.0 { - Task::Log { .. } => "failed to send log task to asynchronous appender", - Task::Flush { .. } => "failed to send flush task to asynchronous appender", + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", }) }), Overflow::DropIncoming => match sender.try_send(task) { @@ -55,8 +55,8 @@ impl AppendState { Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()), Err(crossbeam_channel::TrySendError::Disconnected(task)) => { Err(Error::new(match task { - Task::Log { .. } => "failed to send log task to asynchronous appender", - Task::Flush { .. } => "failed to send flush task to asynchronous appender", + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", })) } }, @@ -72,14 +72,12 @@ impl AppendState { drop(sender); // wait for the thread to finish - handle - .join() - .expect("failed to join asynchronous appender thread"); + handle.join().expect("failed to join async appender thread"); } } } -impl Drop for AppendState { +impl Drop for AsyncState { fn drop(&mut self) { self.destroy(); } diff --git a/appenders/asynchronous/src/worker.rs b/appenders/async/src/worker.rs similarity index 99% rename from appenders/asynchronous/src/worker.rs rename to appenders/async/src/worker.rs index 4670fb9..603d64d 100644 --- a/appenders/asynchronous/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -19,7 +19,7 @@ use logforth_core::ErrorSink; use logforth_core::kv; use logforth_core::kv::Visitor; -use crate::Task; +use crate::append::Task; pub(crate) struct Worker { receiver: Receiver, diff --git a/core/src/lib.rs b/core/src/lib.rs index f6ee5c1..0716ae3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -28,7 +28,7 @@ pub use self::diagnostic::Diagnostic; pub use self::filter::Filter; pub use self::layout::Layout; -pub mod error; +mod error; pub use self::error::*; mod logger; diff --git a/logforth/Cargo.toml b/logforth/Cargo.toml index a9bf650..4ffd15a 100644 --- a/logforth/Cargo.toml +++ b/logforth/Cargo.toml @@ -41,7 +41,7 @@ starter-log = ["bridge-log", "append-file", "layout-text", "layout-json"] bridge-log = ["dep:logforth-bridge-log"] # Appenders -append-asynchronous = ["dep:logforth-append-asynchronous"] +append-async = ["dep:logforth-append-async"] append-fastrace = ["dep:logforth-append-fastrace"] append-file = ["dep:logforth-append-file"] append-journald = ["dep:logforth-append-journald"] @@ -65,7 +65,7 @@ rustls = ["logforth-append-syslog?/rustls"] logforth-core = { workspace = true } # Optional dependencies -logforth-append-asynchronous = { workspace = true, optional = true } +logforth-append-async = { workspace = true, optional = true } logforth-append-fastrace = { workspace = true, optional = true } logforth-append-file = { workspace = true, optional = true } logforth-append-journald = { workspace = true, optional = true } diff --git a/logforth/src/lib.rs b/logforth/src/lib.rs index 57fda6a..6a57e3e 100644 --- a/logforth/src/lib.rs +++ b/logforth/src/lib.rs @@ -71,10 +71,10 @@ pub use logforth_core::record; /// Dispatch log records to various targets. pub mod append { - #[cfg(feature = "append-asynchronous")] - pub use logforth_append_asynchronous as asynchronous; - #[cfg(feature = "append-asynchronous")] - pub use logforth_append_asynchronous::Asynchronous; + #[cfg(feature = "append-async")] + pub use logforth_append_async as asynchronous; // `async` is a keyword + #[cfg(feature = "append-async")] + pub use logforth_append_async::Async; #[cfg(feature = "append-fastrace")] pub use logforth_append_fastrace::FastraceEvent; #[cfg(feature = "append-file")] From 67b7bad34390db5cb5594c177b49264f5770aea4 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 14:35:49 +0800 Subject: [PATCH 12/15] add trap Signed-off-by: tison --- appenders/async/src/append.rs | 46 ++++++++--------------------------- appenders/async/src/lib.rs | 25 +++++++++++++++++++ appenders/async/src/state.rs | 4 +-- appenders/async/src/worker.rs | 22 ++++++----------- appenders/file/src/append.rs | 19 +++++++++++++++ appenders/file/src/rolling.rs | 30 ++++++++++++++++++++--- core/src/error.rs | 14 ----------- core/src/lib.rs | 2 ++ core/src/trap/default.rs | 18 ++++++++++++++ core/src/trap/mod.rs | 35 ++++++++++++++++++++++++++ logforth/src/lib.rs | 5 ++++ 11 files changed, 150 insertions(+), 70 deletions(-) create mode 100644 core/src/trap/default.rs create mode 100644 core/src/trap/mod.rs diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs index 5db8408..8085064 100644 --- a/appenders/async/src/append.rs +++ b/appenders/async/src/append.rs @@ -17,12 +17,14 @@ use std::sync::Arc; use logforth_core::Append; use logforth_core::Diagnostic; use logforth_core::Error; -use logforth_core::ErrorSink; +use logforth_core::Trap; use logforth_core::kv; use logforth_core::kv::Visitor; use logforth_core::record::Record; -use logforth_core::record::RecordOwned; +use logforth_core::trap::DefaultTrap; +use crate::Overflow; +use crate::Task; use crate::state::AsyncState; use crate::worker::Worker; @@ -66,7 +68,7 @@ pub struct AsyncBuilder { thread_name: String, appends: Vec>, buffered_lines_limit: Option, - error_sink: Box, + trap: Box, overflow: Overflow, } @@ -77,7 +79,7 @@ impl AsyncBuilder { thread_name: thread_name.into(), appends: vec![], buffered_lines_limit: None, - error_sink: Box::new(PrintErrorSink), + trap: Box::new(DefaultTrap::default()), overflow: Overflow::Block, } } @@ -100,9 +102,9 @@ impl AsyncBuilder { self } - /// Set the error sink for this async appender. - pub fn error_sink(mut self, error_sink: impl Into>) -> Self { - self.error_sink = error_sink.into(); + /// Set the trap for this async appender. + pub fn trap(mut self, trap: impl Into>) -> Self { + self.trap = trap.into(); self } @@ -118,7 +120,7 @@ impl AsyncBuilder { thread_name, appends, buffered_lines_limit, - error_sink, + trap: error_sink, overflow, } = self; @@ -144,34 +146,6 @@ impl AsyncBuilder { } } -pub(crate) enum Task { - Log { - appends: Arc<[Box]>, - record: Box, - diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, - }, - Flush { - appends: Arc<[Box]>, - }, -} - -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] -#[non_exhaustive] -pub(crate) enum Overflow { - /// Blocks until the channel is not full. - Block, - /// Drops the incoming operation. - DropIncoming, -} - -struct PrintErrorSink; - -impl ErrorSink for PrintErrorSink { - fn sink(&self, err: &Error) { - eprintln!("{err}"); - } -} - struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); impl<'a> Visitor for DiagnosticCollector<'a> { diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index cdd9715..0e4f61b 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -16,9 +16,34 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::kv; +use logforth_core::record::RecordOwned; + mod append; mod state; mod worker; pub use self::append::Async; pub use self::append::AsyncBuilder; + +enum Task { + Log { + appends: Arc<[Box]>, + record: Box, + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, + }, + Flush { + appends: Arc<[Box]>, + }, +} + +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +enum Overflow { + /// Blocks until the channel is not full. + Block, + /// Drops the incoming operation. + DropIncoming, +} diff --git a/appenders/async/src/state.rs b/appenders/async/src/state.rs index 7ef9146..7de02bf 100644 --- a/appenders/async/src/state.rs +++ b/appenders/async/src/state.rs @@ -19,8 +19,8 @@ use arc_swap::ArcSwapOption; use crossbeam_channel::Sender; use logforth_core::Error; -use crate::append::Overflow; -use crate::append::Task; +use crate::Overflow; +use crate::Task; #[derive(Debug)] pub(crate) struct AsyncState(ArcSwapOption); diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs index 603d64d..5f496af 100644 --- a/appenders/async/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -15,30 +15,24 @@ use crossbeam_channel::Receiver; use logforth_core::Diagnostic; use logforth_core::Error; -use logforth_core::ErrorSink; +use logforth_core::Trap; use logforth_core::kv; use logforth_core::kv::Visitor; -use crate::append::Task; +use crate::Task; pub(crate) struct Worker { receiver: Receiver, - error_sink: Box, + trap: Box, } impl Worker { - pub(crate) fn new(receiver: Receiver, error_sink: Box) -> Self { - Self { - receiver, - error_sink, - } + pub(crate) fn new(receiver: Receiver, trap: Box) -> Self { + Self { receiver, trap } } pub(crate) fn run(self) { - let Self { - receiver, - error_sink, - } = self; + let Self { receiver, trap } = self; while let Ok(task) = receiver.recv() { match task { @@ -53,7 +47,7 @@ impl Worker { for append in appends.iter() { if let Err(err) = append.append(&record, diags) { let err = Error::new("failed to append record").set_source(err); - error_sink.sink(&err); + trap.trap(&err); } } } @@ -61,7 +55,7 @@ impl Worker { for append in appends.iter() { if let Err(err) = append.flush() { let err = Error::new("failed to flush").set_source(err); - error_sink.sink(&err); + trap.trap(&err); } } } diff --git a/appenders/file/src/append.rs b/appenders/file/src/append.rs index 3aa1e8c..150f379 100644 --- a/appenders/file/src/append.rs +++ b/appenders/file/src/append.rs @@ -21,6 +21,7 @@ use std::sync::MutexGuard; use logforth_core::Diagnostic; use logforth_core::Error; use logforth_core::Layout; +use logforth_core::Trap; use logforth_core::append::Append; use logforth_core::layout::PlainTextLayout; use logforth_core::record::Record; @@ -77,6 +78,24 @@ impl FileBuilder { self } + /// Set the trap for handling errors during logging. + /// + /// Default to [`DefaultTrap`]. + /// + /// # Examples + /// + /// ``` + /// use logforth_append_file::FileBuilder; + /// use logforth_core::trap::DefaultTrap; + /// + /// let builder = FileBuilder::new("my_service", "my_app"); + /// builder.trap(DefaultTrap::default()); + /// ``` + pub fn trap(mut self, trap: impl Into>) -> Self { + self.builder = self.builder.trap(trap); + self + } + /// Set the rotation strategy to roll over log files minutely. pub fn rollover_minutely(mut self) -> Self { self.builder = self.builder.rotation(Rotation::Minutely); diff --git a/appenders/file/src/rolling.rs b/appenders/file/src/rolling.rs index 301c464..a26fd11 100644 --- a/appenders/file/src/rolling.rs +++ b/appenders/file/src/rolling.rs @@ -26,6 +26,8 @@ use std::str::FromStr; use jiff::Zoned; use jiff::civil::DateTime; use logforth_core::Error; +use logforth_core::Trap; +use logforth_core::trap::DefaultTrap; use crate::clock::Clock; use crate::rotation::Rotation; @@ -40,7 +42,8 @@ pub struct RollingFileWriter { impl Drop for RollingFileWriter { fn drop(&mut self) { if let Err(err) = self.writer.flush() { - eprintln!("failed to flush file writer on dropped: {err}"); + let err = Error::new("failed to flush file writer on dropped").set_source(err); + self.state.trap.trap(&err); } } } @@ -87,6 +90,7 @@ pub struct RollingFileWriterBuilder { max_size: Option, max_files: Option, clock: Clock, + trap: Box, } impl RollingFileWriterBuilder { @@ -101,9 +105,16 @@ impl RollingFileWriterBuilder { max_size: None, max_files: None, clock: Clock::DefaultClock, + trap: Box::new(DefaultTrap::default()), } } + /// Set the trap for handling internal errors. + pub fn trap(mut self, trap: impl Into>) -> Self { + self.trap = trap.into(); + self + } + /// Set the rotation policy. #[must_use] pub fn rotation(mut self, rotation: Rotation) -> Self { @@ -153,6 +164,7 @@ impl RollingFileWriterBuilder { max_size, max_files, clock, + trap, } = self; if filename.is_empty() { @@ -167,6 +179,7 @@ impl RollingFileWriterBuilder { max_size, max_files, clock, + trap, )?; Ok(RollingFileWriter { state, writer }) @@ -206,9 +219,11 @@ struct State { max_size: Option, max_files: Option, clock: Clock, + trap: Box, } impl State { + #[allow(clippy::too_many_arguments)] fn new( rotation: Rotation, dir: impl AsRef, @@ -217,6 +232,7 @@ impl State { max_size: Option, max_files: Option, clock: Clock, + trap: Box, ) -> Result<(Self, File), Error> { let now = clock.now(); let log_dir = dir.as_ref().to_path_buf(); @@ -235,6 +251,7 @@ impl State { max_size, max_files, clock, + trap, }; let files = { @@ -433,7 +450,8 @@ impl State { if let Some(max_files) = self.max_files { if let Err(err) = self.delete_oldest_logs(max_files.get()) { - eprintln!("failed to delete oldest logs: {err}"); + let err = Error::new("failed to delete oldest logs").set_source(err); + self.trap.trap(&err); } } @@ -444,11 +462,15 @@ impl State { match self.rotate_log_writer(now) { Ok(new_file) => { if let Err(err) = file.flush() { - eprintln!("failed to flush previous writer: {err}"); + let err = Error::new("failed to flush previous writer").set_source(err); + self.trap.trap(&err); } *file = new_file; } - Err(err) => eprintln!("failed to rotate log writer: {err}"), + Err(err) => { + let err = Error::new("failed to rotate log writer").set_source(err); + self.trap.trap(&err); + } } } diff --git a/core/src/error.rs b/core/src/error.rs index ea0d701..c15aa9f 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -12,23 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Error type and error sink trait. - use std::fmt; use std::io; -/// An error sink to receive errors. -pub trait ErrorSink: Send + Sync + 'static { - /// Receive an error - fn sink(&self, err: &Error); -} - -impl From for Box { - fn from(value: T) -> Self { - Box::new(value) - } -} - /// The error struct of logforth. pub struct Error { message: String, diff --git a/core/src/lib.rs b/core/src/lib.rs index 0716ae3..e1e9860 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -22,11 +22,13 @@ pub mod filter; pub mod kv; pub mod layout; pub mod record; +pub mod trap; pub use self::append::Append; pub use self::diagnostic::Diagnostic; pub use self::filter::Filter; pub use self::layout::Layout; +pub use self::trap::Trap; mod error; pub use self::error::*; diff --git a/core/src/trap/default.rs b/core/src/trap/default.rs new file mode 100644 index 0000000..634ea0b --- /dev/null +++ b/core/src/trap/default.rs @@ -0,0 +1,18 @@ +use std::io; +use std::io::Write; + +use crate::Error; +use crate::trap::Trap; + +/// A default trap that sends errors to standard error if possible. +/// +/// If standard error is not available, it does nothing. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct DefaultTrap {} + +impl Trap for DefaultTrap { + fn trap(&self, err: &Error) { + let _ = writeln!(io::stderr(), "{err}"); + } +} diff --git a/core/src/trap/mod.rs b/core/src/trap/mod.rs new file mode 100644 index 0000000..ffdca2f --- /dev/null +++ b/core/src/trap/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Traps for processing errors. + +use core::fmt; + +use crate::Error; + +mod default; + +pub use self::default::DefaultTrap; + +/// A trap for processing errors. +pub trait Trap: fmt::Debug + Send + Sync + 'static { + /// Process an error. + fn trap(&self, err: &Error); +} + +impl From for Box { + fn from(value: T) -> Self { + Box::new(value) + } +} diff --git a/logforth/src/lib.rs b/logforth/src/lib.rs index 6a57e3e..9cc1316 100644 --- a/logforth/src/lib.rs +++ b/logforth/src/lib.rs @@ -145,5 +145,10 @@ pub mod layout { pub use logforth_layout_text::TextLayout; } +/// Traps for processing errors. +pub mod trap { + pub use logforth_core::trap::*; +} + #[cfg(feature = "bridge-log")] pub mod starter_log; From 2555c6247285cb4b9e8a27bcd673a038446e4b59 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 15:24:16 +0800 Subject: [PATCH 13/15] atexit Signed-off-by: tison --- appenders/async/src/append.rs | 28 ++++++-- appenders/async/src/worker.rs | 6 +- appenders/file/src/append.rs | 2 +- appenders/file/src/rolling.rs | 2 +- core/src/append/mod.rs | 5 ++ core/src/logger/log_impl.rs | 117 ++++++++++++++++++++++++++++++---- core/src/trap/default.rs | 14 ++++ 7 files changed, 152 insertions(+), 22 deletions(-) diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs index 8085064..7e5b45b 100644 --- a/appenders/async/src/append.rs +++ b/appenders/async/src/append.rs @@ -34,6 +34,7 @@ pub struct Async { appends: Arc<[Box]>, overflow: Overflow, state: AsyncState, + trap: Arc, } impl Append for Async { @@ -61,6 +62,23 @@ impl Append for Async { }; self.state.send_task(task, overflow) } + + fn exit(&self) -> Result<(), Error> { + // https://github.com/SpriteOvO/spdlog-rs/issues/64 + // + // If the program is tearing down, this will be the final flush. `crossbeam` + // uses thread-local internally, which is not supported in `atexit` callback. + // This can be bypassed by flushing sinks directly on the current thread, but + // before we do that we have to destroy the thread pool to ensure that any + // pending log tasks are completed. + self.state.destroy(); + for append in self.appends.iter() { + if let Err(err) = append.exit() { + self.trap.trap(&err); + } + } + Ok(()) + } } /// A builder for configuring an async appender. @@ -68,7 +86,7 @@ pub struct AsyncBuilder { thread_name: String, appends: Vec>, buffered_lines_limit: Option, - trap: Box, + trap: Arc, overflow: Overflow, } @@ -79,7 +97,7 @@ impl AsyncBuilder { thread_name: thread_name.into(), appends: vec![], buffered_lines_limit: None, - trap: Box::new(DefaultTrap::default()), + trap: Arc::new(DefaultTrap::default()), overflow: Overflow::Block, } } @@ -104,6 +122,7 @@ impl AsyncBuilder { /// Set the trap for this async appender. pub fn trap(mut self, trap: impl Into>) -> Self { + let trap = trap.into(); self.trap = trap.into(); self } @@ -120,7 +139,7 @@ impl AsyncBuilder { thread_name, appends, buffered_lines_limit, - trap: error_sink, + trap, overflow, } = self; @@ -131,7 +150,7 @@ impl AsyncBuilder { None => crossbeam_channel::unbounded(), }; - let worker = Worker::new(receiver, error_sink); + let worker = Worker::new(receiver, trap.clone()); let thread_handle = std::thread::Builder::new() .name(thread_name) .spawn(move || worker.run()) @@ -142,6 +161,7 @@ impl AsyncBuilder { appends, overflow, state, + trap, } } } diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs index 5f496af..944fccb 100644 --- a/appenders/async/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use crossbeam_channel::Receiver; use logforth_core::Diagnostic; use logforth_core::Error; @@ -23,11 +25,11 @@ use crate::Task; pub(crate) struct Worker { receiver: Receiver, - trap: Box, + trap: Arc, } impl Worker { - pub(crate) fn new(receiver: Receiver, trap: Box) -> Self { + pub(crate) fn new(receiver: Receiver, trap: Arc) -> Self { Self { receiver, trap } } diff --git a/appenders/file/src/append.rs b/appenders/file/src/append.rs index 150f379..a7cd6af 100644 --- a/appenders/file/src/append.rs +++ b/appenders/file/src/append.rs @@ -78,7 +78,7 @@ impl FileBuilder { self } - /// Set the trap for handling errors during logging. + /// Set the trap for the file writer. /// /// Default to [`DefaultTrap`]. /// diff --git a/appenders/file/src/rolling.rs b/appenders/file/src/rolling.rs index a26fd11..7b2c580 100644 --- a/appenders/file/src/rolling.rs +++ b/appenders/file/src/rolling.rs @@ -109,7 +109,7 @@ impl RollingFileWriterBuilder { } } - /// Set the trap for handling internal errors. + /// Set the trap for the rolling file writer. pub fn trap(mut self, trap: impl Into>) -> Self { self.trap = trap.into(); self diff --git a/core/src/append/mod.rs b/core/src/append/mod.rs index 573e80a..ba7d20e 100644 --- a/core/src/append/mod.rs +++ b/core/src/append/mod.rs @@ -34,6 +34,11 @@ pub trait Append: fmt::Debug + Send + Sync + 'static { /// Flush any buffered records. fn flush(&self) -> Result<(), Error>; + + /// Perform any cleanup work before the program exits. + fn exit(&self) -> Result<(), Error> { + self.flush() + } } impl From for Box { diff --git a/core/src/logger/log_impl.rs b/core/src/logger/log_impl.rs index ddfffc8..e509c54 100644 --- a/core/src/logger/log_impl.rs +++ b/core/src/logger/log_impl.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::io::Write; +use std::panic; +use std::sync::Once; use std::sync::OnceLock; use crate::Append; @@ -38,7 +40,49 @@ pub fn default_logger() -> &'static Logger { /// If a default logger has already been set, the function returns the provided logger /// as an error. pub fn set_default_logger(logger: Logger) -> Result<(), Logger> { - DEFAULT_LOGGER.set(logger) + static ATEXIT_CALLBACK: Once = Once::new(); + + DEFAULT_LOGGER.set(logger)?; + ATEXIT_CALLBACK.call_once(flush_default_logger_at_exit); + Ok(()) +} + +fn flush_default_logger_at_exit() { + // Rust never calls `drop` for static variables. + // + // Setting up an exit handler gives us a chance to flush the default logger + // once at the program exit, thus we don't lose the last logs. + + extern "C" fn handler() { + if let Some(default_logger) = DEFAULT_LOGGER.get() { + default_logger.exit(); + } + } + + #[must_use] + fn try_atexit() -> bool { + use std::os::raw::c_int; + + unsafe extern "C" { + fn atexit(cb: extern "C" fn()) -> c_int; + } + + (unsafe { atexit(handler) }) == 0 + } + + fn hook_panic() { + let previous_hook = panic::take_hook(); + + panic::set_hook(Box::new(move |info| { + handler(); + previous_hook(info); + })); + } + + if !try_atexit() { + // if we failed to register the `atexit` handler, at least we hook into panic + hook_panic(); + } } /// A logger that dispatches log records to one or more dispatcher. @@ -64,8 +108,8 @@ impl Logger { /// Log the [`Record`]. pub fn log(&self, record: &Record) { for dispatch in &self.dispatches { - if let Err(err) = dispatch.log(record) { - handle_log_error(record, err); + for err in dispatch.log(record) { + handle_log_error(record, &err); } } } @@ -73,8 +117,17 @@ impl Logger { /// Flush any buffered records. pub fn flush(&self) { for dispatch in &self.dispatches { - if let Err(err) = dispatch.flush() { - handle_flush_error(err); + for err in dispatch.flush() { + handle_flush_error(&err); + } + } + } + + /// Perform any cleanup work before the program exits. + pub fn exit(&self) { + for dispatch in &self.dispatches { + for err in dispatch.exit() { + handle_exit_error(&err); } } } @@ -126,32 +179,48 @@ impl Dispatch { true } - fn log(&self, record: &Record) -> Result<(), Error> { + fn log(&self, record: &Record) -> Vec { let diagnostics = &self.diagnostics; for filter in &self.filters { match filter.matches(record, diagnostics) { - FilterResult::Reject => return Ok(()), + FilterResult::Reject => return vec![], FilterResult::Accept => break, FilterResult::Neutral => {} } } + let mut errors = vec![]; for append in &self.appends { - append.append(record, diagnostics)?; + if let Err(err) = append.append(record, diagnostics) { + errors.push(err); + } } - Ok(()) + errors } - fn flush(&self) -> Result<(), Error> { + fn flush(&self) -> Vec { + let mut errors = vec![]; for append in &self.appends { - append.flush()?; + if let Err(err) = append.flush() { + errors.push(err); + } } - Ok(()) + errors + } + + fn exit(&self) -> Vec { + let mut errors = vec![]; + for append in &self.appends { + if let Err(err) = append.exit() { + errors.push(err); + } + } + errors } } -fn handle_log_error(record: &Record, error: Error) { +fn handle_log_error(record: &Record, error: &Error) { let Err(fallback_error) = write!( std::io::stderr(), r###" @@ -182,7 +251,7 @@ Error performing stderr logging after error occurred during regular logging. ); } -fn handle_flush_error(error: Error) { +fn handle_flush_error(error: &Error) { let Err(fallback_error) = write!( std::io::stderr(), r###" @@ -201,3 +270,23 @@ Error performing stderr logging after error occurred during regular flush. "###, ); } + +fn handle_exit_error(error: &Error) { + let Err(fallback_error) = write!( + std::io::stderr(), + r###" +Error perform exit. + Error: {error:?} +"###, + ) else { + return; + }; + + panic!( + r###" +Error performing stderr logging after error occurred during atexit. + Error: {error:?} + Fallback error: {fallback_error} +"###, + ); +} diff --git a/core/src/trap/default.rs b/core/src/trap/default.rs index 634ea0b..d69d49d 100644 --- a/core/src/trap/default.rs +++ b/core/src/trap/default.rs @@ -1,3 +1,17 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::io; use std::io::Write; From 49c2d11fd1b35352847c8f867201ba0db8b24bb1 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 15:41:56 +0800 Subject: [PATCH 14/15] docs tests Signed-off-by: tison --- CHANGELOG.md | 2 ++ appenders/fastrace/src/lib.rs | 6 ++++++ core/src/append/mod.rs | 6 ++++++ 3 files changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0199d5..b37e5f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,8 @@ All notable changes to this project will be documented in this file. ### New features * `PlainTextLayout` is added to support plain text format without any extra dependency. +* `Async` appender is added to support async logging with configurable buffer size and worker threads. +* `Trap` trait and a default `DefaultTrap` is added to support handling internal errors. ## [0.27.0] 2025-08-18 diff --git a/appenders/fastrace/src/lib.rs b/appenders/fastrace/src/lib.rs index 14bd22c..ef76f03 100644 --- a/appenders/fastrace/src/lib.rs +++ b/appenders/fastrace/src/lib.rs @@ -73,6 +73,12 @@ impl Append for FastraceEvent { fastrace::flush(); Ok(()) } + + fn exit(&self) -> Result<(), Error> { + // fastrace flush accesses thread-local storage, + // which is not supported in atexit handlers + Ok(()) + } } impl Drop for FastraceEvent { diff --git a/core/src/append/mod.rs b/core/src/append/mod.rs index ba7d20e..ac6e796 100644 --- a/core/src/append/mod.rs +++ b/core/src/append/mod.rs @@ -36,6 +36,12 @@ pub trait Append: fmt::Debug + Send + Sync + 'static { fn flush(&self) -> Result<(), Error>; /// Perform any cleanup work before the program exits. + /// + /// Default to call `flush`. + /// + /// This is typically called within a global logger during program exits. + /// If it is not in a global logger, the drop glue should perform necessary + /// cleanup. fn exit(&self) -> Result<(), Error> { self.flush() } From e0640f6d3e9c75a40a86a587a9928f049edcaff2 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 6 Oct 2025 16:32:56 +0800 Subject: [PATCH 15/15] tests Signed-off-by: tison --- .github/workflows/ci.yml | 1 + .gitignore | 1 + appenders/fastrace/src/lib.rs | 6 -- logforth/Cargo.toml | 11 +++ logforth/examples/asynchronous.rs | 40 +++++++++ logforth/tests/global_async_sink.rs | 125 ++++++++++++++++++++++++++++ logforth/tests/recursive_logging.rs | 2 +- 7 files changed, 179 insertions(+), 7 deletions(-) create mode 100644 logforth/examples/asynchronous.rs create mode 100644 logforth/tests/global_async_sink.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84e6c15..d19eeb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,6 +89,7 @@ jobs: cargo run --features="starter-log" --example json_stdout cargo run --features="starter-log" --example rolling_file cargo run --features="starter-log" --example single_file + cargo run --features="starter-log,append-async" --example asynchronous cargo run --features="starter-log,diagnostic-fastrace,layout-google-cloud-logging" --example google_cloud_logging cargo run --features="starter-log,append-fastrace,diagnostic-fastrace" --example fastrace diff --git a/.gitignore b/.gitignore index 1b72444..9979cd3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /Cargo.lock /target +/logs diff --git a/appenders/fastrace/src/lib.rs b/appenders/fastrace/src/lib.rs index ef76f03..4be7766 100644 --- a/appenders/fastrace/src/lib.rs +++ b/appenders/fastrace/src/lib.rs @@ -81,12 +81,6 @@ impl Append for FastraceEvent { } } -impl Drop for FastraceEvent { - fn drop(&mut self) { - fastrace::flush(); - } -} - struct KvCollector { kv: Vec<(String, String)>, } diff --git a/logforth/Cargo.toml b/logforth/Cargo.toml index 4ffd15a..08b0082 100644 --- a/logforth/Cargo.toml +++ b/logforth/Cargo.toml @@ -87,6 +87,17 @@ serde = { workspace = true } [lints] workspace = true +[[test]] +harness = false +name = "global_async_sink" +required-features = ["starter-log", "append-async"] + +[[example]] +doc-scrape-examples = true +name = "asynchronous" +path = "examples/asynchronous.rs" +required-features = ["starter-log", "append-async"] + [[example]] doc-scrape-examples = true name = "testing" diff --git a/logforth/examples/asynchronous.rs b/logforth/examples/asynchronous.rs new file mode 100644 index 0000000..427295a --- /dev/null +++ b/logforth/examples/asynchronous.rs @@ -0,0 +1,40 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An example of logging to a single file with async combinator. + +use logforth::append::file::FileBuilder; +use logforth::layout::JsonLayout; +use logforth::record::LevelFilter; +use logforth_append_async::AsyncBuilder; + +fn main() { + let file = FileBuilder::new("logs", "my_app_async") + .filename_suffix("log") + .layout(JsonLayout::default()) + .build() + .unwrap(); + + let asynchronous = AsyncBuilder::new("logforth-async").append(file).build(); + + logforth::starter_log::builder() + .dispatch(|d| d.filter(LevelFilter::Trace).append(asynchronous)) + .apply(); + + log::error!("Hello single error!"); + log::warn!("Hello single warn!"); + log::info!("Hello single info!"); + log::debug!("Hello single debug!"); + log::trace!("Hello single trace!"); +} diff --git a/logforth/tests/global_async_sink.rs b/logforth/tests/global_async_sink.rs new file mode 100644 index 0000000..a8e42bd --- /dev/null +++ b/logforth/tests/global_async_sink.rs @@ -0,0 +1,125 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This case ensures that the asynchronous logger flushes correctly at program exit. + +// This refers to https://github.com/SpriteOvO/spdlog-rs/issues/64 + +use std::fmt::Write; +use std::os::raw::c_int; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use logforth_append_async::AsyncBuilder; +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::record::LevelFilter; +use logforth_core::record::Record; + +static IS_LOGGED: AtomicBool = AtomicBool::new(false); +static IS_FLUSHED: AtomicBool = AtomicBool::new(false); + +#[derive(Debug)] +struct SetFlags; + +impl Append for SetFlags { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + IS_LOGGED.store(true, Ordering::SeqCst); + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + // assert that the record has been logged before flushing + assert!(IS_LOGGED.load(Ordering::SeqCst)); + IS_FLUSHED.store(true, Ordering::SeqCst); + Ok(()) + } +} + +fn run_test() { + { + extern "C" fn check() { + // assert that `Async` appender in the default logger will be flushed correctly + // and will not panic. + assert!(IS_FLUSHED.load(Ordering::SeqCst)); + } + + // set up `atexit` to check the flag at the end of the program + unsafe extern "C" { + fn atexit(cb: extern "C" fn()) -> c_int; + } + + assert_eq!(unsafe { atexit(check) }, 0); + + let asynchronous = AsyncBuilder::new("async-appender").append(SetFlags).build(); + + logforth::starter_log::builder() + .dispatch(|d| d.filter(LevelFilter::Trace).append(asynchronous)) + .apply(); + } + + log::info!("hello async sink"); +} + +fn main() { + // This is a flaky test, it only has a certain probability of failing, + // so we run it multiple times to make sure it's really working properly. + { + let mut captured_output = String::new(); + let args = std::env::args().collect::>(); + + let is_parent = args.iter().all(|arg| arg != "child"); + if is_parent { + for i in 0..1000 { + let output = std::process::Command::new(&args[0]) + .arg("child") + .stderr(std::process::Stdio::piped()) + .output() + .unwrap(); + + let success = output.status.success(); + writeln!( + captured_output, + "Attempt #{i} = {}", + if success { "ok" } else { "failed!" } + ) + .unwrap(); + + if !success { + eprintln!("{captured_output}"); + + let stderr = String::from_utf8_lossy(&output.stderr).lines().fold( + String::new(), + |mut contents, line| { + writeln!(&mut contents, "> {line}").unwrap(); + contents + }, + ); + + eprintln!("stderr of the failed attempt:\n{stderr}"); + panic!("test failed"); + } + } + return; + } else { + assert_eq!(args[1], "child"); + } + } + + // Run the test after leaving the scope, so the main function ends + // without dropping additional variables, thus exiting faster. This + // should increase the probability of reproducing the error. + run_test(); +} diff --git a/logforth/tests/recursive_logging.rs b/logforth/tests/recursive_logging.rs index c77a2b8..8e979a2 100644 --- a/logforth/tests/recursive_logging.rs +++ b/logforth/tests/recursive_logging.rs @@ -14,7 +14,7 @@ //! This case ensures our impl does properly handle recursive logging. -#![cfg(feature = "append-file")] +#![cfg(feature = "starter-log")] use std::num::NonZeroUsize;