Skip to content

Commit ac4e907

Browse files
committed
Implement PaginatedKVStore for FilesystemStore
Test created with claude code
1 parent 8b8b789 commit ac4e907

File tree

1 file changed

+168
-2
lines changed

1 file changed

+168
-2
lines changed

lightning-persister/src/fs_store.rs

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,22 @@
22
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
33

44
use lightning::types::string::PrintableString;
5-
use lightning::util::persist::{KVStoreSync, MigratableKVStore};
5+
use lightning::util::persist::{
6+
KVStoreSync, MigratableKVStore, PaginatedKVStoreSync, PaginatedListResponse,
7+
};
68

79
use std::collections::HashMap;
810
use std::fs;
911
use std::io::{Read, Write};
1012
use std::path::{Path, PathBuf};
1113
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
15+
use std::time::SystemTime;
1316

1417
#[cfg(feature = "tokio")]
1518
use core::future::Future;
1619
#[cfg(feature = "tokio")]
17-
use lightning::util::persist::KVStore;
20+
use lightning::util::persist::{KVStore, PaginatedKVStore};
1821

1922
#[cfg(target_os = "windows")]
2023
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
@@ -39,6 +42,9 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3942
// a consistent view and error out.
4043
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4144

45+
// The default page size for paginated list operations.
46+
const PAGINATED_LIST_DEFAULT_PAGE_SIZE: usize = 50;
47+
4248
struct FilesystemStoreInner {
4349
data_dir: PathBuf,
4450
tmp_file_counter: AtomicUsize,
@@ -148,6 +154,20 @@ impl KVStoreSync for FilesystemStore {
148154
}
149155
}
150156

157+
impl PaginatedKVStoreSync for FilesystemStore {
158+
fn list_paginated(
159+
&self, primary_namespace: &str, secondary_namespace: &str, last_key: Option<String>,
160+
) -> Result<PaginatedListResponse, lightning::io::Error> {
161+
let path = self.inner.get_checked_dest_file_path(
162+
primary_namespace,
163+
secondary_namespace,
164+
None,
165+
"list_paginated",
166+
)?;
167+
self.inner.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
168+
}
169+
}
170+
151171
impl FilesystemStoreInner {
152172
fn get_inner_lock_ref(&self, path: PathBuf) -> Arc<RwLock<u64>> {
153173
let mut outer_lock = self.locks.lock().unwrap();
@@ -456,6 +476,74 @@ impl FilesystemStoreInner {
456476

457477
Ok(keys)
458478
}
479+
480+
fn list_paginated(
481+
&self, prefixed_dest: PathBuf, last_key: Option<String>, page_size: usize,
482+
) -> lightning::io::Result<PaginatedListResponse> {
483+
if !Path::new(&prefixed_dest).exists() {
484+
return Ok(PaginatedListResponse { keys: Vec::new(), last_key: None });
485+
}
486+
487+
let mut entries: Vec<(String, SystemTime)> = Vec::with_capacity(page_size);
488+
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
489+
490+
'retry_list: loop {
491+
entries.clear();
492+
'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
493+
let entry = entry?;
494+
let p = entry.path();
495+
496+
let res = dir_entry_is_key(&entry);
497+
match res {
498+
Ok(true) => {
499+
let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
500+
// Get file creation time, falling back to modified time if unavailable.
501+
let metadata = entry.metadata()?;
502+
let created_time = metadata
503+
.created()
504+
.or_else(|_| metadata.modified())
505+
.unwrap_or(SystemTime::UNIX_EPOCH);
506+
entries.push((key, created_time));
507+
},
508+
Ok(false) => {
509+
continue 'skip_entry;
510+
},
511+
Err(e) => {
512+
if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
513+
retries -= 1;
514+
continue 'retry_list;
515+
} else {
516+
return Err(e.into());
517+
}
518+
},
519+
}
520+
}
521+
break 'retry_list;
522+
}
523+
524+
if entries.is_empty() {
525+
return Ok(PaginatedListResponse { keys: Vec::new(), last_key: None });
526+
}
527+
528+
// Sort by creation time descending (newest first), then by key name for stability.
529+
entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
530+
531+
// Apply pagination: find the first entry AFTER the given key in sort order.
532+
let start_idx = if let Some(ref key) = last_key {
533+
// Find the position of this key and start after it
534+
entries.iter().position(|(k, _)| k == key).map(|pos| pos + 1).unwrap_or(0)
535+
} else {
536+
0
537+
};
538+
539+
let page_entries: Vec<String> =
540+
entries.into_iter().skip(start_idx).take(page_size).map(|(k, _)| k).collect();
541+
542+
let response_last_key =
543+
if page_entries.len() == page_size { page_entries.last().cloned() } else { None };
544+
545+
Ok(PaginatedListResponse { keys: page_entries, last_key: response_last_key })
546+
}
459547
}
460548

