Skip to content

Commit a7b0efb

Browse files
committed
Replace embassy-sync with critical-section + hand-rolled WakeSignal
SharedRamRing no longer depends on embassy-sync. The blocking mutex is replaced with critical_section::Mutex (borrow pattern), and the Signal is replaced with a minimal WakeSignal built from an AtomicBool and a stored core::task::Waker — no runtime dependency required. The feature flag is renamed from `embassy` to `critical-section` to reflect the actual dependency.
1 parent 89b33e0 commit a7b0efb

File tree

3 files changed

+107
-30
lines changed

3 files changed

+107
-30
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ heapless = { version = "0.8.0", optional = true }
2222
heapless-09 = { package = "heapless", version = "0.9.0", optional = true }
2323
postcard = { version = "1.1.3", optional = true }
2424
serde = { version = "1.0.228", default-features = false, optional = true }
25-
embassy-sync = { version = "0.6", optional = true }
25+
critical-section = { version = "1.0", optional = true }
2626

2727
[dev-dependencies]
2828
approx = "0.5.1"
@@ -44,8 +44,8 @@ heapless = ["dep:heapless"]
4444
heapless-09 = ["dep:heapless-09"]
4545
# Enable convenience implementations for postcard
4646
postcard = ["dep:postcard", "dep:serde"]
47-
# Enable ISR-safe SharedRamRing with Embassy Signal wakeup for the drain task
48-
embassy = ["dep:embassy-sync"]
47+
# Enable ISR-safe SharedRamRing using critical-section for the drain task
48+
critical-section = ["dep:critical-section"]
4949

5050
# Internal feature. Do not use. Anything here is semver exempt.
5151
_test = ["dep:futures", "dep:approx", "std", "arrayvec", "alloc", "heapless"]

src/queue/buffer.rs

Lines changed: 103 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,90 @@ impl<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> BufferedQueue<S, C, RAM_
342342
}
343343
}
344344

345+
// ── WakeSignal (critical-section + core::task) ───────────────────────────────
346+
347+
/// A minimal async signal: set from any context (including ISR), awaited from task context.
348+
///
349+
/// Uses an atomic flag plus a stored [`Waker`][core::task::Waker]; no runtime dependency.
350+
#[cfg(feature = "critical-section")]
351+
struct WakeSignal {
352+
pending: core::sync::atomic::AtomicBool,
353+
waker: critical_section::Mutex<core::cell::RefCell<Option<core::task::Waker>>>,
354+
}
355+
356+
#[cfg(feature = "critical-section")]
357+
impl WakeSignal {
358+
const fn new() -> Self {
359+
Self {
360+
pending: core::sync::atomic::AtomicBool::new(false),
361+
waker: critical_section::Mutex::new(core::cell::RefCell::new(None)),
362+
}
363+
}
364+
365+
/// Mark the signal as pending and wake any registered task.
366+
/// Safe to call from interrupt context.
367+
fn signal(&self) {
368+
self.pending
369+
.store(true, core::sync::atomic::Ordering::Release);
370+
critical_section::with(|cs| {
371+
if let Some(waker) = self.waker.borrow(cs).borrow_mut().take() {
372+
waker.wake();
373+
}
374+
});
375+
}
376+
377+
/// Future that resolves once [`signal`][Self::signal] has been called.
378+
fn wait(&self) -> WakeSignalFuture<'_> {
379+
WakeSignalFuture { signal: self }
380+
}
381+
}
382+
383+
#[cfg(feature = "critical-section")]
384+
struct WakeSignalFuture<'a> {
385+
signal: &'a WakeSignal,
386+
}
387+
388+
#[cfg(feature = "critical-section")]
389+
impl core::future::Future for WakeSignalFuture<'_> {
390+
type Output = ();
391+
392+
fn poll(
393+
self: core::pin::Pin<&mut Self>,
394+
cx: &mut core::task::Context<'_>,
395+
) -> core::task::Poll<()> {
396+
// Fast path: already signaled.
397+
if self
398+
.signal
399+
.pending
400+
.swap(false, core::sync::atomic::Ordering::Acquire)
401+
{
402+
return core::task::Poll::Ready(());
403+
}
404+
405+
// Register the waker so signal() can wake us.
406+
critical_section::with(|cs| {
407+
*self.signal.waker.borrow(cs).borrow_mut() = Some(cx.waker().clone());
408+
});
409+
410+
// Re-check: signal() may have run between the first load and waker registration.
411+
if self
412+
.signal
413+
.pending
414+
.swap(false, core::sync::atomic::Ordering::Acquire)
415+
{
416+
critical_section::with(|cs| {
417+
self.signal.waker.borrow(cs).borrow_mut().take();
418+
});
419+
core::task::Poll::Ready(())
420+
} else {
421+
core::task::Poll::Pending
422+
}
423+
}
424+
}
425+
345426
// ── SharedRamRing ─────────────────────────────────────────────────────────────
346427

