-
Notifications
You must be signed in to change notification settings - Fork 28
feat: new version and async appender #183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
edf13ab
feat: new version and async appender
tisonkun a03bab8
add async appender stub
tisonkun 3835ebf
Append flush has no default now
tisonkun 0b65ef8
impl Drop for appenders when useful
tisonkun ca345e9
fixup links
tisonkun 9cd2c60
impl Async appender
tisonkun 9316c78
diags as record field
tisonkun 960605a
Revert "diags as record field"
tisonkun 64be2d2
impl async sink
tisonkun 286d74c
refactor out
tisonkun 17ade1b
naming
tisonkun 67b7bad
add trap
tisonkun 2555c62
atexit
tisonkun 49c2d11
docs tests
tisonkun e0640f6
tests
tisonkun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| /Cargo.lock | ||
| /target | ||
| /logs |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| # Copyright 2024 FastLabs Developers | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| [package] | ||
| name = "logforth-append-async" | ||
| version = "0.2.0" | ||
|
|
||
| description = "Asynchronous appender for Logforth." | ||
| keywords = ["logging", "log", "async"] | ||
|
|
||
| categories.workspace = true | ||
| edition.workspace = true | ||
| homepage.workspace = true | ||
| license.workspace = true | ||
| readme.workspace = true | ||
| repository.workspace = true | ||
| rust-version.workspace = true | ||
|
|
||
| [package.metadata.docs.rs] | ||
| all-features = true | ||
| rustdoc-args = ["--cfg", "docsrs"] | ||
|
|
||
| [dependencies] | ||
| arc-swap = { workspace = true } | ||
| crossbeam-channel = { workspace = true } | ||
| logforth-core = { workspace = true } | ||
|
|
||
| [lints] | ||
| workspace = true |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| # Async Appender | ||
|
|
||
| This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need: | ||
|
|
||
| * Instead of a thread pool, it uses a single background thread to drain the log queue. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| // Copyright 2024 FastLabs Developers | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use logforth_core::Append; | ||
| use logforth_core::Diagnostic; | ||
| use logforth_core::Error; | ||
| use logforth_core::Trap; | ||
| use logforth_core::kv; | ||
| use logforth_core::kv::Visitor; | ||
| use logforth_core::record::Record; | ||
| use logforth_core::trap::DefaultTrap; | ||
|
|
||
| use crate::Overflow; | ||
| use crate::Task; | ||
| use crate::state::AsyncState; | ||
| use crate::worker::Worker; | ||
|
|
||
| /// A composable appender, logging and flushing asynchronously. | ||
| #[derive(Debug)] | ||
| pub struct Async { | ||
| appends: Arc<[Box<dyn Append>]>, | ||
| overflow: Overflow, | ||
| state: AsyncState, | ||
| trap: Arc<dyn Trap>, | ||
| } | ||
|
|
||
| impl Append for Async { | ||
| fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> { | ||
| let mut diagnostics = vec![]; | ||
|
|
||
| let mut collector = DiagnosticCollector(&mut diagnostics); | ||
| for d in diags { | ||
| d.visit(&mut collector)?; | ||
| } | ||
|
|
||
| let overflow = self.overflow; | ||
| let task = Task::Log { | ||
| appends: self.appends.clone(), | ||
| record: Box::new(record.to_owned()), | ||
| diags: diagnostics, | ||
| }; | ||
| self.state.send_task(task, overflow) | ||
| } | ||
|
|
||
| fn flush(&self) -> Result<(), Error> { | ||
| let overflow = self.overflow; | ||
| let task = Task::Flush { | ||
| appends: self.appends.clone(), | ||
| }; | ||
| self.state.send_task(task, overflow) | ||
| } | ||
|
|
||
| fn exit(&self) -> Result<(), Error> { | ||
| // https://github.com/SpriteOvO/spdlog-rs/issues/64 | ||
| // | ||
| // If the program is tearing down, this will be the final flush. `crossbeam` | ||
| // uses thread-local internally, which is not supported in `atexit` callback. | ||
| // This can be bypassed by flushing sinks directly on the current thread, but | ||
| // before we do that we have to destroy the thread pool to ensure that any | ||
| // pending log tasks are completed. | ||
| self.state.destroy(); | ||
| for append in self.appends.iter() { | ||
| if let Err(err) = append.exit() { | ||
| self.trap.trap(&err); | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| /// A builder for configuring an async appender. | ||
| pub struct AsyncBuilder { | ||
| thread_name: String, | ||
| appends: Vec<Box<dyn Append>>, | ||
| buffered_lines_limit: Option<usize>, | ||
| trap: Arc<dyn Trap>, | ||
| overflow: Overflow, | ||
| } | ||
|
|
||
| impl AsyncBuilder { | ||
| /// Create a new async appender builder. | ||
| pub fn new(thread_name: impl Into<String>) -> AsyncBuilder { | ||
| AsyncBuilder { | ||
| thread_name: thread_name.into(), | ||
| appends: vec![], | ||
| buffered_lines_limit: None, | ||
| trap: Arc::new(DefaultTrap::default()), | ||
| overflow: Overflow::Block, | ||
| } | ||
| } | ||
|
|
||
| /// Set the buffer size of pending messages. | ||
| pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self { | ||
| self.buffered_lines_limit = buffered_lines_limit; | ||
| self | ||
| } | ||
|
|
||
| /// Set the overflow policy to block when the buffer is full. | ||
| pub fn overflow_block(mut self) -> Self { | ||
| self.overflow = Overflow::Block; | ||
| self | ||
| } | ||
|
|
||
| /// Set the overflow policy to drop incoming messages when the buffer is full. | ||
| pub fn overflow_drop_incoming(mut self) -> Self { | ||
| self.overflow = Overflow::DropIncoming; | ||
| self | ||
| } | ||
|
|
||
| /// Set the trap for this async appender. | ||
| pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self { | ||
| let trap = trap.into(); | ||
| self.trap = trap.into(); | ||
| self | ||
| } | ||
|
|
||
| /// Add an appender to this async appender. | ||
| pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self { | ||
| self.appends.push(append.into()); | ||
| self | ||
| } | ||
|
|
||
| /// Build the async appender. | ||
| pub fn build(self) -> Async { | ||
| let Self { | ||
| thread_name, | ||
| appends, | ||
| buffered_lines_limit, | ||
| trap, | ||
| overflow, | ||
| } = self; | ||
|
|
||
| let appends = appends.into_boxed_slice().into(); | ||
|
|
||
| let (sender, receiver) = match buffered_lines_limit { | ||
| Some(limit) => crossbeam_channel::bounded(limit), | ||
| None => crossbeam_channel::unbounded(), | ||
| }; | ||
|
|
||
| let worker = Worker::new(receiver, trap.clone()); | ||
| let thread_handle = std::thread::Builder::new() | ||
| .name(thread_name) | ||
| .spawn(move || worker.run()) | ||
| .expect("failed to spawn async appender thread"); | ||
| let state = AsyncState::new(sender, thread_handle); | ||
|
|
||
| Async { | ||
| appends, | ||
| overflow, | ||
| state, | ||
| trap, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>); | ||
|
|
||
| impl<'a> Visitor for DiagnosticCollector<'a> { | ||
| fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> { | ||
| self.0.push((key.to_owned(), value.to_owned())); | ||
| Ok(()) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| // Copyright 2024 FastLabs Developers | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| //! A composable appender, logging and flushing asynchronously. | ||
|
|
||
| #![cfg_attr(docsrs, feature(doc_auto_cfg))] | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use logforth_core::Append; | ||
| use logforth_core::kv; | ||
| use logforth_core::record::RecordOwned; | ||
|
|
||
| mod append; | ||
| mod state; | ||
| mod worker; | ||
|
|
||
| pub use self::append::Async; | ||
| pub use self::append::AsyncBuilder; | ||
|
|
||
| enum Task { | ||
| Log { | ||
| appends: Arc<[Box<dyn Append>]>, | ||
| record: Box<RecordOwned>, | ||
| diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, | ||
| }, | ||
| Flush { | ||
| appends: Arc<[Box<dyn Append>]>, | ||
| }, | ||
| } | ||
|
|
||
| #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] | ||
| enum Overflow { | ||
| /// Blocks until the channel is not full. | ||
| Block, | ||
| /// Drops the incoming operation. | ||
| DropIncoming, | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @ SpriteOvO's spdlog-rs
AsyncPoolSink. I borrow most of the design while change the thread pool to a single background thread.One open issue is about naming. Since
asyncis a keyword now, we'd better use a different name from Async here. Currently I useAsynchronous, but I'd prefer a shorter name. ConsideringAsyncPool, perhapsAsyncSingleThreador other.BTW we need to implement a clean up logic atexit, related to SpriteOvO/spdlog-rs#102