From 600c6a5a15eca3b9a7e1cc2beee571928d2248cd Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Wed, 13 Nov 2024 02:11:19 +0000 Subject: [PATCH] feat: extend tuple system to js --- packages/infra/client/actor-kv/src/key.rs | 179 ++++++++++++++ packages/infra/client/actor-kv/src/lib.rs | 157 +++++------- packages/infra/client/actor-kv/src/utils.rs | 54 ++++ .../isolate-v8-runner/js/40_rivet_kv.js | 230 ++++++++++++------ .../isolate-v8-runner/js/90_rivet_ns.js | 1 + .../client/isolate-v8-runner/src/ext/kv.rs | 113 +++++++-- 6 files changed, 537 insertions(+), 197 deletions(-) create mode 100644 packages/infra/client/actor-kv/src/key.rs diff --git a/packages/infra/client/actor-kv/src/key.rs b/packages/infra/client/actor-kv/src/key.rs new file mode 100644 index 0000000000..038bb6622d --- /dev/null +++ b/packages/infra/client/actor-kv/src/key.rs @@ -0,0 +1,179 @@ +use deno_core::JsBuffer; +use foundationdb::tuple::{ + Bytes, PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, +}; +use serde::Deserialize; + +// TODO: Custom deser impl that uses arrays instead of objects? +#[derive(Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Key { + /// Contains references to v8-owned buffers. Requires no copies. + JsInKey(Vec), + /// Cant use `ToJsBuffer` because of its API, so it gets converted to ToJsBuffer in the KV ext. + JsOutKey(Vec>), +} + +impl std::fmt::Debug for Key { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Key({})", self.len()) + } +} + +impl PartialEq for Key { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Key::JsInKey(a), Key::JsInKey(b)) => a + .iter() + .map(|x| x.as_ref()) + .eq(b.iter().map(|x| x.as_ref())), + (Key::JsOutKey(a), Key::JsOutKey(b)) => a == b, + _ => false, + } + } +} + +impl Eq for Key {} + +impl std::hash::Hash for Key { + fn hash(&self, state: &mut H) { + match self { + Key::JsInKey(js_in_key) => { + for buffer in js_in_key { + state.write(buffer.as_ref()); + } + } + Key::JsOutKey(out_key) => { + for buffer in out_key { + state.write(buffer); + } + } + } + } +} + +impl Key { + pub fn len(&self) -> usize { + match self { + // Arbitrary 4 accounting for nesting overhead + Key::JsInKey(js_in_key) => { + js_in_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * js_in_key.len() + } + // Arbitrary 4 accounting for nesting overhead + Key::JsOutKey(out_key) => { + out_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * out_key.len() + } + } + } +} + +impl TuplePack for Key { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + match self { + Key::JsInKey(tuple) => { + let mut offset = VersionstampOffset::None { size: 0 }; + + w.write_all(&[NESTED])?; + offset += 1; + + for v in tuple.iter() { + offset += v.as_ref().pack(w, tuple_depth.increment())?; + } + + w.write_all(&[NIL])?; + offset += 1; + + Ok(offset) + } + Key::JsOutKey(_) => unreachable!("should not be packing out keys"), + } + } +} + +impl<'de> TupleUnpack<'de> for Key { + fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + input = parse_code(input, NESTED)?; + + let mut vec = Vec::new(); + while !is_end_of_tuple(input, true) { + let (rem, v) = Bytes::unpack(input, tuple_depth.increment())?; + input = rem; + vec.push(v.into_owned()); + } + + input = parse_code(input, NIL)?; + + Ok((input, Key::JsOutKey(vec))) + } +} + +/// Same as Key::JsInKey except when packing, it leaves off the NIL byte to allow for an open range. +#[derive(Deserialize)] +pub struct ListKey(Vec); + +impl TuplePack for ListKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + w.write_all(&[NESTED])?; + offset += 1; + + for v in self.0.iter() { + offset += v.as_ref().pack(w, tuple_depth.increment())?; + } + + // No ending NIL byte compared to `Key::pack` + + Ok(offset) + } +} + +impl ListKey { + pub fn len(&self) -> usize { + // Arbitrary 4 accounting for nesting overhead + self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len() + } +} + +// === Copied from foundationdbrs === +const NIL: u8 = 0x00; +const NESTED: u8 = 0x05; +const ESCAPE: u8 = 0xff; + +#[inline] +fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> { + if input.is_empty() { + Err(PackError::MissingBytes) + } else { + Ok((&input[1..], input[0])) + } +} + +fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> { + let (input, found) = parse_byte(input)?; + if found == expected { + Ok(input) + } else { + Err(PackError::BadCode { + found, + expected: Some(expected), + }) + } +} + +fn is_end_of_tuple(input: &[u8], nested: bool) -> bool { + match input.first() { + None => true, + _ if !nested => false, + Some(&NIL) => Some(&ESCAPE) != input.get(1), + _ => false, + } +} diff --git a/packages/infra/client/actor-kv/src/lib.rs b/packages/infra/client/actor-kv/src/lib.rs index 5f26281a59..0c8f241c92 100644 --- a/packages/infra/client/actor-kv/src/lib.rs +++ b/packages/infra/client/actor-kv/src/lib.rs @@ -4,15 +4,17 @@ use std::{ }; use anyhow::*; -use deno_core::{JsBuffer, ToJsBuffer}; -use foundationdb::{self as fdb, directory::Directory}; +use deno_core::JsBuffer; +use foundationdb::{self as fdb, directory::Directory, tuple::Subspace}; use futures_util::{StreamExt, TryStreamExt}; -use metadata::Metadata; +use key::{Key, ListKey}; +pub use metadata::Metadata; use prost::Message; use serde::{Deserialize, Serialize}; -use utils::TransactionExt; +use utils::{validate_entries, validate_keys, TransactionExt}; use uuid::Uuid; +pub mod key; mod metadata; mod utils; @@ -23,11 +25,12 @@ const MAX_PUT_PAYLOAD_SIZE: usize = 976 * 1024; const MAX_STORAGE_SIZE: usize = 1024 * 1024 * 1024; // 1 GiB const VALUE_CHUNK_SIZE: usize = 1000; // 1 KB, not KiB +// Currently designed largely around the Deno runtime. More abstractions can be made later. pub struct ActorKv { version: &'static str, db: fdb::Database, actor_id: Uuid, - subspace: Option, + subspace: Option, } impl ActorKv { @@ -65,7 +68,7 @@ impl ActorKv { } /// Returns estimated size of the given subspace. - pub async fn get_subspace_size(&self, subspace: &fdb::tuple::Subspace) -> Result { + pub async fn get_subspace_size(&self, subspace: &Subspace) -> Result { let (start, end) = subspace.range(); // This txn does not have to be committed because we are not modifying any data @@ -76,7 +79,7 @@ impl ActorKv { } /// Gets keys from the KV store. - pub async fn get(&self, keys: Vec) -> Result> { + pub async fn get(&self, keys: Vec) -> Result> { let subspace = self .subspace .as_ref() @@ -114,19 +117,13 @@ impl ActorKv { bail!("unexpected sub key: {sub_key:?}"); } - Ok(SubKey { - key: key.clone(), - data: SubKeyData::Metadata(value), - }) + Ok((key.clone(), SubKey::Metadata(value))) } else { // Parse sub key as idx let (_, idx) = key_subspace .unpack::<(String, usize)>(value.key())?; - Ok(SubKey { - key: key.clone(), - data: SubKeyData::Chunk(idx, value), - }) + Ok((key.clone(), SubKey::Chunk(idx, value))) } } Err(err) => Err(err.into()), @@ -137,8 +134,8 @@ impl ActorKv { // Should remain in order .buffered(32) .flatten() - .try_fold(HashMap::new(), |mut acc, sub_key| async { - acc.entry(sub_key.key.clone()) + .try_fold(HashMap::new(), |mut acc, (key, sub_key)| async { + acc.entry(key) .or_insert_with(EntryBuilder::default) .add_sub_key(sub_key)?; @@ -165,7 +162,7 @@ impl ActorKv { query: ListQuery, reverse: bool, limit: Option, - ) -> Result> { + ) -> Result> { let subspace = self .subspace .as_ref() @@ -200,39 +197,34 @@ impl ActorKv { Ok(value) => { // Parse key as string if let Ok((key, sub_key)) = - subspace.unpack::<(String, String)>(value.key()) + subspace.unpack::<(Key, String)>(value.key()) { if sub_key != "metadata" { bail!("unexpected sub key: {sub_key:?}"); } - Ok(SubKey { - key, - data: SubKeyData::Metadata(value), - }) + Ok((key, SubKey::Metadata(value))) } else { // Parse sub key as idx let (key, _, idx) = - subspace.unpack::<(String, String, usize)>(value.key())?; + subspace.unpack::<(Key, String, usize)>(value.key())?; - Ok(SubKey { - key, - data: SubKeyData::Chunk(idx, value), - }) + Ok((key, SubKey::Chunk(idx, value))) } } Err(err) => Err(err.into()), } }); + // With a limit, we short circuit out of the `try_fold` once the limit is reached if let Some(limit) = limit { stream - .try_fold(HashMap::new(), |mut acc, sub_key| async { + .try_fold(HashMap::new(), |mut acc, (key, sub_key)| async { let size = acc.len(); - let entry = acc.entry(sub_key.key.clone()); + let entry = acc.entry(key); - // Short circuit when limit is reached. This relies on data from the stream being - // in order. + // Short circuit when limit is reached. This relies on data from the stream + // being in order. if size == limit && matches!(entry, hash_map::Entry::Vacant(_)) { return Err(ListLimitReached(acc).into()); } @@ -251,8 +243,8 @@ impl ActorKv { }) } else { stream - .try_fold(HashMap::new(), |mut acc, sub_key| async { - acc.entry(sub_key.key.clone()) + .try_fold(HashMap::new(), |mut acc, (key, sub_key)| async { + acc.entry(key) .or_insert_with(EntryBuilder::default) .add_sub_key(sub_key)?; @@ -288,7 +280,7 @@ impl ActorKv { } /// Puts keys into the KV store. - pub async fn put(&self, entries: HashMap) -> Result<()> { + pub async fn put(&self, entries: HashMap) -> Result<()> { let subspace = self .subspace .as_ref() @@ -351,7 +343,7 @@ impl ActorKv { } /// Deletes keys from the KV store. Returns true for keys that existed before deletion. - pub async fn delete(&self, keys: Vec) -> Result> { + pub async fn delete(&self, keys: Vec) -> Result> { let subspace = self .subspace .as_ref() @@ -385,6 +377,22 @@ impl ActorKv { .map_err(Into::into) } + /// Deletes all keys from the KV store. + pub async fn delete_all(&self) -> Result<()> { + let subspace = self + .subspace + .as_ref() + .context("must call `ActorKv::init` before using KV operations")?; + + self.db + .run(|tx, _mc| async move { + tx.clear_subspace_range(&subspace); + Ok(()) + }) + .await + .map_err(Into::into) + } + /// **Destroys entire actor's KV. Cannot be undone.** pub async fn destroy(self) -> Result<()> { let root = fdb::directory::DirectoryLayer::default(); @@ -408,15 +416,15 @@ struct EntryBuilder { impl EntryBuilder { fn add_sub_key(&mut self, sub_key: SubKey) -> Result<()> { - match sub_key.data { - SubKeyData::Metadata(value) => { + match sub_key { + SubKey::Metadata(value) => { // We ignore setting the metadata again because it means the same key was given twice in the // input keys for `ActorKv::get`. We don't perform automatic deduplication. if self.metadata.is_none() { self.metadata = Some(Metadata::decode(value.value())?); } } - SubKeyData::Chunk(idx, value) => { + SubKey::Chunk(idx, value) => { // We don't perform deduplication on the input keys for `ActorKv::get` so we might have // duplicate data chunks. This idx check ignores chunks that were already passed and ensures // contiguity. @@ -430,14 +438,14 @@ impl EntryBuilder { Ok(()) } - fn build(self, key: &str) -> Result { + fn build(self, key: &Key) -> Result { ensure!(!self.value.is_empty(), "empty value at key {key:?}"); Ok(Entry { metadata: self .metadata .with_context(|| format!("no metadata for key {key:?}"))?, - value: self.value.into(), + value: self.value, }) } } @@ -445,17 +453,12 @@ impl EntryBuilder { /// Represents a Rivet KV value. #[derive(Serialize)] pub struct Entry { - metadata: Metadata, - value: ToJsBuffer, + pub metadata: Metadata, + pub value: Vec, } /// Represents FDB keys within a Rivet KV key. -struct SubKey { - key: String, - data: SubKeyData, -} - -enum SubKeyData { +enum SubKey { Metadata(fdb::future::FdbValue), Chunk(usize, fdb::future::FdbValue), } @@ -464,13 +467,13 @@ enum SubKeyData { #[serde(rename_all = "camelCase")] pub enum ListQuery { All, - RangeInclusive(String, String), - RangeExclusive(String, String), - Prefix(String), + RangeInclusive(ListKey, Key), + RangeExclusive(ListKey, Key), + Prefix(ListKey), } impl ListQuery { - fn range(&self, subspace: &fdb::tuple::Subspace) -> (Vec, Vec) { + fn range(&self, subspace: &Subspace) -> (Vec, Vec) { match self { ListQuery::All => subspace.range(), ListQuery::RangeInclusive(start, end) => ( @@ -521,7 +524,7 @@ impl ListQuery { } // Used to short circuit after the -struct ListLimitReached(HashMap); +struct ListLimitReached(HashMap); impl std::fmt::Debug for ListLimitReached { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -536,49 +539,3 @@ impl std::fmt::Display for ListLimitReached { } impl std::error::Error for ListLimitReached {} - -fn validate_keys(keys: &[String]) -> Result<()> { - ensure!(keys.len() <= MAX_KEYS, "a maximum of 128 keys is allowed"); - - for key in keys { - ensure!( - key.len() <= MAX_KEY_SIZE, - "key is too long (max 2048 bytes)" - ); - } - - Ok(()) -} - -fn validate_entries(entries: &HashMap, total_size: usize) -> Result<()> { - ensure!( - entries.len() <= MAX_KEYS, - "A maximum of 128 key-value entries is allowed" - ); - let payload_size = entries - .iter() - .fold(0, |acc, (k, v)| acc + k.len() + v.len()); - ensure!( - payload_size <= MAX_PUT_PAYLOAD_SIZE, - "total payload is too large (max 976 KiB)" - ); - - let storage_remaining = MAX_STORAGE_SIZE.saturating_sub(total_size); - ensure!( - payload_size <= storage_remaining, - "not enough space left in storage ({storage_remaining} bytes remaining, current payload is {payload_size} bytes)" - ); - - for (key, value) in entries { - ensure!( - key.len() <= MAX_KEY_SIZE, - "key is too long (max 2048 bytes)" - ); - ensure!( - value.len() <= MAX_VALUE_SIZE, - "value for key {key:?} is too large (max 128 KiB)" - ); - } - - Ok(()) -} diff --git a/packages/infra/client/actor-kv/src/utils.rs b/packages/infra/client/actor-kv/src/utils.rs index c456d4c511..b77a63577c 100644 --- a/packages/infra/client/actor-kv/src/utils.rs +++ b/packages/infra/client/actor-kv/src/utils.rs @@ -1,6 +1,14 @@ +use std::{collections::HashMap, result::Result::Ok}; + +use anyhow::*; +use deno_core::JsBuffer; use foundationdb as fdb; use futures_util::{FutureExt, TryStreamExt}; +use crate::{ + key::Key, MAX_KEYS, MAX_KEY_SIZE, MAX_PUT_PAYLOAD_SIZE, MAX_STORAGE_SIZE, MAX_VALUE_SIZE, +}; + pub trait TransactionExt { /// Owned version of `Transaction.get_ranges`. fn get_ranges_owned<'a>( @@ -68,3 +76,49 @@ pub fn now() -> i64 { .try_into() .expect("now doesn't fit in i64") } + +pub fn validate_keys(keys: &[Key]) -> Result<()> { + ensure!(keys.len() <= MAX_KEYS, "a maximum of 128 keys is allowed"); + + for key in keys { + ensure!( + key.len() <= MAX_KEY_SIZE, + "key is too long (max 2048 bytes)" + ); + } + + Ok(()) +} + +pub fn validate_entries(entries: &HashMap, total_size: usize) -> Result<()> { + ensure!( + entries.len() <= MAX_KEYS, + "A maximum of 128 key-value entries is allowed" + ); + let payload_size = entries + .iter() + .fold(0, |acc, (k, v)| acc + k.len() + v.len()); + ensure!( + payload_size <= MAX_PUT_PAYLOAD_SIZE, + "total payload is too large (max 976 KiB)" + ); + + let storage_remaining = MAX_STORAGE_SIZE.saturating_sub(total_size); + ensure!( + payload_size <= storage_remaining, + "not enough space left in storage ({storage_remaining} bytes remaining, current payload is {payload_size} bytes)" + ); + + for (key, value) in entries { + ensure!( + key.len() <= MAX_KEY_SIZE, + "key is too long (max 2048 bytes)" + ); + ensure!( + value.len() <= MAX_VALUE_SIZE, + "value is too large (max 128 KiB)" + ); + } + + Ok(()) +} diff --git a/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js b/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js index c17e7eb51c..764a9bd08f 100644 --- a/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js +++ b/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js @@ -6,21 +6,22 @@ import { op_rivet_kv_put_batch, op_rivet_kv_delete, op_rivet_kv_delete_batch, + op_rivet_kv_delete_all, } from "ext:core/ops"; import { core } from "ext:core/mod.js"; /** * Retrieves a value from the key-value store. * - * @param {string} key - The key to retrieve the value for. + * @param {any|any[]} key - The key to retrieve the value for. * @param {Object} [options] - Options. - * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the data. - * If "arrayBuffer", returns an ArrayBuffer. - * Otherwise, returns the deserialized value. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the value. + * If "arrayBuffer", returns an ArrayBuffer. + * Otherwise, returns the deserialized value. * @returns {Promise} The retrieved value, or undefined if the key does not exist. */ async function get(key, options) { - let entry = (await op_rivet_kv_get(key)) ?? undefined; + let entry = (await op_rivet_kv_get(serializeKey(key))) ?? undefined; return deserializeValue(key, entry.value, options?.format); } @@ -30,19 +31,20 @@ async function get(key, options) { * * @param {string[]} keys - A list of keys to retrieve. * @param {Object} [options] - Options. - * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the data. - * If "arrayBuffer", returns an ArrayBuffer. - * Otherwise, returns the deserialized value. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the value. + * If "arrayBuffer", returns an ArrayBuffer. + * Otherwise, returns the deserialized value. * @returns {Promise>} The retrieved values. Keys that have no value in the key-value store - * will not be present. + * will not be present. */ async function getBatch(keys, options) { - let entries = await op_rivet_kv_get_batch(keys); + let entries = await op_rivet_kv_get_batch(keys.map((x) => serializeKey(x))); let deserializedValues = new Map(); - for (let key in entries) { - deserializedValues.set(key, deserializeValue(key, entries[key].value, options?.format)); + for (let [key, entry] of entries) { + let jsKey = deserializeKey(key); + deserializedValues.set(jsKey, deserializeValue(jsKey, entry.value, options?.format)); } return deserializedValues; @@ -53,54 +55,55 @@ async function getBatch(keys, options) { * is used for filtering. * * @param {Object} [options] - Options. - * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the data. - * If "arrayBuffer", returns an ArrayBuffer. - * Otherwise, returns the deserialized value. - * @param {string} [options.start] - The key to start listing results from (inclusive). + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the value. + * If "arrayBuffer", returns an ArrayBuffer. + * Otherwise, returns the deserialized value. + * @param {string} [options.start] - The key to start listing results from (inclusive). Cannot be used with + * startAfter or prefix. * @param {string} [options.startAfter] - The key to start listing results after (exclusive). Cannot be used - * with start. + * with start or prefix. * @param {string} [options.end] - The key to end listing results at (exclusive). - * @param {string} [options.prefix] - Restricts results to keys that start with the given prefix. - * @param {boolean} [options.reverse] - If true, results are returned in descending order. Start still defines - * the smallest key and end still defines the largest key in lexicographic order. + * @param {string} [options.prefix] - Restricts results to keys that start with the given prefix. Cannot be + * used with start or startAfter. + * @param {boolean} [options.reverse] - If true, results are returned in descending order. * @param {number} [options.limit] - The maximum number of key-value pairs to return. - * @returns {Promise>} The retrieved values. Keys that have no value in the key-value store - * will not be present. + * @returns {Promise>} The retrieved values. */ async function list(options) { + // Build query let query; - if (options.prefix) { + if (options?.prefix) { query = { - prefix: options.prefix, + prefix: serializeListKey(options.prefix), }; - } else if (options.start) { + } else if (options?.start) { if (!options.end) { throw new Error("must set options.end with options.start"); } query = { - rangeInclusive: [options.start, options.end], + rangeInclusive: [serializeListKey(options.start), serializeKey(options.end)], }; - } else if (options.startAfter) { + } else if (options?.startAfter) { if (!options.end) { throw new Error("must set options.end with options.startAfter"); } query = { - rangeExclusive: [options.startAfter, options.end], + rangeExclusive: [serializeListKey(options.startAfter), serializeKey(options.end)], }; - } else if (options.end) { + } else if (options?.end) { throw new Error("must set options.start or options.startAfter with options.end"); } else { query = { all: {} }; } let entries = await op_rivet_kv_list(query, options?.reverse ?? false, options?.limit); - let deserializedValues = new Map(); - for (let key in entries) { - deserializedValues.set(key, deserializeValue(key, entries[key].value, options?.format)); + for (let [key, entry] of entries) { + let jsKey = deserializeKey(key); + deserializedValues.set(jsKey, deserializeValue(jsKey, entry.value, options?.format)); } return deserializedValues; @@ -109,28 +112,54 @@ async function list(options) { /** * Stores a key-value pair in the key-value store. * - * @param {string} key - The key under which the value will be stored. - * @param {any} value - The value to be stored, which will be serialized. + * @param {any|any[]} key - The key under which the value will be stored. + * @param {any|ArrayBuffer} value - The value to be stored, which will be serialized. + * @param {Object} [options] - Options. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to write the value. `value` must + * be an ArrayBuffer if this is set to arrayBuffer. * @returns {Promise} A promise that resolves when the operation is complete. */ -async function put(key, value) { - validateType(value); +async function put(key, givenValue, options) { + validateType(givenValue, null, options?.format); + let format = options?.format ?? "value"; + + let value; + if (format == "value") { + value = core.serialize(givenValue, { forStorage: true }); + } else if (format == "arrayBuffer") { + value = new Uint8Array(givenValue); + } - await op_rivet_kv_put(key, core.serialize(value, { forStorage: true })); + await op_rivet_kv_put(serializeKey(key), value); } /** * Asynchronously stores a batch of key-value pairs. * - * @param {Object} obj - An object containing key-value pairs to be stored. + * @param {Record} obj - An object containing key-value pairs to be stored. + * @param {any|ArrayBuffer} value - The value to be stored, which will be serialized. + * @param {Object} [options] - Options. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to write the values. values in + * `obj` must be ArrayBuffers if this is set to arrayBuffer. * @returns {Promise} A promise that resolves when the batch operation is complete. */ -async function putBatch(obj) { +async function putBatch(obj, options) { let serializedObj = new Map(); + let format = options?.format ?? "value"; for (let key in obj) { - validateType(obj[key], key); - serializedObj.set(key, core.serialize(obj[key], { forStorage: true })); + let givenValue = obj[key]; + + validateType(givenValue, key, format); + + let value; + if (format == "value") { + value = core.serialize(givenValue, { forStorage: true }); + } else if (format == "arrayBuffer") { + value = new Uint8Array(givenValue); + } + + serializedObj.set(serializeKey(key), value); } await op_rivet_kv_put_batch(serializedObj); @@ -143,56 +172,99 @@ async function putBatch(obj) { * @returns {Promise} A promise that resolves when the operation is complete. */ async function delete_(key) { - return await op_rivet_kv_delete(key); + return await op_rivet_kv_delete(serializeKey(key)); } async function deleteBatch(keys) { - return await op_rivet_kv_delete_batch(keys); + return await op_rivet_kv_delete_batch(keys.map((x) => serializeKey(x))); +} + +/** + * Deletes all data from the key-value store. **This CANNOT be undone.** + * + * @returns {Promise} A promise that resolves when the operation is complete. + */ +async function deleteAll() { + return await op_rivet_kv_delete_all(); } -// See https://github.com/denoland/deno/issues/12067#issuecomment-1975001079 -function validateType(value, key) { +function validateType(value, key, format = "value") { let keyText = key ? ` in key "{key}"` : ""; - if (value instanceof Blob) { - throw new Error( - `The type ${value.constructor.name}${keyText} is not serializable in Deno, but you can use a TypedArray instead. See https://github.com/denoland/deno/issues/12067#issuecomment-1975001079.` - ); + if (format == "value") { + if (value instanceof Blob) { + throw new Error( + `the type ${value.constructor.name}${keyText} is not serializable in Deno, but you can use a TypedArray instead. See https://github.com/denoland/deno/issues/12067#issuecomment-1975001079.` + ); + } + if ( + value instanceof CryptoKey || + value instanceof DOMException || + // Not defined in Deno + // value instanceof RTCCertificate || + // We don't load in the canvas ext into the the Deno runtime for Rivet + // value instanceof ImageBitmap || + value instanceof ImageData + ) { + throw new Error( + `the type ${value.constructor.name}${keyText} is not serializable in Deno. See https://github.com/denoland/deno/issues/12067#issuecomment-1975001079.` + ); + } + } else if (format == "arrayBuffer") { + if (!(value instanceof ArrayBuffer)) { + throw new Error(`value must be an ArrayBuffer if options.format = "arrayBuffer".`); + } + } else { + throw new Error("unexpected key type from KV driver"); + } +} + +function serializeKey(key) { + if (key instanceof Array) { + return { jsInKey: [key.map((x) => core.serialize(x))] }; + } else { + return { jsInKey: [core.serialize(key)] }; } - if ( - value instanceof CryptoKey || - value instanceof DOMException || - // Not defined in Deno - // value instanceof RTCCertificate || - // We don't load in the canvas ext into the the Deno runtime for Rivet - // value instanceof ImageBitmap || - value instanceof ImageData - ) { - throw new Error( - `The type ${value.constructor.name}${keyText} is not serializable in Deno. See https://github.com/denoland/deno/issues/12067#issuecomment-1975001079.` - ); +} + +function serializeListKey(key) { + if (key instanceof Array) { + return key.map((x) => core.serialize(x)); + } else { + return [core.serialize(key)]; } } -function deserializeValue(key, data, format = "value") { - if (data != undefined) { - if (format == "value") { - try { - return core.deserialize(data, { forStorage: true }); - } catch (e) { - throw new Error( - `could not deserialize data in key "${key}". you must use options.format = "arrayBuffer".`, - { cause: e } - ); - } - } else if (format == "arrayBuffer") { - return data.buffer; - } else { - throw Error(`invalid format: "${options.format}". expected "value" or "arrayBuffer".`); - } +function deserializeKey(key) { + if ("inKey" in key || "outKey" in key) { + let jsKey = key.inKey ?? key.outKey; + + let tuple = jsKey[0].map((x) => core.deserialize(x)); + + if (tuple.length == 1) return tuple[0]; + else return tuple; + } else { + throw new Error("unexpected key type from KV driver"); } +} + +function deserializeValue(key, value, format = "value") { + if (value == undefined) return value; - return undefined; + if (format == "value") { + try { + return core.deserialize(value, { forStorage: true }); + } catch (e) { + throw new Error( + `could not deserialize value in key "${key}". you must use options.format = "arrayBuffer".`, + { cause: e } + ); + } + } else if (format == "arrayBuffer") { + return value.buffer; + } else { + throw Error(`invalid format: "${options.format}". expected "value" or "arrayBuffer".`); + } } -export { get, getBatch, list, put, putBatch, delete_, deleteBatch }; +export { get, getBatch, list, put, putBatch, delete_, deleteBatch, deleteAll }; diff --git a/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js b/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js index d77348f5cc..280a9dcd4c 100644 --- a/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js +++ b/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js @@ -9,6 +9,7 @@ const rivetNs = { putBatch: kv.putBatch, delete: kv.delete_, deleteBatch: kv.deleteBatch, + deleteAll: kv.deleteAll, }, }; diff --git a/packages/infra/client/isolate-v8-runner/src/ext/kv.rs b/packages/infra/client/isolate-v8-runner/src/ext/kv.rs index ae747b0246..8f0e3a23ae 100644 --- a/packages/infra/client/isolate-v8-runner/src/ext/kv.rs +++ b/packages/infra/client/isolate-v8-runner/src/ext/kv.rs @@ -1,6 +1,9 @@ use std::{collections::HashMap, future::Future, sync::Arc}; -use deno_core::{error::AnyError, op2, JsBuffer, OpState}; +use deno_core::{error::AnyError, op2, JsBuffer, OpState, ToJsBuffer}; +use serde::Serialize; + +type FakeMap = Box<[(T, U)]>; deno_core::extension!( rivet_kv, @@ -12,6 +15,7 @@ deno_core::extension!( op_rivet_kv_put_batch, op_rivet_kv_delete, op_rivet_kv_delete_batch, + op_rivet_kv_delete_all, ], esm = [ dir "js", @@ -25,26 +29,72 @@ deno_core::extension!( }, ); +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +enum Key { + InKey(Vec), + OutKey(Vec), +} + +impl From for Key { + fn from(value: actor_kv::key::Key) -> Self { + match value { + actor_kv::key::Key::JsInKey(tuple) => Key::InKey(tuple), + actor_kv::key::Key::JsOutKey(tuple) => { + Key::OutKey(tuple.into_iter().map(Into::into).collect()) + } + } + } +} + +#[derive(Serialize)] +struct Entry { + metadata: actor_kv::Metadata, + value: ToJsBuffer, +} + +impl From for Entry { + fn from(value: actor_kv::Entry) -> Self { + Entry { + metadata: value.metadata, + value: value.value.into(), + } + } +} + #[op2(async)] #[serde] pub fn op_rivet_kv_get( state: &mut OpState, - #[string] key: String, -) -> Result, AnyError>>, AnyError> { + #[serde] key: actor_kv::key::Key, +) -> Result, AnyError>>, AnyError> { let kv = state.borrow::>().clone(); - Ok(async move { kv.get(vec![key]).await.map(|res| res.into_values().next()) }) + Ok(async move { + let res = kv.get(vec![key.into()]).await?; + + Ok(res.into_values().next().map(Into::into)) + }) } #[op2(async)] #[serde] pub fn op_rivet_kv_get_batch( state: &mut OpState, - #[serde] keys: Vec, -) -> Result, AnyError>>, AnyError> { + #[serde] keys: Vec, +) -> Result, AnyError>>, AnyError> { let kv = state.borrow::>().clone(); - Ok(async move { kv.get(keys).await }) + Ok(async move { + let res = kv + .get(keys.into_iter().map(Into::into).collect()) + .await? + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + Ok(res) + }) } #[op2(async)] @@ -54,16 +104,25 @@ pub fn op_rivet_kv_list( #[serde] query: actor_kv::ListQuery, reverse: bool, limit: Option, -) -> Result, AnyError>>, AnyError> { +) -> Result, AnyError>>, AnyError> { let kv = state.borrow::>().clone(); - Ok(async move { kv.list(query, reverse, limit.map(|x| x as usize)).await }) + Ok(async move { + let res = kv + .list(query.into(), reverse, limit.map(|x| x as usize)) + .await? + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + Ok(res) + }) } #[op2(async)] pub fn op_rivet_kv_put( state: &mut OpState, - #[string] key: String, + #[serde] key: actor_kv::key::Key, #[buffer] value: JsBuffer, ) -> Result>, AnyError> { let kv = state.borrow::>().clone(); @@ -74,7 +133,7 @@ pub fn op_rivet_kv_put( #[op2(async)] pub fn op_rivet_kv_put_batch( state: &mut OpState, - #[serde] obj: HashMap, + #[serde] obj: HashMap, ) -> Result>, AnyError> { let kv = state.borrow::>().clone(); @@ -84,14 +143,14 @@ pub fn op_rivet_kv_put_batch( #[op2(async)] pub fn op_rivet_kv_delete( state: &mut OpState, - #[string] key: String, + #[serde] key: actor_kv::key::Key, ) -> Result>, AnyError> { let kv = state.borrow::>().clone(); Ok(async move { - kv.delete(vec![key]) - .await - .map(|res| res.into_values().next().unwrap_or_default()) + let res = kv.delete(vec![key]).await?; + + Ok(res.into_values().next().unwrap_or_default()) }) } @@ -99,9 +158,27 @@ pub fn op_rivet_kv_delete( #[serde] pub fn op_rivet_kv_delete_batch( state: &mut OpState, - #[serde] keys: Vec, -) -> Result, AnyError>>, AnyError> { + #[serde] keys: Vec, +) -> Result, AnyError>>, AnyError> { + let kv = state.borrow::>().clone(); + + Ok(async move { + let res = kv + .delete(keys) + .await? + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + Ok(res) + }) +} + +#[op2(async)] +pub fn op_rivet_kv_delete_all( + state: &mut OpState, +) -> Result>, AnyError> { let kv = state.borrow::>().clone(); - Ok(async move { kv.delete(keys).await }) + Ok(async move { kv.delete_all().await }) }