347-
/// An ISR-safe RAM ring buffer with an Embassy [`Signal`][embassy_sync::signal::Signal] that
348-
/// wakes a drain task on enqueue.
428+
/// An ISR-safe RAM ring buffer that wakes a drain task on enqueue.
349429
///
350430
/// Designed to be placed in a `static`:
351431
/// ```ignore
@@ -366,49 +446,44 @@ impl<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> BufferedQueue<S, C, RAM_
366446
/// RING.enqueue(&sensor_sample, OverflowPolicy::DiscardOldest);
367447
///
368448
/// // In the drain task:
369-
/// #[embassy_executor::task]
370449
/// async fn drain(mut storage: QueueStorage<Flash, NoCache>) {
371450
/// let mut scratch = [0u8; 64];
372451
/// loop {
373452
/// RING.wait_and_drain_all(&mut storage, &mut scratch, false).await.unwrap();
374453
/// }
375454
/// }
376455
/// ```
377-
#[cfg(feature = "embassy")]
456+
#[cfg(feature = "critical-section")]
378457
pub struct SharedRamRing<const N: usize> {
379-
ring: embassy_sync::blocking_mutex::Mutex<
380-
embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
381-
core::cell::RefCell<RamRing<N>>,
382-
>,
383-
signal: embassy_sync::signal::Signal<
384-
embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
385-
(),
386-
>,
458+
ring: critical_section::Mutex<core::cell::RefCell<RamRing<N>>>,
459+
signal: WakeSignal,
387460
}
388461

389-
#[cfg(feature = "embassy")]
462+
#[cfg(feature = "critical-section")]
390463
impl<const N: usize> SharedRamRing<N> {
391464
/// Create a new `SharedRamRing`. Suitable for `static` initialisation.
392465
pub const fn new() -> Self {
393466
Self {
394-
ring: embassy_sync::blocking_mutex::Mutex::new(core::cell::RefCell::new(RamRing::new())),
395-
signal: embassy_sync::signal::Signal::new(),
467+
ring: critical_section::Mutex::new(core::cell::RefCell::new(RamRing::new())),
468+
signal: WakeSignal::new(),
396469
}
397470
}
398471

399472
// ── Producer API (sync, ISR-safe) ─────────────────────────────────────────
400473

401474
/// Enqueue an item. Safe to call from any context, including interrupt handlers.
402475
///
403-
/// Signals the drain task after a successful enqueue so it wakes without polling.
476+
/// Wakes the drain task after a successful enqueue.
404477
/// Returns `Err(())` if the ring is full and `policy` is [`OverflowPolicy::Err`].
405478
pub fn enqueue(&self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
406-
let result = self.ring.lock(|r| match policy {
407-
OverflowPolicy::Err => r.borrow_mut().push(data),
408-
OverflowPolicy::DiscardOldest => r.borrow_mut().push_overwriting(data),
479+
let result = critical_section::with(|cs| match policy {
480+
OverflowPolicy::Err => self.ring.borrow(cs).borrow_mut().push(data),
481+
OverflowPolicy::DiscardOldest => {
482+
self.ring.borrow(cs).borrow_mut().push_overwriting(data)
483+
}
409484
});
410485
if result.is_ok() {
411-
self.signal.signal(());
486+
self.signal.signal();
412487
}
413488
result
414489
}
@@ -433,12 +508,14 @@ impl<const N: usize> SharedRamRing<N> {
433508
scratch: &mut [u8],
434509
allow_overwrite: bool,
435510
) -> Result<bool, Error<S::Error>> {
436-
let len = self.ring.lock(|r| r.borrow().peek_into(scratch).map(|s| s.len()));
511+
let len = critical_section::with(|cs| {
512+
self.ring.borrow(cs).borrow().peek_into(scratch).map(|s| s.len())
513+
});
437514
let Some(len) = len else {
438515
return Ok(false);
439516
};
440517
storage.push(&scratch[..len], allow_overwrite).await?;
441-
self.ring.lock(|r| r.borrow_mut().discard_oldest());
518+
critical_section::with(|cs| self.ring.borrow(cs).borrow_mut().discard_oldest());
442519
Ok(true)
443520
}
444521

@@ -508,21 +585,21 @@ impl<const N: usize> SharedRamRing<N> {
508585

509586
/// Free bytes remaining in the ring.
510587
pub fn ram_free_bytes(&self) -> usize {
511-
self.ring.lock(|r| N - r.borrow().bytes_used())
588+
critical_section::with(|cs| N - self.ring.borrow(cs).borrow().bytes_used())
512589
}
513590

514591
/// Number of items currently buffered in the ring.
515592
pub fn ram_pending_count(&self) -> usize {
516-
self.ring.lock(|r| r.borrow().len())
593+
critical_section::with(|cs| self.ring.borrow(cs).borrow().len())
517594
}
518595

519596
/// Byte length of the oldest item in the ring, or `None` if the ring is empty.
520597
pub fn oldest_ram_item_len(&self) -> Option<usize> {
521-
self.ring.lock(|r| r.borrow().oldest_len())
598+
critical_section::with(|cs| self.ring.borrow(cs).borrow().oldest_len())
522599
}
523600
}
524601

525-
#[cfg(feature = "embassy")]
602+
#[cfg(feature = "critical-section")]
526603
impl<const N: usize> Default for SharedRamRing<N> {
527604
fn default() -> Self {
528605
Self::new()

src/queue/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
pub mod buffer;
44
pub use buffer::{BufferedQueue, OverflowPolicy, RamRing};
5-
#[cfg(feature = "embassy")]
5+
#[cfg(feature = "critical-section")]
66
pub use buffer::SharedRamRing;
77

88
use crate::item::{Item, ItemHeader, ItemHeaderIter};

0 commit comments

Comments
 (0)