Skip to content

Commit cb66b59

Browse files
committed
Drop DelayedStore and associated backup test
These were created to test that our backup store does not impact the primary store writes but the boilerplate appears too much for the functionality being tested.
1 parent 02debf5 commit cb66b59

File tree

2 files changed

+4
-215
lines changed

2 files changed

+4
-215
lines changed

src/io/test_utils.rs

Lines changed: 1 addition & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use std::collections::{hash_map, HashMap};
99
use std::future::Future;
1010
use std::panic::RefUnwindSafe;
1111
use std::path::PathBuf;
12-
use std::sync::{Arc, Mutex};
13-
use std::time::Duration;
12+
use std::sync::Mutex;
1413

1514
use lightning::events::ClosureReason;
1615
use lightning::ln::functional_test_utils::{
@@ -26,8 +25,6 @@ use lightning::{check_closed_broadcast, io};
2625
use rand::distr::Alphanumeric;
2726
use rand::{rng, Rng};
2827

29-
use crate::runtime::Runtime;
30-
3128
type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
3229
&'a K,
3330
&'a test_utils::TestLogger,
@@ -355,168 +352,3 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
355352
// Make sure everything is persisted as expected after close.
356353
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
357354
}
358-
359-
struct DelayedStoreInner {
360-
storage: Mutex<HashMap<String, Vec<u8>>>,
361-
delay: Duration,
362-
}
363-
364-
impl DelayedStoreInner {
365-
fn new(delay: Duration) -> Self {
366-
Self { storage: Mutex::new(HashMap::new()), delay }
367-
}
368-
369-
fn make_key(pn: &str, sn: &str, key: &str) -> String {
370-
format!("{}/{}/{}", pn, sn, key)
371-
}
372-
373-
async fn read_internal(
374-
&self, primary_namespace: String, secondary_namespace: String, key: String,
375-
) -> Result<Vec<u8>, io::Error> {
376-
tokio::time::sleep(self.delay).await;
377-
378-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
379-
let storage = self.storage.lock().unwrap();
380-
storage
381-
.get(&full_key)
382-
.cloned()
383-
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
384-
}
385-
386-
async fn write_internal(
387-
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
388-
) -> Result<(), io::Error> {
389-
tokio::time::sleep(self.delay).await;
390-
391-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
392-
let mut storage = self.storage.lock().unwrap();
393-
storage.insert(full_key, buf);
394-
Ok(())
395-
}
396-
397-
async fn remove_internal(
398-
&self, primary_namespace: String, secondary_namespace: String, key: String,
399-
) -> Result<(), io::Error> {
400-
tokio::time::sleep(self.delay).await;
401-
402-
let full_key = Self::make_key(&primary_namespace, &secondary_namespace, &key);
403-
let mut storage = self.storage.lock().unwrap();
404-
storage.remove(&full_key);
405-
Ok(())
406-
}
407-
408-
async fn list_internal(
409-
&self, primary_namespace: String, secondary_namespace: String,
410-
) -> Result<Vec<String>, io::Error> {
411-
tokio::time::sleep(self.delay).await;
412-
413-
let prefix = format!("{}/{}/", primary_namespace, secondary_namespace);
414-
let storage = self.storage.lock().unwrap();
415-
Ok(storage
416-
.keys()
417-
.filter(|k| k.starts_with(&prefix))
418-
.map(|k| k.strip_prefix(&prefix).unwrap().to_string())
419-
.collect())
420-
}
421-
}
422-
423-
pub struct DelayedStore {
424-
inner: Arc<DelayedStoreInner>,
425-
runtime: Arc<Runtime>,
426-
}
427-
428-
impl DelayedStore {
429-
pub fn new(delay_ms: u64, runtime: Arc<Runtime>) -> Self {
430-
Self { inner: Arc::new(DelayedStoreInner::new(Duration::from_millis(delay_ms))), runtime }
431-
}
432-
}
433-
434-
impl KVStore for DelayedStore {
435-
fn read(
436-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
437-
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send {
438-
let inner = Arc::clone(&self.inner);
439-
let pn = primary_namespace.to_string();
440-
let sn = secondary_namespace.to_string();
441-
let key = key.to_string();
442-
443-
async move { inner.read_internal(pn, sn, key).await }
444-
}
445-
446-
fn write(
447-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
448-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
449-
let inner = Arc::clone(&self.inner);
450-
let pn = primary_namespace.to_string();
451-
let sn = secondary_namespace.to_string();
452-
let key = key.to_string();
453-
454-
async move { inner.write_internal(pn, sn, key, buf).await }
455-
}
456-
457-
fn remove(
458-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
459-
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
460-
let inner = Arc::clone(&self.inner);
461-
let pn = primary_namespace.to_string();
462-
let sn = secondary_namespace.to_string();
463-
let key = key.to_string();
464-
465-
async move { inner.remove_internal(pn, sn, key).await }
466-
}
467-
468-
fn list(
469-
&self, primary_namespace: &str, secondary_namespace: &str,
470-
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + Send {
471-
let inner = Arc::clone(&self.inner);
472-
let pn = primary_namespace.to_string();
473-
let sn = secondary_namespace.to_string();
474-
475-
async move { inner.list_internal(pn, sn).await }
476-
}
477-
}
478-
479-
impl KVStoreSync for DelayedStore {
480-
fn read(
481-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
482-
) -> Result<Vec<u8>, io::Error> {
483-
let inner = Arc::clone(&self.inner);
484-
let pn = primary_namespace.to_string();
485-
let sn = secondary_namespace.to_string();
486-
let key = key.to_string();
487-
488-
self.runtime.block_on(async move { inner.read_internal(pn, sn, key).await })
489-
}
490-
491-
fn write(
492-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
493-
) -> Result<(), io::Error> {
494-
let inner = Arc::clone(&self.inner);
495-
let pn = primary_namespace.to_string();
496-
let sn = secondary_namespace.to_string();
497-
let key = key.to_string();
498-
499-
self.runtime.block_on(async move { inner.write_internal(pn, sn, key, buf).await })
500-
}
501-
502-
fn remove(
503-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
504-
) -> Result<(), io::Error> {
505-
let inner = Arc::clone(&self.inner);
506-
let pn = primary_namespace.to_string();
507-
let sn = secondary_namespace.to_string();
508-
let key = key.to_string();
509-
510-
self.runtime.block_on(async move { inner.remove_internal(pn, sn, key).await })
511-
}
512-
513-
fn list(
514-
&self, primary_namespace: &str, secondary_namespace: &str,
515-
) -> Result<Vec<String>, io::Error> {
516-
let inner = Arc::clone(&self.inner);
517-
let pn = primary_namespace.to_string();
518-
let sn = secondary_namespace.to_string();
519-
520-
self.runtime.block_on(async move { inner.list_internal(pn, sn).await })
521-
}
522-
}

src/io/tier_store.rs

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,7 @@ mod tests {
700700
};
701701
use lightning_persister::fs_store::FilesystemStore;
702702

703-
use crate::io::test_utils::{
704-
do_read_write_remove_list_persist, random_storage_path, DelayedStore,
705-
};
703+
use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path};
706704
use crate::io::tier_store::TierStore;
707705
use crate::logger::Logger;
708706
use crate::runtime::Runtime;
@@ -872,47 +870,6 @@ mod tests {
872870
assert_eq!(backup_read_cm.unwrap(), data);
873871
}
874872

875-
#[test]
876-
fn backup_overflow_doesnt_fail_writes() {
877-
let base_dir = random_storage_path();
878-
let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned();
879-
let logger = Arc::new(Logger::new_fs_writer(log_path.clone(), Level::Trace).unwrap());
880-
let runtime = Arc::new(Runtime::new(Arc::clone(&logger)).unwrap());
881-
882-
let _cleanup = CleanupDir(base_dir.clone());
883-
884-
let primary_store: Arc<DynStore> =
885-
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary"))));
886-
let mut tier =
887-
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
888-
889-
let backup_store: Arc<DynStore> =
890-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
891-
tier.set_backup_store(Arc::clone(&backup_store));
892-
893-
let data = vec![42u8; 32];
894-
895-
let key = CHANNEL_MANAGER_PERSISTENCE_KEY;
896-
for i in 0..=10 {
897-
let result = KVStoreSync::write(
898-
&tier,
899-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
900-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
901-
&format!("{}_{}", key, i),
902-
data.clone(),
903-
);
904-
905-
assert!(result.is_ok(), "Write {} should succeed", i);
906-
}
907-
908-
// Check logs for backup queue overflow message
909-
let log_contents = std::fs::read_to_string(&log_path).unwrap();
910-
assert!(
911-
log_contents.contains("Backup queue is full"),
912-
"Logs should contain backup queue overflow message"
913-
);
914-
}
915-
916873
#[test]
917874
fn lazy_removal() {
918875
let base_dir = random_storage_path();
@@ -928,7 +885,7 @@ mod tests {
928885
setup_tier_store(Arc::clone(&primary_store), Arc::clone(&logger), Arc::clone(&runtime));
929886

930887
let backup_store: Arc<DynStore> =
931-
Arc::new(DynStoreWrapper(DelayedStore::new(100, runtime)));
888+
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup"))));
932889
tier.set_backup_store(Arc::clone(&backup_store));
933890

934891
let data = vec![42u8; 32];
@@ -943,7 +900,7 @@ mod tests {
943900
);
944901
assert!(write_result.is_ok(), "Write should succeed");
945902

946-
thread::sleep(Duration::from_millis(10));
903+
thread::sleep(Duration::from_millis(100));
947904

948905
assert_eq!(
949906
KVStoreSync::read(

0 commit comments

Comments
 (0)