diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84e6c15..d19eeb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,6 +89,7 @@ jobs: cargo run --features="starter-log" --example json_stdout cargo run --features="starter-log" --example rolling_file cargo run --features="starter-log" --example single_file + cargo run --features="starter-log,append-async" --example asynchronous cargo run --features="starter-log,diagnostic-fastrace,layout-google-cloud-logging" --example google_cloud_logging cargo run --features="starter-log,append-fastrace,diagnostic-fastrace" --example fastrace diff --git a/.gitignore b/.gitignore index 1b72444..9979cd3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /Cargo.lock /target +/logs diff --git a/CHANGELOG.md b/CHANGELOG.md index f0199d5..b37e5f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,8 @@ All notable changes to this project will be documented in this file. ### New features * `PlainTextLayout` is added to support plain text format without any extra dependency. +* `Async` appender is added to support async logging with configurable buffer size and worker threads. +* `Trap` trait and a default `DefaultTrap` is added to support handling internal errors. ## [0.27.0] 2025-08-18 diff --git a/Cargo.toml b/Cargo.toml index dacffa7..02f8401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ members = [ "core", "logforth", - "appenders/*", "bridges/*", "diagnostics/*", @@ -35,22 +34,25 @@ rust-version = "1.85.0" [workspace.dependencies] # Workspace dependencies -logforth-append-fastrace = { version = "0.1.0", path = "appenders/fastrace" } -logforth-append-file = { version = "0.1.0", path = "appenders/file" } -logforth-append-journald = { version = "0.1.0", path = "appenders/journald" } -logforth-append-opentelemetry = { version = "0.1.0", path = "appenders/opentelemetry" } -logforth-append-syslog = { version = "0.1.0", path = "appenders/syslog" } -logforth-bridge-log = { version = "0.1.0", path = "bridges/log" } -logforth-core = { version = "0.1.0", path = "core" } -logforth-diagnostic-fastrace = { version = "0.1.0", path = "diagnostics/fastrace" } -logforth-layout-google-cloud-logging = { version = "0.1.0", path = "layouts/google-cloud-logging" } -logforth-layout-json = { version = "0.1.0", path = "layouts/json" } -logforth-layout-logfmt = { version = "0.1.0", path = "layouts/logfmt" } -logforth-layout-text = { version = "0.1.0", path = "layouts/text" } +logforth-append-async = { version = "0.2.0", path = "appenders/async" } +logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" } +logforth-append-file = { version = "0.2.0", path = "appenders/file" } +logforth-append-journald = { version = "0.2.0", path = "appenders/journald" } +logforth-append-opentelemetry = { version = "0.2.0", path = "appenders/opentelemetry" } +logforth-append-syslog = { version = "0.2.0", path = "appenders/syslog" } +logforth-bridge-log = { version = "0.2.0", path = "bridges/log" } +logforth-core = { version = "0.2.0", path = "core" } +logforth-diagnostic-fastrace = { version = "0.2.0", path = "diagnostics/fastrace" } +logforth-layout-google-cloud-logging = { version = "0.2.0", path = "layouts/google-cloud-logging" } +logforth-layout-json = { version = "0.2.0", path = "layouts/json" } +logforth-layout-logfmt = { version = "0.2.0", path = "layouts/logfmt" } +logforth-layout-text = { version = "0.2.0", path = "layouts/text" } # Crates.io dependencies anyhow = { version = "1.0" } +arc-swap = { version = "1.7.1" } colored = { version = "3.0" } +crossbeam-channel = { version = "0.5.15" } fastrace = { version = "0.7" } fasyslog = { version = "1.0.0" } insta = { version = "1.43.2" } diff --git a/appenders/async/Cargo.toml b/appenders/async/Cargo.toml new file mode 100644 index 0000000..4124d1b --- /dev/null +++ b/appenders/async/Cargo.toml @@ -0,0 +1,40 @@ +# Copyright 2024 FastLabs Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "logforth-append-async" +version = "0.2.0" + +description = "Asynchronous appender for Logforth." +keywords = ["logging", "log", "async"] + +categories.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +arc-swap = { workspace = true } +crossbeam-channel = { workspace = true } +logforth-core = { workspace = true } + +[lints] +workspace = true diff --git a/appenders/async/README.md b/appenders/async/README.md new file mode 100644 index 0000000..39fa161 --- /dev/null +++ b/appenders/async/README.md @@ -0,0 +1,5 @@ +# Async Appender + +This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need: + +* Instead of a thread pool, it uses a single background thread to drain the log queue. diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs new file mode 100644 index 0000000..7e5b45b --- /dev/null +++ b/appenders/async/src/append.rs @@ -0,0 +1,176 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::Trap; +use logforth_core::kv; +use logforth_core::kv::Visitor; +use logforth_core::record::Record; +use logforth_core::trap::DefaultTrap; + +use crate::Overflow; +use crate::Task; +use crate::state::AsyncState; +use crate::worker::Worker; + +/// A composable appender, logging and flushing asynchronously. +#[derive(Debug)] +pub struct Async { + appends: Arc<[Box]>, + overflow: Overflow, + state: AsyncState, + trap: Arc, +} + +impl Append for Async { + fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error> { + let mut diagnostics = vec![]; + + let mut collector = DiagnosticCollector(&mut diagnostics); + for d in diags { + d.visit(&mut collector)?; + } + + let overflow = self.overflow; + let task = Task::Log { + appends: self.appends.clone(), + record: Box::new(record.to_owned()), + diags: diagnostics, + }; + self.state.send_task(task, overflow) + } + + fn flush(&self) -> Result<(), Error> { + let overflow = self.overflow; + let task = Task::Flush { + appends: self.appends.clone(), + }; + self.state.send_task(task, overflow) + } + + fn exit(&self) -> Result<(), Error> { + // https://github.com/SpriteOvO/spdlog-rs/issues/64 + // + // If the program is tearing down, this will be the final flush. `crossbeam` + // uses thread-local internally, which is not supported in `atexit` callback. + // This can be bypassed by flushing sinks directly on the current thread, but + // before we do that we have to destroy the thread pool to ensure that any + // pending log tasks are completed. + self.state.destroy(); + for append in self.appends.iter() { + if let Err(err) = append.exit() { + self.trap.trap(&err); + } + } + Ok(()) + } +} + +/// A builder for configuring an async appender. +pub struct AsyncBuilder { + thread_name: String, + appends: Vec>, + buffered_lines_limit: Option, + trap: Arc, + overflow: Overflow, +} + +impl AsyncBuilder { + /// Create a new async appender builder. + pub fn new(thread_name: impl Into) -> AsyncBuilder { + AsyncBuilder { + thread_name: thread_name.into(), + appends: vec![], + buffered_lines_limit: None, + trap: Arc::new(DefaultTrap::default()), + overflow: Overflow::Block, + } + } + + /// Set the buffer size of pending messages. + pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option) -> Self { + self.buffered_lines_limit = buffered_lines_limit; + self + } + + /// Set the overflow policy to block when the buffer is full. + pub fn overflow_block(mut self) -> Self { + self.overflow = Overflow::Block; + self + } + + /// Set the overflow policy to drop incoming messages when the buffer is full. + pub fn overflow_drop_incoming(mut self) -> Self { + self.overflow = Overflow::DropIncoming; + self + } + + /// Set the trap for this async appender. + pub fn trap(mut self, trap: impl Into>) -> Self { + let trap = trap.into(); + self.trap = trap.into(); + self + } + + /// Add an appender to this async appender. + pub fn append(mut self, append: impl Into>) -> Self { + self.appends.push(append.into()); + self + } + + /// Build the async appender. + pub fn build(self) -> Async { + let Self { + thread_name, + appends, + buffered_lines_limit, + trap, + overflow, + } = self; + + let appends = appends.into_boxed_slice().into(); + + let (sender, receiver) = match buffered_lines_limit { + Some(limit) => crossbeam_channel::bounded(limit), + None => crossbeam_channel::unbounded(), + }; + + let worker = Worker::new(receiver, trap.clone()); + let thread_handle = std::thread::Builder::new() + .name(thread_name) + .spawn(move || worker.run()) + .expect("failed to spawn async appender thread"); + let state = AsyncState::new(sender, thread_handle); + + Async { + appends, + overflow, + state, + trap, + } + } +} + +struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); + +impl<'a> Visitor for DiagnosticCollector<'a> { + fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> { + self.0.push((key.to_owned(), value.to_owned())); + Ok(()) + } +} diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs new file mode 100644 index 0000000..0e4f61b --- /dev/null +++ b/appenders/async/src/lib.rs @@ -0,0 +1,49 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A composable appender, logging and flushing asynchronously. + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +use std::sync::Arc; + +use logforth_core::Append; +use logforth_core::kv; +use logforth_core::record::RecordOwned; + +mod append; +mod state; +mod worker; + +pub use self::append::Async; +pub use self::append::AsyncBuilder; + +enum Task { + Log { + appends: Arc<[Box]>, + record: Box, + diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, + }, + Flush { + appends: Arc<[Box]>, + }, +} + +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +enum Overflow { + /// Blocks until the channel is not full. + Block, + /// Drops the incoming operation. + DropIncoming, +} diff --git a/appenders/async/src/state.rs b/appenders/async/src/state.rs new file mode 100644 index 0000000..7de02bf --- /dev/null +++ b/appenders/async/src/state.rs @@ -0,0 +1,84 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::thread::JoinHandle; + +use arc_swap::ArcSwapOption; +use crossbeam_channel::Sender; +use logforth_core::Error; + +use crate::Overflow; +use crate::Task; + +#[derive(Debug)] +pub(crate) struct AsyncState(ArcSwapOption); + +#[derive(Debug)] +struct State { + sender: Sender, + handle: JoinHandle<()>, +} + +impl AsyncState { + pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> Self { + let state = State { sender, handle }; + Self(ArcSwapOption::from(Some(Arc::new(state)))) + } + + pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> { + let state = self.0.load(); + // SAFETY: state is always Some before dropped. + let state = state.as_ref().unwrap(); + let sender = &state.sender; + + match overflow { + Overflow::Block => sender.send(task).map_err(|err| { + Error::new(match err.0 { + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", + }) + }), + Overflow::DropIncoming => match sender.try_send(task) { + Ok(()) => Ok(()), + Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()), + Err(crossbeam_channel::TrySendError::Disconnected(task)) => { + Err(Error::new(match task { + Task::Log { .. } => "failed to send log task to async appender", + Task::Flush { .. } => "failed to send flush task to async appender", + })) + } + }, + } + } + + pub(crate) fn destroy(&self) { + if let Some(state) = self.0.swap(None) { + // SAFETY: state has always one strong count before swapped. + let State { sender, handle } = Arc::into_inner(state).unwrap(); + + // drop our sender, threads will break the loop after receiving and processing + drop(sender); + + // wait for the thread to finish + handle.join().expect("failed to join async appender thread"); + } + } +} + +impl Drop for AsyncState { + fn drop(&mut self) { + self.destroy(); + } +} diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs new file mode 100644 index 0000000..944fccb --- /dev/null +++ b/appenders/async/src/worker.rs @@ -0,0 +1,79 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crossbeam_channel::Receiver; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::Trap; +use logforth_core::kv; +use logforth_core::kv::Visitor; + +use crate::Task; + +pub(crate) struct Worker { + receiver: Receiver, + trap: Arc, +} + +impl Worker { + pub(crate) fn new(receiver: Receiver, trap: Arc) -> Self { + Self { receiver, trap } + } + + pub(crate) fn run(self) { + let Self { receiver, trap } = self; + + while let Ok(task) = receiver.recv() { + match task { + Task::Log { + appends, + record, + diags, + } => { + let diags: Vec> = vec![Box::new(OwnedDiagnostic(diags))]; + let diags = diags.as_slice(); + let record = record.as_record(); + for append in appends.iter() { + if let Err(err) = append.append(&record, diags) { + let err = Error::new("failed to append record").set_source(err); + trap.trap(&err); + } + } + } + Task::Flush { appends } => { + for append in appends.iter() { + if let Err(err) = append.flush() { + let err = Error::new("failed to flush").set_source(err); + trap.trap(&err); + } + } + } + } + } + } +} + +#[derive(Debug)] +struct OwnedDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>); + +impl Diagnostic for OwnedDiagnostic { + fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> { + for (key, value) in &self.0 { + visitor.visit(key.by_ref(), value.by_ref())?; + } + Ok(()) + } +} diff --git a/appenders/fastrace/Cargo.toml b/appenders/fastrace/Cargo.toml index a674fca..80a4777 100644 --- a/appenders/fastrace/Cargo.toml +++ b/appenders/fastrace/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-fastrace" -version = "0.1.0" +version = "0.2.0" description = "Fastrace appender for Logforth." keywords = ["logging", "log", "fastrace"] diff --git a/appenders/fastrace/src/lib.rs b/appenders/fastrace/src/lib.rs index 5e42070..4be7766 100644 --- a/appenders/fastrace/src/lib.rs +++ b/appenders/fastrace/src/lib.rs @@ -73,6 +73,12 @@ impl Append for FastraceEvent { fastrace::flush(); Ok(()) } + + fn exit(&self) -> Result<(), Error> { + // fastrace flush accesses thread-local storage, + // which is not supported in atexit handlers + Ok(()) + } } struct KvCollector { diff --git a/appenders/file/Cargo.toml b/appenders/file/Cargo.toml index eb60497..49c78e2 100644 --- a/appenders/file/Cargo.toml +++ b/appenders/file/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-file" -version = "0.1.0" +version = "0.2.0" description = "File appender for Logforth with optional rollover strategy." keywords = ["logging", "log", "file-appender"] diff --git a/appenders/file/src/README.md b/appenders/file/README.md similarity index 100% rename from appenders/file/src/README.md rename to appenders/file/README.md diff --git a/appenders/file/src/append.rs b/appenders/file/src/append.rs index 4bd795a..a7cd6af 100644 --- a/appenders/file/src/append.rs +++ b/appenders/file/src/append.rs @@ -21,6 +21,7 @@ use std::sync::MutexGuard; use logforth_core::Diagnostic; use logforth_core::Error; use logforth_core::Layout; +use logforth_core::Trap; use logforth_core::append::Append; use logforth_core::layout::PlainTextLayout; use logforth_core::record::Record; @@ -77,6 +78,24 @@ impl FileBuilder { self } + /// Set the trap for the file writer. + /// + /// Default to [`DefaultTrap`]. + /// + /// # Examples + /// + /// ``` + /// use logforth_append_file::FileBuilder; + /// use logforth_core::trap::DefaultTrap; + /// + /// let builder = FileBuilder::new("my_service", "my_app"); + /// builder.trap(DefaultTrap::default()); + /// ``` + pub fn trap(mut self, trap: impl Into>) -> Self { + self.builder = self.builder.trap(trap); + self + } + /// Set the rotation strategy to roll over log files minutely. pub fn rollover_minutely(mut self) -> Self { self.builder = self.builder.rotation(Rotation::Minutely); @@ -151,3 +170,10 @@ impl Append for File { Ok(()) } } + +impl Drop for File { + fn drop(&mut self) { + let writer = self.writer.get_mut().unwrap_or_else(|e| e.into_inner()); + let _ = writer.flush(); + } +} diff --git a/appenders/file/src/rolling.rs b/appenders/file/src/rolling.rs index 301c464..7b2c580 100644 --- a/appenders/file/src/rolling.rs +++ b/appenders/file/src/rolling.rs @@ -26,6 +26,8 @@ use std::str::FromStr; use jiff::Zoned; use jiff::civil::DateTime; use logforth_core::Error; +use logforth_core::Trap; +use logforth_core::trap::DefaultTrap; use crate::clock::Clock; use crate::rotation::Rotation; @@ -40,7 +42,8 @@ pub struct RollingFileWriter { impl Drop for RollingFileWriter { fn drop(&mut self) { if let Err(err) = self.writer.flush() { - eprintln!("failed to flush file writer on dropped: {err}"); + let err = Error::new("failed to flush file writer on dropped").set_source(err); + self.state.trap.trap(&err); } } } @@ -87,6 +90,7 @@ pub struct RollingFileWriterBuilder { max_size: Option, max_files: Option, clock: Clock, + trap: Box, } impl RollingFileWriterBuilder { @@ -101,9 +105,16 @@ impl RollingFileWriterBuilder { max_size: None, max_files: None, clock: Clock::DefaultClock, + trap: Box::new(DefaultTrap::default()), } } + /// Set the trap for the rolling file writer. + pub fn trap(mut self, trap: impl Into>) -> Self { + self.trap = trap.into(); + self + } + /// Set the rotation policy. #[must_use] pub fn rotation(mut self, rotation: Rotation) -> Self { @@ -153,6 +164,7 @@ impl RollingFileWriterBuilder { max_size, max_files, clock, + trap, } = self; if filename.is_empty() { @@ -167,6 +179,7 @@ impl RollingFileWriterBuilder { max_size, max_files, clock, + trap, )?; Ok(RollingFileWriter { state, writer }) @@ -206,9 +219,11 @@ struct State { max_size: Option, max_files: Option, clock: Clock, + trap: Box, } impl State { + #[allow(clippy::too_many_arguments)] fn new( rotation: Rotation, dir: impl AsRef, @@ -217,6 +232,7 @@ impl State { max_size: Option, max_files: Option, clock: Clock, + trap: Box, ) -> Result<(Self, File), Error> { let now = clock.now(); let log_dir = dir.as_ref().to_path_buf(); @@ -235,6 +251,7 @@ impl State { max_size, max_files, clock, + trap, }; let files = { @@ -433,7 +450,8 @@ impl State { if let Some(max_files) = self.max_files { if let Err(err) = self.delete_oldest_logs(max_files.get()) { - eprintln!("failed to delete oldest logs: {err}"); + let err = Error::new("failed to delete oldest logs").set_source(err); + self.trap.trap(&err); } } @@ -444,11 +462,15 @@ impl State { match self.rotate_log_writer(now) { Ok(new_file) => { if let Err(err) = file.flush() { - eprintln!("failed to flush previous writer: {err}"); + let err = Error::new("failed to flush previous writer").set_source(err); + self.trap.trap(&err); } *file = new_file; } - Err(err) => eprintln!("failed to rotate log writer: {err}"), + Err(err) => { + let err = Error::new("failed to rotate log writer").set_source(err); + self.trap.trap(&err); + } } } diff --git a/appenders/journald/Cargo.toml b/appenders/journald/Cargo.toml index cb00767..06b4fb0 100644 --- a/appenders/journald/Cargo.toml +++ b/appenders/journald/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-journald" -version = "0.1.0" +version = "0.2.0" description = "Journald appender for Logforth." keywords = ["logging", "log", "journald"] diff --git a/appenders/journald/src/README.md b/appenders/journald/README.md similarity index 100% rename from appenders/journald/src/README.md rename to appenders/journald/README.md diff --git a/appenders/journald/src/lib.rs b/appenders/journald/src/lib.rs index 61d54b8..addd38b 100644 --- a/appenders/journald/src/lib.rs +++ b/appenders/journald/src/lib.rs @@ -312,4 +312,9 @@ impl Append for Journald { self.send_payload(&buffer)?; Ok(()) } + + fn flush(&self) -> Result<(), Error> { + // UnixDatagram doesn't buffer anything, so nothing to do here. + Ok(()) + } } diff --git a/appenders/opentelemetry/Cargo.toml b/appenders/opentelemetry/Cargo.toml index 42dde9b..23851dd 100644 --- a/appenders/opentelemetry/Cargo.toml +++ b/appenders/opentelemetry/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-opentelemetry" -version = "0.1.0" +version = "0.2.0" description = "Opemtelemetry appender for Logforth." keywords = ["logging", "log", "opentelemtry"] diff --git a/appenders/opentelemetry/src/lib.rs b/appenders/opentelemetry/src/lib.rs index 23c3c08..3c2710f 100644 --- a/appenders/opentelemetry/src/lib.rs +++ b/appenders/opentelemetry/src/lib.rs @@ -277,6 +277,12 @@ impl Append for OpentelemetryLog { } } +impl Drop for OpentelemetryLog { + fn drop(&mut self) { + let _ = self.provider.force_flush(); + } +} + fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity { match level { Level::Error => opentelemetry::logs::Severity::Error, diff --git a/appenders/syslog/Cargo.toml b/appenders/syslog/Cargo.toml index c99c881..53fc892 100644 --- a/appenders/syslog/Cargo.toml +++ b/appenders/syslog/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-append-syslog" -version = "0.1.0" +version = "0.2.0" description = "Syslog appender for Logforth." keywords = ["logging", "log", "syslog"] diff --git a/appenders/syslog/src/lib.rs b/appenders/syslog/src/lib.rs index 43958de..7d98c66 100644 --- a/appenders/syslog/src/lib.rs +++ b/appenders/syslog/src/lib.rs @@ -308,6 +308,13 @@ impl Append for Syslog { } } +impl Drop for Syslog { + fn drop(&mut self) { + let sender = self.sender.get_mut().unwrap_or_else(|e| e.into_inner()); + let _ = sender.flush(); + } +} + #[derive(Debug)] struct SyslogFormatter { format: SyslogFormat, diff --git a/bridges/log/Cargo.toml b/bridges/log/Cargo.toml index 43f8c9b..c84da1e 100644 --- a/bridges/log/Cargo.toml +++ b/bridges/log/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-bridge-log" -version = "0.1.0" +version = "0.2.0" description = "Bridge Logforth with the log crate." keywords = ["logging", "log"] diff --git a/core/Cargo.toml b/core/Cargo.toml index 6dd2501..b808adf 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-core" -version = "0.1.0" +version = "0.2.0" description = "Core structs and functions for Logforth." keywords = ["logging", "log"] @@ -27,6 +27,10 @@ readme.workspace = true repository.workspace = true rust-version.workspace = true +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [features] default = [] serde = ["value-bag/serde"] diff --git a/core/src/append/mod.rs b/core/src/append/mod.rs index 16c7089..ac6e796 100644 --- a/core/src/append/mod.rs +++ b/core/src/append/mod.rs @@ -33,10 +33,17 @@ pub trait Append: fmt::Debug + Send + Sync + 'static { fn append(&self, record: &Record, diags: &[Box]) -> Result<(), Error>; /// Flush any buffered records. + fn flush(&self) -> Result<(), Error>; + + /// Perform any cleanup work before the program exits. + /// + /// Default to call `flush`. /// - /// Default to a no-op. - fn flush(&self) -> Result<(), Error> { - Ok(()) + /// This is typically called within a global logger during program exits. + /// If it is not in a global logger, the drop glue should perform necessary + /// cleanup. + fn exit(&self) -> Result<(), Error> { + self.flush() } } diff --git a/core/src/append/testing.rs b/core/src/append/testing.rs index ae5d555..d595ef9 100644 --- a/core/src/append/testing.rs +++ b/core/src/append/testing.rs @@ -67,4 +67,9 @@ impl Append for Testing { eprintln!("{}", String::from_utf8_lossy(&bytes)); Ok(()) } + + fn flush(&self) -> Result<(), Error> { + // nothing to flush for eprintln + Ok(()) + } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 0716ae3..e1e9860 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -22,11 +22,13 @@ pub mod filter; pub mod kv; pub mod layout; pub mod record; +pub mod trap; pub use self::append::Append; pub use self::diagnostic::Diagnostic; pub use self::filter::Filter; pub use self::layout::Layout; +pub use self::trap::Trap; mod error; pub use self::error::*; diff --git a/core/src/logger/log_impl.rs b/core/src/logger/log_impl.rs index ddfffc8..e509c54 100644 --- a/core/src/logger/log_impl.rs +++ b/core/src/logger/log_impl.rs @@ -13,6 +13,8 @@ // limitations under the License. use std::io::Write; +use std::panic; +use std::sync::Once; use std::sync::OnceLock; use crate::Append; @@ -38,7 +40,49 @@ pub fn default_logger() -> &'static Logger { /// If a default logger has already been set, the function returns the provided logger /// as an error. pub fn set_default_logger(logger: Logger) -> Result<(), Logger> { - DEFAULT_LOGGER.set(logger) + static ATEXIT_CALLBACK: Once = Once::new(); + + DEFAULT_LOGGER.set(logger)?; + ATEXIT_CALLBACK.call_once(flush_default_logger_at_exit); + Ok(()) +} + +fn flush_default_logger_at_exit() { + // Rust never calls `drop` for static variables. + // + // Setting up an exit handler gives us a chance to flush the default logger + // once at the program exit, thus we don't lose the last logs. + + extern "C" fn handler() { + if let Some(default_logger) = DEFAULT_LOGGER.get() { + default_logger.exit(); + } + } + + #[must_use] + fn try_atexit() -> bool { + use std::os::raw::c_int; + + unsafe extern "C" { + fn atexit(cb: extern "C" fn()) -> c_int; + } + + (unsafe { atexit(handler) }) == 0 + } + + fn hook_panic() { + let previous_hook = panic::take_hook(); + + panic::set_hook(Box::new(move |info| { + handler(); + previous_hook(info); + })); + } + + if !try_atexit() { + // if we failed to register the `atexit` handler, at least we hook into panic + hook_panic(); + } } /// A logger that dispatches log records to one or more dispatcher. @@ -64,8 +108,8 @@ impl Logger { /// Log the [`Record`]. pub fn log(&self, record: &Record) { for dispatch in &self.dispatches { - if let Err(err) = dispatch.log(record) { - handle_log_error(record, err); + for err in dispatch.log(record) { + handle_log_error(record, &err); } } } @@ -73,8 +117,17 @@ impl Logger { /// Flush any buffered records. pub fn flush(&self) { for dispatch in &self.dispatches { - if let Err(err) = dispatch.flush() { - handle_flush_error(err); + for err in dispatch.flush() { + handle_flush_error(&err); + } + } + } + + /// Perform any cleanup work before the program exits. + pub fn exit(&self) { + for dispatch in &self.dispatches { + for err in dispatch.exit() { + handle_exit_error(&err); } } } @@ -126,32 +179,48 @@ impl Dispatch { true } - fn log(&self, record: &Record) -> Result<(), Error> { + fn log(&self, record: &Record) -> Vec { let diagnostics = &self.diagnostics; for filter in &self.filters { match filter.matches(record, diagnostics) { - FilterResult::Reject => return Ok(()), + FilterResult::Reject => return vec![], FilterResult::Accept => break, FilterResult::Neutral => {} } } + let mut errors = vec![]; for append in &self.appends { - append.append(record, diagnostics)?; + if let Err(err) = append.append(record, diagnostics) { + errors.push(err); + } } - Ok(()) + errors } - fn flush(&self) -> Result<(), Error> { + fn flush(&self) -> Vec { + let mut errors = vec![]; for append in &self.appends { - append.flush()?; + if let Err(err) = append.flush() { + errors.push(err); + } } - Ok(()) + errors + } + + fn exit(&self) -> Vec { + let mut errors = vec![]; + for append in &self.appends { + if let Err(err) = append.exit() { + errors.push(err); + } + } + errors } } -fn handle_log_error(record: &Record, error: Error) { +fn handle_log_error(record: &Record, error: &Error) { let Err(fallback_error) = write!( std::io::stderr(), r###" @@ -182,7 +251,7 @@ Error performing stderr logging after error occurred during regular logging. ); } -fn handle_flush_error(error: Error) { +fn handle_flush_error(error: &Error) { let Err(fallback_error) = write!( std::io::stderr(), r###" @@ -201,3 +270,23 @@ Error performing stderr logging after error occurred during regular flush. "###, ); } + +fn handle_exit_error(error: &Error) { + let Err(fallback_error) = write!( + std::io::stderr(), + r###" +Error perform exit. + Error: {error:?} +"###, + ) else { + return; + }; + + panic!( + r###" +Error performing stderr logging after error occurred during atexit. + Error: {error:?} + Fallback error: {fallback_error} +"###, + ); +} diff --git a/core/src/trap/default.rs b/core/src/trap/default.rs new file mode 100644 index 0000000..d69d49d --- /dev/null +++ b/core/src/trap/default.rs @@ -0,0 +1,32 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::io::Write; + +use crate::Error; +use crate::trap::Trap; + +/// A default trap that sends errors to standard error if possible. +/// +/// If standard error is not available, it does nothing. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct DefaultTrap {} + +impl Trap for DefaultTrap { + fn trap(&self, err: &Error) { + let _ = writeln!(io::stderr(), "{err}"); + } +} diff --git a/core/src/trap/mod.rs b/core/src/trap/mod.rs new file mode 100644 index 0000000..ffdca2f --- /dev/null +++ b/core/src/trap/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Traps for processing errors. + +use core::fmt; + +use crate::Error; + +mod default; + +pub use self::default::DefaultTrap; + +/// A trap for processing errors. +pub trait Trap: fmt::Debug + Send + Sync + 'static { + /// Process an error. + fn trap(&self, err: &Error); +} + +impl From for Box { + fn from(value: T) -> Self { + Box::new(value) + } +} diff --git a/diagnostics/fastrace/Cargo.toml b/diagnostics/fastrace/Cargo.toml index 9de0ff2..4b14d19 100644 --- a/diagnostics/fastrace/Cargo.toml +++ b/diagnostics/fastrace/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-diagnostic-fastrace" -version = "0.1.0" +version = "0.2.0" description = "Fastrace diagnostic for Logforth." keywords = ["logging", "log", "fastrace"] diff --git a/layouts/google-cloud-logging/Cargo.toml b/layouts/google-cloud-logging/Cargo.toml index df26490..b39ab20 100644 --- a/layouts/google-cloud-logging/Cargo.toml +++ b/layouts/google-cloud-logging/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-google-cloud-logging" -version = "0.1.0" +version = "0.2.0" description = "Google Cloud Structured Logging layout for Logforth." keywords = ["logging", "log", "google-cloud-logging"] diff --git a/layouts/json/Cargo.toml b/layouts/json/Cargo.toml index eadd6e6..f4d42e5 100644 --- a/layouts/json/Cargo.toml +++ b/layouts/json/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-json" -version = "0.1.0" +version = "0.2.0" description = "JSON layout for Logforth." keywords = ["logging", "log", "json"] diff --git a/layouts/logfmt/Cargo.toml b/layouts/logfmt/Cargo.toml index d2f8474..32a067b 100644 --- a/layouts/logfmt/Cargo.toml +++ b/layouts/logfmt/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-logfmt" -version = "0.1.0" +version = "0.2.0" description = "Logfmt layout for Logforth." keywords = ["logging", "log", "logfmt"] diff --git a/layouts/text/Cargo.toml b/layouts/text/Cargo.toml index 8539339..9811e8d 100644 --- a/layouts/text/Cargo.toml +++ b/layouts/text/Cargo.toml @@ -14,7 +14,7 @@ [package] name = "logforth-layout-text" -version = "0.1.0" +version = "0.2.0" description = "Optionally colored text layout for Logforth." keywords = ["logging", "log", "colored", "terminal"] diff --git a/logforth/Cargo.toml b/logforth/Cargo.toml index b56eb36..08b0082 100644 --- a/logforth/Cargo.toml +++ b/logforth/Cargo.toml @@ -41,6 +41,7 @@ starter-log = ["bridge-log", "append-file", "layout-text", "layout-json"] bridge-log = ["dep:logforth-bridge-log"] # Appenders +append-async = ["dep:logforth-append-async"] append-fastrace = ["dep:logforth-append-fastrace"] append-file = ["dep:logforth-append-file"] append-journald = ["dep:logforth-append-journald"] @@ -64,6 +65,7 @@ rustls = ["logforth-append-syslog?/rustls"] logforth-core = { workspace = true } # Optional dependencies +logforth-append-async = { workspace = true, optional = true } logforth-append-fastrace = { workspace = true, optional = true } logforth-append-file = { workspace = true, optional = true } logforth-append-journald = { workspace = true, optional = true } @@ -85,6 +87,17 @@ serde = { workspace = true } [lints] workspace = true +[[test]] +harness = false +name = "global_async_sink" +required-features = ["starter-log", "append-async"] + +[[example]] +doc-scrape-examples = true +name = "asynchronous" +path = "examples/asynchronous.rs" +required-features = ["starter-log", "append-async"] + [[example]] doc-scrape-examples = true name = "testing" diff --git a/logforth/examples/asynchronous.rs b/logforth/examples/asynchronous.rs new file mode 100644 index 0000000..427295a --- /dev/null +++ b/logforth/examples/asynchronous.rs @@ -0,0 +1,40 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An example of logging to a single file with async combinator. + +use logforth::append::file::FileBuilder; +use logforth::layout::JsonLayout; +use logforth::record::LevelFilter; +use logforth_append_async::AsyncBuilder; + +fn main() { + let file = FileBuilder::new("logs", "my_app_async") + .filename_suffix("log") + .layout(JsonLayout::default()) + .build() + .unwrap(); + + let asynchronous = AsyncBuilder::new("logforth-async").append(file).build(); + + logforth::starter_log::builder() + .dispatch(|d| d.filter(LevelFilter::Trace).append(asynchronous)) + .apply(); + + log::error!("Hello single error!"); + log::warn!("Hello single warn!"); + log::info!("Hello single info!"); + log::debug!("Hello single debug!"); + log::trace!("Hello single trace!"); +} diff --git a/logforth/src/lib.rs b/logforth/src/lib.rs index 55d6989..9cc1316 100644 --- a/logforth/src/lib.rs +++ b/logforth/src/lib.rs @@ -71,6 +71,10 @@ pub use logforth_core::record; /// Dispatch log records to various targets. pub mod append { + #[cfg(feature = "append-async")] + pub use logforth_append_async as asynchronous; // `async` is a keyword + #[cfg(feature = "append-async")] + pub use logforth_append_async::Async; #[cfg(feature = "append-fastrace")] pub use logforth_append_fastrace::FastraceEvent; #[cfg(feature = "append-file")] @@ -93,6 +97,8 @@ pub mod append { /// Bridge logforth with other logging frameworks. pub mod bridge { /// Bridge logforth with [`log`]. + /// + /// [`log`]: https://docs.rs/log/ #[cfg(feature = "bridge-log")] pub mod log { #[cfg(feature = "bridge-log")] @@ -139,5 +145,10 @@ pub mod layout { pub use logforth_layout_text::TextLayout; } +/// Traps for processing errors. +pub mod trap { + pub use logforth_core::trap::*; +} + #[cfg(feature = "bridge-log")] pub mod starter_log; diff --git a/logforth/src/starter_log.rs b/logforth/src/starter_log.rs index 374da53..d5f6068 100644 --- a/logforth/src/starter_log.rs +++ b/logforth/src/starter_log.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Starter configurations for quickly setting up logforth with the [`log`] crate +//! Starter configurations for quickly setting up logforth with the `log` crate use crate::Append; use crate::Error; @@ -23,7 +23,7 @@ use crate::core::DispatchBuilder; use crate::core::LoggerBuilder; use crate::filter::env_filter::EnvFilterBuilder; -/// A builder for setting up logforth with the [`log`] crate. +/// A builder for setting up logforth with the `log` crate. pub struct LogStarterBuilder { builder: LoggerBuilder, } @@ -70,7 +70,7 @@ impl LogStarterBuilder { /// before initialization will be ignored. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Errors /// @@ -103,7 +103,7 @@ impl LogStarterBuilder { /// initialized a global logger. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Panics /// @@ -125,7 +125,7 @@ enum StdStream { Stderr(append::Stderr), } -/// A builder for setting up logforth with the [`log`] crate, using standard output/error streams. +/// A builder for setting up logforth with the `log` crate, using standard output/error streams. pub struct LogStarterStdStreamBuilder { append: StdStream, filter: Box, @@ -207,7 +207,7 @@ impl LogStarterStdStreamBuilder { /// before initialization will be ignored. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Errors /// @@ -246,7 +246,7 @@ impl LogStarterStdStreamBuilder { /// initialized a global logger. /// /// This function will set the global maximum log level to `Trace`. To override this, call - /// [`log::set_max_level`] after this function. + /// `log::set_max_level` after this function. /// /// # Panics /// diff --git a/logforth/tests/global_async_sink.rs b/logforth/tests/global_async_sink.rs new file mode 100644 index 0000000..a8e42bd --- /dev/null +++ b/logforth/tests/global_async_sink.rs @@ -0,0 +1,125 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This case ensures that the asynchronous logger flushes correctly at program exit. + +// This refers to https://github.com/SpriteOvO/spdlog-rs/issues/64 + +use std::fmt::Write; +use std::os::raw::c_int; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use logforth_append_async::AsyncBuilder; +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::record::LevelFilter; +use logforth_core::record::Record; + +static IS_LOGGED: AtomicBool = AtomicBool::new(false); +static IS_FLUSHED: AtomicBool = AtomicBool::new(false); + +#[derive(Debug)] +struct SetFlags; + +impl Append for SetFlags { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + IS_LOGGED.store(true, Ordering::SeqCst); + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + // assert that the record has been logged before flushing + assert!(IS_LOGGED.load(Ordering::SeqCst)); + IS_FLUSHED.store(true, Ordering::SeqCst); + Ok(()) + } +} + +fn run_test() { + { + extern "C" fn check() { + // assert that `Async` appender in the default logger will be flushed correctly + // and will not panic. + assert!(IS_FLUSHED.load(Ordering::SeqCst)); + } + + // set up `atexit` to check the flag at the end of the program + unsafe extern "C" { + fn atexit(cb: extern "C" fn()) -> c_int; + } + + assert_eq!(unsafe { atexit(check) }, 0); + + let asynchronous = AsyncBuilder::new("async-appender").append(SetFlags).build(); + + logforth::starter_log::builder() + .dispatch(|d| d.filter(LevelFilter::Trace).append(asynchronous)) + .apply(); + } + + log::info!("hello async sink"); +} + +fn main() { + // This is a flaky test, it only has a certain probability of failing, + // so we run it multiple times to make sure it's really working properly. + { + let mut captured_output = String::new(); + let args = std::env::args().collect::>(); + + let is_parent = args.iter().all(|arg| arg != "child"); + if is_parent { + for i in 0..1000 { + let output = std::process::Command::new(&args[0]) + .arg("child") + .stderr(std::process::Stdio::piped()) + .output() + .unwrap(); + + let success = output.status.success(); + writeln!( + captured_output, + "Attempt #{i} = {}", + if success { "ok" } else { "failed!" } + ) + .unwrap(); + + if !success { + eprintln!("{captured_output}"); + + let stderr = String::from_utf8_lossy(&output.stderr).lines().fold( + String::new(), + |mut contents, line| { + writeln!(&mut contents, "> {line}").unwrap(); + contents + }, + ); + + eprintln!("stderr of the failed attempt:\n{stderr}"); + panic!("test failed"); + } + } + return; + } else { + assert_eq!(args[1], "child"); + } + } + + // Run the test after leaving the scope, so the main function ends + // without dropping additional variables, thus exiting faster. This + // should increase the probability of reproducing the error. + run_test(); +} diff --git a/logforth/tests/recursive_logging.rs b/logforth/tests/recursive_logging.rs index c77a2b8..8e979a2 100644 --- a/logforth/tests/recursive_logging.rs +++ b/logforth/tests/recursive_logging.rs @@ -14,7 +14,7 @@ //! This case ensures our impl does properly handle recursive logging. -#![cfg(feature = "append-file")] +#![cfg(feature = "starter-log")] use std::num::NonZeroUsize;