Skip to content

Commit 143d16f

Browse files
benthecarmanclaude
andcommitted
Add PaginatedKVStore support to SqliteStore
Implement rust-lightning's PaginatedKVStore/PaginatedKVStoreSync traits on SqliteStore with a v2→v3 schema migration that adds a created_at column for tracking insertion order. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a3c2ced commit 143d16f

File tree

5 files changed

+901
-41
lines changed

5 files changed

+901
-41
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ proptest = "1.0.0"
9090
regex = "1.5.6"
9191
criterion = { version = "0.7.0", features = ["async_tokio"] }
9292
ldk-node-062 = { package = "ldk-node", version = "=0.6.2" }
93+
ldk-node-070 = { package = "ldk-node", version = "=0.7.0" }
9394

9495
[target.'cfg(not(no_download))'.dev-dependencies]
9596
electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }

src/io/sqlite_store/migrations.rs

Lines changed: 160 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ use lightning::io;
99
use rusqlite::Connection;
1010

1111
pub(super) fn migrate_schema(
12-
connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
12+
connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16,
1313
) -> io::Result<()> {
1414
assert!(from_version < to_version);
15-
if from_version == 1 && to_version == 2 {
15+
if from_version == 1 && to_version >= 2 {
1616
migrate_v1_to_v2(connection, kv_table_name)?;
17+
from_version = 2;
18+
}
19+
if from_version == 2 && to_version >= 3 {
20+
migrate_v2_to_v3(connection, kv_table_name)?;
1721
}
1822

1923
Ok(())
@@ -65,11 +69,76 @@ fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Res
6569
Ok(())
6670
}
6771

72+
fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
73+
let tx = connection.transaction().map_err(|e| {
74+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
75+
io::Error::new(io::ErrorKind::Other, msg)
76+
})?;
77+
78+
let old_table = format!("{}_v2_old", kv_table_name);
79+
let map_err = |e: rusqlite::Error| -> io::Error {
80+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
81+
io::Error::new(io::ErrorKind::Other, msg)
82+
};
83+
84+
// Recreate the table to ensure the correct PRIMARY KEY regardless of migration history.
85+
// Tables migrated from v1 have PK (primary_namespace, key) only — missing
86+
// secondary_namespace. Recreating normalizes the schema for all databases.
87+
let rename_sql = format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table);
88+
tx.execute(&rename_sql, []).map_err(map_err)?;
89+
90+
let create_table_sql = format!(
91+
"CREATE TABLE {} (
92+
primary_namespace TEXT NOT NULL,
93+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
94+
key TEXT NOT NULL CHECK (key <> ''),
95+
value BLOB,
96+
created_at INTEGER NOT NULL DEFAULT 0,
97+
PRIMARY KEY (primary_namespace, secondary_namespace, key)
98+
)",
99+
kv_table_name
100+
);
101+
tx.execute(&create_table_sql, []).map_err(map_err)?;
102+
103+
// Copy data and backfill created_at from ROWID for relative ordering
104+
let set_created_at_sql = format!(
105+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at)
106+
SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}",
107+
kv_table_name, old_table
108+
);
109+
tx.execute(&set_created_at_sql, []).map_err(map_err)?;
110+
111+
let drop_old_sql = format!("DROP TABLE {}", old_table);
112+
tx.execute(&drop_old_sql, []).map_err(map_err)?;
113+
114+
// Create composite index for paginated listing
115+
let sql = format!(
116+
"CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC)",
117+
kv_table_name, kv_table_name
118+
);
119+
tx.execute(&sql, []).map_err(map_err)?;
120+
121+
// Update user_version
122+
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err(
123+
|e| {
124+
let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e);
125+
io::Error::new(io::ErrorKind::Other, msg)
126+
},
127+
)?;
128+
129+
tx.commit().map_err(|e| {
130+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
131+
io::Error::new(io::ErrorKind::Other, msg)
132+
})?;
133+
134+
Ok(())
135+
}
136+
68137
#[cfg(test)]
69138
mod tests {
70139
use std::fs;
71140

72-
use lightning::util::persist::KVStoreSync;
141+
use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
73142
use rusqlite::{named_params, Connection};
74143

75144
use crate::io::sqlite_store::SqliteStore;
@@ -121,7 +190,7 @@ mod tests {
121190
let sql = format!(
122191
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
123192
kv_table_name
124-
);
193+
);
125194
let mut stmt = connection.prepare_cached(&sql).unwrap();
126195

127196
stmt.execute(named_params! {
@@ -159,4 +228,91 @@ mod tests {
159228
// Check we can continue to use the store just fine.
160229
do_read_write_remove_list_persist(&store);
161230
}
231+
232+
#[test]
233+
fn rwrl_post_schema_2_migration() {
234+
let old_schema_version = 2u16;
235+
236+
let mut temp_path = random_storage_path();
237+
temp_path.push("rwrl_post_schema_2_migration");
238+
239+
let db_file_name = "test_db".to_string();
240+
let kv_table_name = "test_table".to_string();
241+
242+
let test_ns = "testspace";
243+
let test_sub = "testsub";
244+
245+
{
246+
// Create a v2 database manually
247+
fs::create_dir_all(temp_path.clone()).unwrap();
248+
let mut db_file_path = temp_path.clone();
249+
db_file_path.push(db_file_name.clone());
250+
251+
let connection = Connection::open(db_file_path.clone()).unwrap();
252+
253+
connection
254+
.pragma(
255+
Some(rusqlite::DatabaseName::Main),
256+
"user_version",
257+
old_schema_version,
258+
|_| Ok(()),
259+
)
260+
.unwrap();
261+
262+
let sql = format!(
263+
"CREATE TABLE IF NOT EXISTS {} (
264+
primary_namespace TEXT NOT NULL,
265+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
266+
key TEXT NOT NULL CHECK (key <> ''),
267+
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
268+
);",
269+
kv_table_name
270+
);
271+
connection.execute(&sql, []).unwrap();
272+
273+
// Insert 3 rows in a known order
274+
for i in 0..3 {
275+
let key = format!("key_{}", i);
276+
let sql = format!(
277+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);",
278+
kv_table_name
279+
);
280+
let mut stmt = connection.prepare_cached(&sql).unwrap();
281+
stmt.execute(named_params! {
282+
":ns": test_ns,
283+
":sub": test_sub,
284+
":key": key,
285+
":value": vec![i as u8; 8],
286+
})
287+
.unwrap();
288+
}
289+
}
290+
291+
// Open with new code, triggering v2→v3 migration
292+
let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();
293+
294+
// Verify data survived
295+
for i in 0..3 {
296+
let key = format!("key_{}", i);
297+
let data = store.read(test_ns, test_sub, &key).unwrap();
298+
assert_eq!(data, vec![i as u8; 8]);
299+
}
300+
301+
// Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
302+
let response =
303+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
304+
assert_eq!(response.keys.len(), 3);
305+
// ROWIDs were 1, 2, 3 so created_at was backfilled as 1, 2, 3
306+
// Newest first: key_2, key_1, key_0
307+
assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]);
308+
309+
// Verify we can write new entries and they get proper ordering
310+
KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap();
311+
let response =
312+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
313+
assert_eq!(response.keys[0], "key_new");
314+
315+
// Check we can continue to use the store just fine.
316+
do_read_write_remove_list_persist(&store);
317+
}
162318
}

0 commit comments

Comments
 (0)