461549
#[cfg(feature = "tokio")]
@@ -544,6 +632,35 @@ impl KVStore for FilesystemStore {
544632
}
545633
}
546634

635+
#[cfg(feature = "tokio")]
636+
impl PaginatedKVStore for FilesystemStore {
637+
fn list_paginated(
638+
&self, primary_namespace: &str, secondary_namespace: &str, last_key: Option<String>,
639+
) -> impl Future<Output = Result<PaginatedListResponse, lightning::io::Error>> + 'static + Send
640+
{
641+
let this = Arc::clone(&self.inner);
642+
643+
let path = this.get_checked_dest_file_path(
644+
primary_namespace,
645+
secondary_namespace,
646+
None,
647+
"list_paginated",
648+
);
649+
650+
async move {
651+
let path = match path {
652+
Ok(path) => path,
653+
Err(e) => return Err(e),
654+
};
655+
tokio::task::spawn_blocking(move || {
656+
this.list_paginated(path, last_key, PAGINATED_LIST_DEFAULT_PAGE_SIZE)
657+
})
658+
.await
659+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
660+
}
661+
}
662+
}
663+
547664
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
548665
let p = dir_entry.path();
549666
if let Some(ext) = p.extension() {
@@ -792,6 +909,55 @@ mod tests {
792909
assert_eq!(listed_keys.len(), 0);
793910
}
794911

912+
#[test]
913+
fn test_list_paginated() {
914+
let mut temp_path = std::env::temp_dir();
915+
temp_path.push("test_list_paginated");
916+
let fs_store = FilesystemStore::new(temp_path);
917+
918+
let primary = "testspace";
919+
let secondary = "testsubspace";
920+
921+
// Write multiple keys with small delays to ensure different creation times
922+
let keys = ["key_a", "key_b", "key_c", "key_d", "key_e"];
923+
for key in &keys {
924+
KVStoreSync::write(&fs_store, primary, secondary, key, vec![42u8]).unwrap();
925+
// Small delay to ensure different creation times
926+
std::thread::sleep(std::time::Duration::from_millis(10));
927+
}
928+
929+
// Test that all keys are returned (no pagination cursor)
930+
let response =
931+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap();
932+
assert_eq!(response.keys.len(), 5);
933+
// Keys should be ordered by creation time descending (newest first)
934+
// The last written key should be first
935+
assert_eq!(response.keys[0], "key_e");
936+
assert_eq!(response.keys[4], "key_a");
937+
// No more pages since we have less than page_size (50)
938+
assert!(response.last_key.is_none());
939+
940+
// Test pagination with a cursor
941+
// First, get the first page starting from the beginning
942+
let response =
943+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, None).unwrap();
944+
// Use one of the middle keys as a cursor to get remaining keys
945+
let cursor = response.keys[2].clone(); // Should be "key_c"
946+
let response2 =
947+
PaginatedKVStoreSync::list_paginated(&fs_store, primary, secondary, Some(cursor))
948+
.unwrap();
949+
// Should return the keys after "key_c" in the sorted order
950+
assert_eq!(response2.keys.len(), 2);
951+
assert_eq!(response2.keys[0], "key_b");
952+
assert_eq!(response2.keys[1], "key_a");
953+
954+
// Test with non-existent namespace returns empty
955+
let response =
956+
PaginatedKVStoreSync::list_paginated(&fs_store, "nonexistent", "", None).unwrap();
957+
assert!(response.keys.is_empty());
958+
assert!(response.last_key.is_none());
959+
}
960+
795961
#[test]
796962
fn test_data_migration() {
797963
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)