From c17168146a80af27d0c90424b2db80ed0526fad0 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 23 Jun 2024 13:41:24 +0530 Subject: [PATCH 1/3] protocol: improve and fix decode routines --- src/io/aio.rs | 16 +- src/io/sync.rs | 25 +- src/protocol/mod.rs | 1202 +++++++++++++++++++++++++---------------- src/protocol/pipe.rs | 50 +- src/protocol/state.rs | 254 --------- 5 files changed, 796 insertions(+), 751 deletions(-) delete mode 100644 src/protocol/state.rs diff --git a/src/io/aio.rs b/src/io/aio.rs index 86e1193..9f5d943 100644 --- a/src/io/aio.rs +++ b/src/io/aio.rs @@ -26,8 +26,7 @@ use { error::{ClientResult, ConnectionSetupError, Error}, protocol::{ handshake::{ClientHandshake, ServerHandshake}, - state_init::{DecodeState, MRespState, PipelineResult, RState}, - Decoder, + DecodeState, Decoder, MRespState, PipelineResult, RState, }, query::Pipeline, response::{FromResponse, Response}, @@ -168,11 +167,12 @@ impl TcpConnection { return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into())); } self.buf.extend_from_slice(&buf[..n]); - let mut decoder = Decoder::new(&self.buf, cursor); - match decoder.validate_pipe(pipeline.query_count(), state) { + let (_state, _position) = + Decoder::new(&self.buf, cursor).validate_pipe(pipeline.query_count(), state); + match _state { PipelineResult::Completed(r) => return Ok(r), PipelineResult::Pending(_state) => { - cursor = decoder.position(); + cursor = _position; state = _state; } PipelineResult::Error(e) => return Err(e.into()), @@ -198,13 +198,13 @@ impl TcpConnection { continue; } self.buf.extend_from_slice(&buf[..n]); - let mut decoder = Decoder::new(&self.buf, cursor); - match decoder.validate_response(state) { + let (_state, _position) = Decoder::new(&self.buf, cursor).validate_response(state); + match _state { DecodeState::Completed(resp) => return Ok(resp), DecodeState::ChangeState(_state) => { expected = 1; state = _state; - cursor = decoder.position(); + cursor = _position; } DecodeState::Error(e) => return Err(Error::ProtocolError(e)), } diff --git a/src/io/sync.rs b/src/io/sync.rs index 226084e..d526a06 100644 --- a/src/io/sync.rs +++ b/src/io/sync.rs @@ -28,8 +28,7 @@ use { error::{ClientResult, ConnectionSetupError, Error}, protocol::{ handshake::{ClientHandshake, ServerHandshake}, - state_init::{DecodeState, MRespState, PipelineResult, RState}, - Decoder, + DecodeState, Decoder, MRespState, PipelineResult, RState, }, query::Pipeline, response::{FromResponse, Response}, @@ -163,11 +162,12 @@ impl TcpConnection { return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into())); } self.buf.extend_from_slice(&buf[..n]); - let mut decoder = Decoder::new(&self.buf, cursor); - match decoder.validate_pipe(pipeline.query_count(), state) { + let (_state, _position) = + Decoder::new(&self.buf, cursor).validate_pipe(pipeline.query_count(), state); + match _state { PipelineResult::Completed(r) => return Ok(r), PipelineResult::Pending(_state) => { - cursor = decoder.position(); + cursor = _position; state = _state; } PipelineResult::Error(e) => return Err(e.into()), @@ -189,15 +189,14 @@ impl TcpConnection { return Err(Error::IoError(std::io::ErrorKind::ConnectionReset.into())); } self.buf.extend_from_slice(&buf[..n]); - let mut decoder = Decoder::new(&self.buf, cursor); - match decoder.validate_response(state) { - DecodeState::ChangeState(new_state) => { - state = new_state; - cursor = decoder.position(); - continue; - } + let (_state, _position) = Decoder::new(&self.buf, cursor).validate_response(state); + match _state { DecodeState::Completed(resp) => return Ok(resp), - DecodeState::Error(e) => return Err(e.into()), + DecodeState::ChangeState(_state) => { + state = _state; + cursor = _position; + } + DecodeState::Error(e) => return Err(Error::ProtocolError(e)), } } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8756c33..c36b00c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -16,24 +16,18 @@ pub mod handshake; mod pipe; -mod state; + +use crate::response::Row; use { - self::state::{ - DecodeState, MetaState, MultiRowState, PendingValue, RState, ResponseState, RowState, - ValueDecodeState, ValueDecodeStateAny, ValueDecodeStateRaw, ValueState, ValueStateMeta, - }, - crate::response::{Response, Row, Value}, + crate::response::{Response, Value}, + std::marker::PhantomData, }; +// re-export +pub(crate) use pipe::{MRespState, PipelineResult}; -pub mod state_init { - pub(crate) use super::{ - pipe::{MRespState, PipelineResult}, - state::{DecodeState, RState}, - }; -} - -pub(crate) type ProtocolResult = Result; +/// A [`Result`] type for results originating from the protocol module +pub type ProtocolResult = Result; /// Errors that can happen when handling protocol level encoding and decoding #[derive(Debug, PartialEq, Clone)] @@ -43,547 +37,853 @@ pub enum ProtocolError { /// The server possibly returned an unknown data type and we can't decode it. Note that this might happen when you use an older client version with /// a newer version of Skytable InvalidServerResponseUnknownDataType, + /// The server responded with an unknown packet structure (are you correctly pairing database and database client versions?) InvalidPacket, } -impl Value { - fn u64(self) -> u64 { - match self { - Self::UInt64(u) => u, - _ => unreachable!(), - } +#[derive(Debug, PartialEq)] +pub enum DecodeState { + ChangeState(RState), + Completed(Response), + Error(ProtocolError), +} + +#[derive(Debug, PartialEq)] +pub struct RState(pub(super) ResponseState); +impl Default for RState { + fn default() -> Self { + RState(ResponseState::Initial) } } +#[derive(Debug, PartialEq)] +pub(crate) enum ResponseState { + Initial, + PValue(PendingValue), + PError, + PRow(ValueStream), + PMultiRow(MultiValueStream), +} + /* - Decoder + decoder */ -#[derive(Debug, PartialEq)] +#[derive(Debug)] +/// Skyhash/2 decoder pub struct Decoder<'a> { b: &'a [u8], i: usize, } impl<'a> Decoder<'a> { + /// the minimum number of bytes pub const MIN_READBACK: usize = 1; + /// Initialize the decoder pub fn new(b: &'a [u8], i: usize) -> Self { Self { b, i } } - pub fn validate_response(&mut self, RState(state): RState) -> DecodeState { - match state { - ResponseState::Initial => self.begin(), - ResponseState::PError => self.resume_error(), - ResponseState::PValue(v) => self.resume_value(v), - ResponseState::PRow(r) => self.resume_row(r), - ResponseState::PMultiRow(mr) => self.resume_rows(mr), - } - } + /// get the current position of the decoder pub fn position(&self) -> usize { self.i } - fn begin(&mut self) -> DecodeState { - match self._cursor_next() { - // TODO(@ohsayan): this is reserved! - 0x0F => return DecodeState::Error(ProtocolError::InvalidServerResponseUnknownDataType), - 0x10 => self.resume_error(), - 0x11 => self.resume_row(RowState::new(ValueStateMeta::zero(), vec![], None)), - 0x12 => return DecodeState::Completed(Response::Empty), - 0x13 => self.resume_rows(MultiRowState::default()), - code => match self.start_decode(true, code, vec![], None) { - Ok(ValueDecodeStateAny::Decoded(v)) => DecodeState::Completed(Response::Value(v)), - Ok(ValueDecodeStateAny::Pending(pv)) => { - DecodeState::ChangeState(RState(ResponseState::PValue(pv))) + pub fn validate_response(mut self, RState(state): RState) -> (DecodeState, usize) { + let ret = match state { + ResponseState::Initial => { + match self.next() { + // TODO(@ohsayan): this is reserved! + 0x0F => DecodeState::Error(ProtocolError::InvalidServerResponseUnknownDataType), + 0x10 => self.complete_error(), + 0x11 => self.complete_row(ValueStream::initialize(&self)), + 0x12 => DecodeState::Completed(Response::Empty), + 0x13 => self.complete_rows(MultiValueStream::initialize(&self)), + code => match PendingValue::next_value_with_code(&mut self, code) { + Ok(ds) => match ds { + ProtocolObjectDecodeState::Completed(c) => { + DecodeState::Completed(Response::Value(c)) + } + ProtocolObjectDecodeState::Pending(pv) => { + DecodeState::ChangeState(RState(ResponseState::PValue(pv))) + } + }, + Err(e) => DecodeState::Error(e), + }, } + } + ResponseState::PValue(pv) => match pv.try_complete_self(&mut self) { + Ok(ds) => match ds { + ProtocolObjectDecodeState::Completed(c) => { + DecodeState::Completed(Response::Value(c)) + } + ProtocolObjectDecodeState::Pending(pv) => { + DecodeState::ChangeState(RState(ResponseState::PValue(pv))) + } + }, Err(e) => DecodeState::Error(e), }, + ResponseState::PError => self.complete_error(), + ResponseState::PRow(vs) => self.complete_row(vs), + ResponseState::PMultiRow(mvs) => self.complete_rows(mvs), + }; + (ret, self.position()) + } + fn complete_error(&mut self) -> DecodeState { + if self.remaining() < 2 { + DecodeState::ChangeState(RState(ResponseState::PError)) + } else { + let bytes: [u8; 2] = [self.next(), self.next()]; + DecodeState::Completed(Response::Error(u16::from_le_bytes(bytes))) } } - fn resume_error(&mut self) -> DecodeState { - if self._remaining() < 2 { - return DecodeState::ChangeState(RState(ResponseState::PError)); - } - let bytes: [u8; 2] = [self._cursor_next(), self._cursor_next()]; - DecodeState::Completed(Response::Error(u16::from_le_bytes(bytes))) - } - fn resume_value(&mut self, PendingValue { state, tmp, stack }: PendingValue) -> DecodeState { - match self.resume_decode(true, state, stack, tmp) { - Ok(ValueDecodeStateAny::Pending(pv)) => { - DecodeState::ChangeState(RState(ResponseState::PValue(pv))) - } - Ok(ValueDecodeStateAny::Decoded(v)) => DecodeState::Completed(Response::Value(v)), + fn complete_row(&mut self, value_stream: ValueStream) -> DecodeState { + match value_stream.complete(self) { + Ok(ds) => match ds { + ProtocolObjectDecodeState::Completed(valuestream) => { + DecodeState::Completed(Response::Row(Row::new(valuestream.items))) + } + ProtocolObjectDecodeState::Pending(prow) => { + DecodeState::ChangeState(RState(ResponseState::PRow(prow))) + } + }, Err(e) => DecodeState::Error(e), } } - fn resume_row(&mut self, mut row_state: RowState) -> DecodeState { - match row_state.meta.md.finished(self) { - Ok(true) => self._decode_row_core(row_state), - Ok(false) => DecodeState::ChangeState(RState(ResponseState::PRow(row_state))), + fn complete_rows(&mut self, mvs: MultiValueStream) -> DecodeState { + match mvs.complete(self) { + Ok(ds) => match ds { + ProtocolObjectDecodeState::Completed(c) => { + DecodeState::Completed(Response::Rows(unsafe { core::mem::transmute(c.items) })) + } + ProtocolObjectDecodeState::Pending(pmv) => { + DecodeState::ChangeState(RState(ResponseState::PMultiRow(pmv))) + } + }, Err(e) => DecodeState::Error(e), } } - fn _decode_row_core(&mut self, mut row_state: RowState) -> DecodeState { - while row_state.row.len() as u64 != row_state.meta.md.val() { - let r = match row_state.tmp.take() { - None => { - if self._cursor_eof() { - return DecodeState::ChangeState(RState(ResponseState::PRow(row_state))); - } - let code = self._cursor_next(); - let stack = vec![]; - self.start_decode(true, code, stack, None) - } - Some(PendingValue { state, tmp, stack }) => { - self.resume_decode(true, state, stack, tmp) - } - }; - let r = match r { - Ok(r) => r, - Err(e) => return DecodeState::Error(e), - }; - match r { - ValueDecodeStateAny::Pending(pv) => { - row_state.tmp = Some(pv); - return DecodeState::ChangeState(RState(ResponseState::PRow(row_state))); - } - ValueDecodeStateAny::Decoded(v) => { - row_state.row.push(v); +} + +impl<'a> Decoder<'a> { + fn next(&mut self) -> u8 { + let r = self.b[self.i]; + self.i += 1; + r + } + fn remaining(&self) -> usize { + self.current().len() + } + fn current(&self) -> &[u8] { + &self.b[self.i..] + } + fn eof(&self) -> bool { + self.current().is_empty() + } + fn cursor_value(&self) -> u8 { + self.current()[0] + } + fn cursor_eq(&self, b: u8) -> bool { + (self.b[self.i.min(self.b.len() - 1)] == b) && !self.eof() + } + fn has_left(&self, s: usize) -> bool { + self.remaining() >= s + } + fn next_chunk(&mut self, size: usize) -> &[u8] { + let current = self.i; + let chunk = &self.b[current..current + size]; + self.i += size; + chunk + } +} + +/* + common state mgmt +*/ + +trait ProtocolObjectState: Sized { + type Value; + fn initialize(decoder: &Decoder) -> Self; + fn complete(self, decoder: &mut Decoder) -> ProtocolResult>; + fn into_value(self) -> Self::Value; +} + +#[derive(Debug, PartialEq)] +enum ProtocolObjectDecodeState { + Completed(T), + Pending(U), +} + +impl> ProtocolObjectDecodeState { + fn try_complete( + self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + match self { + Self::Completed(c) => Ok(ProtocolObjectDecodeState::Completed(c)), + Self::Pending(pv) => match pv.complete(decoder)? { + ProtocolObjectDecodeState::Completed(c) => { + Ok(ProtocolObjectDecodeState::Completed(c.into_value())) } - } - } - DecodeState::Completed(Response::Row(Row::new(row_state.row))) - } - fn resume_rows(&mut self, mut multirow: MultiRowState) -> DecodeState { - macro_rules! finish { - ($completed:expr, $target:expr) => { - match MetaState::try_finish(self, $completed, &mut $target) { - Ok(true) => multirow.md_state += 1, - Ok(false) => { - return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow))) - } - Err(e) => return DecodeState::Error(e), + ProtocolObjectDecodeState::Pending(pv) => { + Ok(ProtocolObjectDecodeState::Pending(pv)) } - }; + }, } - finish!(multirow.md_state == 1, &mut multirow.md1_target); - finish!(multirow.md_state == 2, &mut multirow.md2_col_cnt); - while multirow.rows.len() as u64 != multirow.md1_target { - let ret = match multirow.c_row.take() { - Some(r) => self._decode_row_core(r), - None => self._decode_row_core(RowState::new( - ValueStateMeta::new(0, multirow.md2_col_cnt, true), - vec![], - None, - )), - }; - match ret { - DecodeState::Completed(Response::Row(r)) => multirow.rows.push(r), - DecodeState::Completed(_) => unreachable!(), - e @ DecodeState::Error(_) => return e, - DecodeState::ChangeState(RState(ResponseState::PRow(pr))) => { - multirow.c_row = Some(pr); - return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow))); - } - DecodeState::ChangeState(_) => unreachable!(), - } + } +} + +#[cfg(test)] +impl ProtocolObjectDecodeState { + fn into_completed(self) -> Option { + match self { + Self::Completed(c) => Some(c), + Self::Pending(_) => None, } - DecodeState::Completed(Response::Rows(multirow.rows)) } } -impl<'a> Decoder<'a> { - fn __resume_decode( - &mut self, - mut value: T, - meta: ValueStateMeta, - ) -> ProtocolResult { - let mut okay = true; - while !(self._cursor_eof() | self._creq(b'\n')) & okay { - okay &= value.update(self._cursor_next()); +/* + protocol objects: + 1. lfsobject -> lf separated object + 2. spobject -> size prefixed object +*/ + +pub(crate) trait LfsObject: Sized { + type State; + fn init_state(decoder: &Decoder) -> (Self, Self::State); + /// return false if the byte can't be accepted + fn update(&mut self, state: &mut Self::State, byte: u8) -> bool; + /// the byte stream has reached EOF. parse this object + fn complete_lfs(self, _: &Self::State, _: &Decoder) -> ProtocolResult { + Ok(self) + } +} + +#[derive(Debug, PartialEq)] +pub(crate) struct LfsValue { + v: T, + state: T::State, +} + +impl ProtocolObjectState for LfsValue { + type Value = T; + fn into_value(self) -> Self::Value { + self.v + } + fn initialize(decoder: &Decoder) -> Self { + let (v, state) = T::init_state(decoder); + Self { v, state } + } + fn complete( + mut self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + let mut stop = decoder.cursor_eq(b'\n'); + let mut error = false; + while !decoder.eof() && !error && !stop { + let byte = decoder.next(); + error = !self.v.update(&mut self.state, byte); + stop = decoder.cursor_eq(b'\n'); } - let lf = self._creq(b'\n'); - self._cursor_incr_if(lf); - // FIXME(@ohsayan): the below is not exactly necessary and we can actually remove this if it complicates state management - okay &= !(lf & (self._cursor() == meta.start)); - if okay & lf { - let start = meta.start; - value - .pack_completed(meta, &self.b[start..self._cursor() - 1]) - .map(ValueDecodeStateRaw::Decoded) - } else { - if okay { - Ok(ValueDecodeStateAny::Pending(value.pack_pending(meta))) - } else { - Err(ProtocolError::InvalidServerResponseForData) + if stop & !error { + decoder.i += 1; // account for LF + let Self { state, v } = self; + match v.complete_lfs(&state, &decoder) { + Ok(v) => Ok(ProtocolObjectDecodeState::Completed(Self { v, state })), + Err(e) => Err(e), } - } - } - fn __resume_psize( - &mut self, - mut meta: ValueStateMeta, - ) -> ProtocolResult { - if !meta.md.finished(self)? { - Ok(ValueDecodeStateRaw::Pending(ValueState::new( - T::empty(), - meta, - ))) } else { - meta.start = self._cursor(); - if self._remaining() as u64 >= meta.md.val() { - let buf = &self.b[meta.start..self._cursor() + meta.md.val() as usize]; - self._cursor_incr_by(meta.md.val() as usize); - T::finish(buf).map(ValueDecodeStateAny::Decoded) + if error { + Err(ProtocolError::InvalidServerResponseForData) } else { - Ok(ValueDecodeStateAny::Pending(ValueState::new( - T::empty(), - meta, - ))) + Ok(ProtocolObjectDecodeState::Pending(self)) } } } } -impl<'a> Decoder<'a> { - fn _cursor(&self) -> usize { - self.i - } - fn _cursor_value(&self) -> u8 { - self.b[self._cursor()] - } - fn _cursor_incr(&mut self) { - self._cursor_incr_by(1) - } - fn _cursor_incr_by(&mut self, b: usize) { - self.i += b; - } - fn _cursor_incr_if(&mut self, iff: bool) { - self._cursor_incr_by(iff as _) - } - fn _cursor_next(&mut self) -> u8 { - let r = self._cursor_value(); - self._cursor_incr(); - r - } - fn _remaining(&self) -> usize { - self.b.len() - self.i +/* + lfs objects with no state mgmt +*/ + +macro_rules! impl_num_lfs_object { + ($($ty:ty),*) => { + $( + impl LfsObject for $ty { + type State = (); + fn init_state(_: &Decoder) -> (Self, Self::State) {(0, ())} + fn update(&mut self, _: &mut Self::State, byte: u8) -> bool { + match self.checked_mul(10).map(|me| me.checked_add((byte & 0x0f) as $ty)) { + Some(Some(v)) if byte.is_ascii_digit() => { *self = v; true }, + _ => false, + } + } + } + )* + }; +} + +impl_num_lfs_object!(u8, u16, u32, u64, usize); + +/* + lfs objects requiring state mgmt +*/ + +#[derive(Debug, PartialEq)] +pub(crate) struct LfsObjectState { + start: usize, +} + +macro_rules! impl_num_lfs_object_state { + ($($ty:ty),*) => { + $( + impl LfsObject for $ty { + type State = LfsObjectState; + fn init_state(decoder: &Decoder) -> (Self, Self::State) { (<$ty as ::core::default::Default>::default(), LfsObjectState { start: decoder.i},) } + fn update(&mut self, _: &mut Self::State, _: u8) -> bool { true } + fn complete_lfs(self, state: &Self::State, decoder: &Decoder) -> ProtocolResult { + let block = &decoder.b[state.start..decoder.i-1]; // -1 for LF + match core::str::from_utf8(block).map(str::parse) { + Ok(Ok(v)) => Ok(v), + _ => Err(ProtocolError::InvalidServerResponseForData), + } + } + } + )* + }; +} + +impl_num_lfs_object_state!(f32, f64, i8, i16, i32, i64, isize); + +/* + spobjects: binary, string +*/ + +trait SpObject: Sized { + fn finish(block: &[u8]) -> ProtocolResult; + fn init() -> Self; +} + +#[derive(Debug, PartialEq)] +pub(crate) struct SpObjectState { + size: ProtocolObjectDecodeState>, + v: T, + _d: PhantomData, +} + +impl ProtocolObjectState for SpObjectState { + type Value = T; + fn initialize(decoder: &Decoder) -> Self { + Self { + size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)), + v: T::init(), + _d: PhantomData, + } } - fn _cursor_eof(&self) -> bool { - self._remaining() == 0 + fn into_value(self) -> Self::Value { + self.v + } + fn complete( + mut self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + let size = match self.size.try_complete(decoder)? { + ProtocolObjectDecodeState::Completed(c) => c, + ProtocolObjectDecodeState::Pending(pv) => { + self.size = ProtocolObjectDecodeState::Pending(pv); + return Ok(ProtocolObjectDecodeState::Pending(self)); + } + }; + self.size = ProtocolObjectDecodeState::Completed(size); + if decoder.has_left(size) { + let block = decoder.next_chunk(size); + let v = T::finish(block)?; + self.v = v; + Ok(ProtocolObjectDecodeState::Completed(self)) + } else { + Ok(ProtocolObjectDecodeState::Pending(self)) + } } - fn _creq(&self, b: u8) -> bool { - (self.b[core::cmp::min(self.i, self.b.len() - 1)] == b) & !self._cursor_eof() +} + +impl SpObject for Vec { + fn init() -> Self { + vec![] } - fn _current(&self) -> &[u8] { - &self.b[self.i..] + fn finish(block: &[u8]) -> ProtocolResult { + Ok(block.to_owned()) } } -trait DecodeDelimited { - fn update(&mut self, _: u8) -> bool { - true +impl SpObject for String { + fn init() -> Self { + String::new() + } + fn finish(block: &[u8]) -> ProtocolResult { + if core::str::from_utf8(block).is_ok() { + Ok(unsafe { String::from_utf8_unchecked(block.to_owned()) }) + } else { + Err(ProtocolError::InvalidServerResponseForData) + } } - fn pack_completed(self, meta: ValueStateMeta, full_buffer: &[u8]) -> ProtocolResult; - fn pack_pending(self, meta: ValueStateMeta) -> ValueState; } -trait DecodePsize { - fn finish(b: &[u8]) -> ProtocolResult; - fn empty() -> Value; +#[derive(Debug, PartialEq)] +pub(crate) enum PendingValue { + Bool(bool), + UInt8(LfsValue), + UInt16(LfsValue), + UInt32(LfsValue), + UInt64(LfsValue), + SInt8(LfsValue), + SInt16(LfsValue), + SInt32(LfsValue), + SInt64(LfsValue), + Float32(LfsValue), + Float64(LfsValue), + Binary(SpObjectState>), + String(SpObjectState), + List(ValueStream), } -impl DecodePsize for Vec { - fn finish(b: &[u8]) -> ProtocolResult { - Ok(Value::Binary(b.to_owned())) - } - fn empty() -> Value { - Value::Binary(vec![]) +macro_rules! translate_pending_lfs { + ($($base:ident => {$($type:ty as $variant:ident),*}),* $(,)?) => { + $($( + impl From<$base<$type>> for PendingValue { + fn from(t: $base<$type>) -> Self { + PendingValue::$variant(t) + } + } + impl From<$base<$type>> for Value { + fn from(t: $base<$type>) -> Value { + Value::$variant(t.into_value()) + } + } + )*)* } } -impl DecodePsize for String { - fn finish(b: &[u8]) -> ProtocolResult { - core::str::from_utf8(b) - .map(String::from) - .map(Value::String) - .map_err(|_| ProtocolError::InvalidServerResponseForData) +translate_pending_lfs!( + LfsValue => { + u8 as UInt8, u16 as UInt16, u32 as UInt32, u64 as UInt64, i8 as SInt8, i16 as SInt16, i32 as SInt32, i64 as SInt64, f32 as Float32, f64 as Float64 + }, + SpObjectState => {Vec as Binary, String as String}, +); + +impl From for PendingValue { + fn from(value: ValueStream) -> Self { + Self::List(value) } - fn empty() -> Value { - Value::String(String::new()) +} + +impl From for Value { + fn from(value: ValueStream) -> Self { + Self::List(value.items) } } -macro_rules! impl_uint { - ($($ty:ty as $variant:ident),*) => { - $(impl DecodeDelimited for $ty { - fn update(&mut self, b: u8) -> bool { - let mut okay = true; let (r1, of_1) = self.overflowing_mul(10); - okay &= !of_1; let (r2, of_2) = r1.overflowing_add((b & 0x0f) as $ty); - okay &= !of_2; - okay &= b.is_ascii_digit(); *self = r2; okay +impl PendingValue { + fn next_value_with_code( + decoder: &mut Decoder, + code: u8, + ) -> ProtocolResult> { + match code { + 0x00 => Ok(ProtocolObjectDecodeState::Completed(Value::Null)), + 0x01 => Self::decode_bool(decoder), + 0x02 => Self::try_value::>(decoder), + 0x03 => Self::try_value::>(decoder), + 0x04 => Self::try_value::>(decoder), + 0x05 => Self::try_value::>(decoder), + 0x06 => Self::try_value::>(decoder), + 0x07 => Self::try_value::>(decoder), + 0x08 => Self::try_value::>(decoder), + 0x09 => Self::try_value::>(decoder), + 0x0A => Self::try_value::>(decoder), + 0x0B => Self::try_value::>(decoder), + 0x0C => Self::try_value::>>(decoder), + 0x0D => Self::try_value::>(decoder), + 0x0E => Self::try_value::(decoder), + _ => Err(ProtocolError::InvalidServerResponseUnknownDataType), + } + } + fn next_value( + decoder: &mut Decoder, + ) -> ProtocolResult> { + let code = decoder.next(); + Self::next_value_with_code(decoder, code) + } + fn try_complete_self( + self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + match self { + PendingValue::Bool(_) => Self::decode_bool(decoder), + PendingValue::UInt8(pv) => Self::complete_value(pv, decoder), + PendingValue::UInt16(pv) => Self::complete_value(pv, decoder), + PendingValue::UInt32(pv) => Self::complete_value(pv, decoder), + PendingValue::UInt64(pv) => Self::complete_value(pv, decoder), + PendingValue::SInt8(pv) => Self::complete_value(pv, decoder), + PendingValue::SInt16(pv) => Self::complete_value(pv, decoder), + PendingValue::SInt32(pv) => Self::complete_value(pv, decoder), + PendingValue::SInt64(pv) => Self::complete_value(pv, decoder), + PendingValue::Float32(pv) => Self::complete_value(pv, decoder), + PendingValue::Float64(pv) => Self::complete_value(pv, decoder), + PendingValue::Binary(pv) => Self::complete_value(pv, decoder), + PendingValue::String(pv) => Self::complete_value(pv, decoder), + PendingValue::List(pv) => Self::complete_value(pv, decoder), + } + } + fn complete_value + Into>( + current: T, + decoder: &mut Decoder, + ) -> ProtocolResult> { + match current.complete(decoder)? { + ProtocolObjectDecodeState::Completed(c) => { + Ok(ProtocolObjectDecodeState::Completed(c.into())) + } + ProtocolObjectDecodeState::Pending(p) => { + Ok(ProtocolObjectDecodeState::Pending(p.into())) + } + } + } + fn try_value + Into>( + decoder: &mut Decoder, + ) -> ProtocolResult> { + Self::complete_value(T::initialize(decoder), decoder) + } + fn decode_bool( + decoder: &mut Decoder, + ) -> Result, ProtocolError> { + // bool + if !decoder.eof() { + let value = decoder.next(); + if value > 1 { + Err(ProtocolError::InvalidServerResponseForData) + } else { + Ok(ProtocolObjectDecodeState::Completed(Value::Bool( + value == 1, + ))) } - fn pack_pending(self, meta: ValueStateMeta) -> ValueState { ValueState::new(Value::$variant(self), meta) } - fn pack_completed(self, _: ValueStateMeta, _: &[u8]) -> ProtocolResult { Ok(Value::$variant(self)) } - })* + } else { + Ok(ProtocolObjectDecodeState::Pending(PendingValue::Bool( + false, + ))) + } } } -macro_rules! impl_fstr { - ($($ty:ty as $variant:ident),*) => { - $(impl DecodeDelimited for $ty { - fn pack_pending(self, meta: ValueStateMeta) -> ValueState { ValueState::new(Value::$variant(self), meta) } - fn pack_completed(self, _: ValueStateMeta, b: &[u8]) -> ProtocolResult { - core::str::from_utf8(b).map_err(|_| ProtocolError::InvalidServerResponseForData)?.parse().map(Value::$variant).map_err(|_| ProtocolError::InvalidServerResponseForData) - } - })* - }; +/* + value stream: a sequential list of values (state cached) +*/ + +pub(crate) trait AsValueStream: Sized { + fn from_value_stream(v: Vec) -> Self; } -impl_uint!(u8 as UInt8, u16 as UInt16, u32 as UInt32, u64 as UInt64); -impl_fstr!( - i8 as SInt8, - i16 as SInt16, - i32 as SInt32, - i64 as SInt64, - f32 as Float32, - f64 as Float64 -); +impl AsValueStream for Row { + fn from_value_stream(v: Vec) -> Self { + Row::new(v) + } +} -impl<'a> Decoder<'a> { - fn parse_list( - &mut self, - mut stack: Vec<(Vec, ValueStateMeta)>, - mut last: Option, - ) -> ProtocolResult> { - let (mut current_list, mut current_meta) = stack.pop().unwrap(); - loop { - if !current_meta.md.finished(self)? { - return Ok(ValueDecodeStateAny::Pending(PendingValue::new( - ValueState::new(Value::List(vec![]), ValueStateMeta::zero()), - None, - stack, - ))); +#[derive(Debug, PartialEq)] +pub(crate) struct ValueStream { + size: ProtocolObjectDecodeState>, + items: Vec, + pending: Option>, +} + +impl ProtocolObjectState for ValueStream { + type Value = Vec; + fn initialize(decoder: &Decoder) -> Self { + Self { + size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)), + items: vec![], + pending: None, + } + } + fn complete( + mut self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + let size = match self.size.try_complete(decoder)? { + ProtocolObjectDecodeState::Completed(c) => c, + ProtocolObjectDecodeState::Pending(pv) => { + self.size = ProtocolObjectDecodeState::Pending(pv); + return Ok(ProtocolObjectDecodeState::Pending(self)); } - if current_list.len() as u64 == current_meta.md.val() { - match stack.pop() { - None => { - return Ok(ValueDecodeStateAny::Decoded(Value::List(current_list))); - } - Some((mut parent, parent_meta)) => { - parent.push(Value::List(current_list)); - current_list = parent; - current_meta = parent_meta; - continue; - } - } + }; + self.size = ProtocolObjectDecodeState::Completed(size); + while self.items.len() != size { + if decoder.eof() { + return Ok(ProtocolObjectDecodeState::Pending(self)); } - let v = match last.take() { - None => { - // nothing present, we need to decode - if self._cursor_eof() { - // wow, nothing here - stack.push((current_list, current_meta)); - return Ok(ValueDecodeStateAny::Pending(PendingValue::new( - ValueState::new(Value::List(vec![]), ValueStateMeta::zero()), - None, - stack, - ))); - } - match self._cursor_next() { - 0x0E => { - // that's a list - stack.push((current_list, current_meta)); - current_list = vec![]; - current_meta = ValueStateMeta::zero(); - continue; - } - code => self.start_decode(false, code, vec![], None), - } - } - Some(v) => self.resume_decode(false, v, vec![], None), + let r = match self.pending.take() { + Some(v) => v.try_complete_self(decoder), + None => PendingValue::next_value(decoder), }?; - let v = match v { - ValueDecodeStateAny::Pending(pv) => { - stack.push((current_list, current_meta)); - return Ok(ValueDecodeStateAny::Pending(PendingValue::new( - ValueState::new(Value::List(vec![]), ValueStateMeta::zero()), - Some(pv.state), - stack, - ))); + match r { + ProtocolObjectDecodeState::Completed(v) => { + self.items.push(v); + } + ProtocolObjectDecodeState::Pending(pv) => { + self.pending = Some(Box::new(pv)); + return Ok(ProtocolObjectDecodeState::Pending(self)); } - ValueDecodeStateAny::Decoded(v) => v, - }; - current_list.push(v); + } } + Ok(ProtocolObjectDecodeState::Completed(self)) + } + fn into_value(self) -> Self::Value { + self.items } } -impl<'a> Decoder<'a> { - fn start_decode( - &mut self, - root: bool, - code: u8, - mut stack: Vec<(Vec, ValueStateMeta)>, - last: Option, - ) -> ProtocolResult { - let md = ValueStateMeta::new(self._cursor(), 0, false); - let v = match code { - 0x00 => return Ok(ValueDecodeStateAny::Decoded(Value::Null)), - 0x01 => return self.parse_bool(stack), - 0x02 => self.__resume_decode(0u8, md), - 0x03 => self.__resume_decode(0u16, md), - 0x04 => self.__resume_decode(0u32, md), - 0x05 => self.__resume_decode(0u64, md), - 0x06 => self.__resume_decode(0i8, md), - 0x07 => self.__resume_decode(0i16, md), - 0x08 => self.__resume_decode(0i32, md), - 0x09 => self.__resume_decode(0i64, md), - 0x0A => self.__resume_decode(0f32, md), - 0x0B => self.__resume_decode(0f64, md), - 0x0C => self.__resume_psize::>(md), - 0x0D => self.__resume_psize::(md), - 0x0E => { - if !root { - unreachable!("recursive structure not captured by root"); - } - stack.push((vec![], ValueStateMeta::zero())); - return self.parse_list(stack, last); +/* + multi value stream: sequential collection of value streams +*/ + +#[derive(Debug, PartialEq)] +pub(crate) struct MultiValueStream { + count: ProtocolObjectDecodeState>, + items: Vec>, + pending: Option, +} + +impl ProtocolObjectState for MultiValueStream { + type Value = Vec>; + fn initialize(decoder: &Decoder) -> Self { + Self { + count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)), + items: vec![], + pending: None, + } + } + fn complete( + mut self, + decoder: &mut Decoder, + ) -> ProtocolResult> { + let size = match self.count.try_complete(decoder)? { + ProtocolObjectDecodeState::Completed(sz) => sz, + ProtocolObjectDecodeState::Pending(pv) => { + self.count = ProtocolObjectDecodeState::Pending(pv); + return Ok(ProtocolObjectDecodeState::Pending(self)); } - _ => return Err(ProtocolError::InvalidServerResponseUnknownDataType), - }?; - Self::check_pending(v, stack) - } - fn resume_decode( - &mut self, - root: bool, - ValueState { v, meta }: ValueState, - stack: Vec<(Vec, ValueStateMeta)>, - last: Option, - ) -> ProtocolResult { - let r = match v { - Value::Null => unreachable!(), - Value::Bool(_) => return self.parse_bool(stack), - Value::UInt8(l) => self.__resume_decode(l, meta), - Value::UInt16(l) => self.__resume_decode(l, meta), - Value::UInt32(l) => self.__resume_decode(l, meta), - Value::UInt64(l) => self.__resume_decode(l, meta), - Value::SInt8(l) => self.__resume_decode(l, meta), - Value::SInt16(l) => self.__resume_decode(l, meta), - Value::SInt32(l) => self.__resume_decode(l, meta), - Value::SInt64(l) => self.__resume_decode(l, meta), - Value::Float32(l) => self.__resume_decode(l, meta), - Value::Float64(l) => self.__resume_decode(l, meta), - Value::Binary(_) => self.__resume_psize::>(meta), - Value::String(_) => self.__resume_psize::(meta), - Value::List(_) => { - if !root { - unreachable!("recursive structure not captured by root"); + }; + self.count = ProtocolObjectDecodeState::Completed(size); + while self.items.len() != size { + match match self.pending.take() { + Some(pending_vs) => pending_vs.complete(decoder), + None => ValueStream::initialize(decoder).complete(decoder), + }? { + ProtocolObjectDecodeState::Completed(vs) => { + self.items.push(vs.items); + } + ProtocolObjectDecodeState::Pending(pvs) => { + self.pending = Some(pvs); + return Ok(ProtocolObjectDecodeState::Pending(self)); } - return self.parse_list(stack, last); } - }?; - Self::check_pending(r, stack) - } - fn parse_bool( - &mut self, - stack: Vec<(Vec, ValueStateMeta)>, - ) -> ProtocolResult { - if self._cursor_eof() { - return Ok(ValueDecodeStateAny::Pending(PendingValue::new( - ValueState::new(Value::Bool(false), ValueStateMeta::zero()), - None, - stack, - ))); } - let nx = self._cursor_next(); - if nx < 2 { - return Ok(ValueDecodeStateAny::Decoded(Value::Bool(nx == 1))); - } else { - return Err(ProtocolError::InvalidServerResponseForData); + Ok(ProtocolObjectDecodeState::Completed(self)) + } + fn into_value(self) -> Self::Value { + self.items + } +} + +#[test] +fn decode_lfs_object() { + { + let b = b"-3.142\n"; + for i in 1..b.len() { + let mut decoder = Decoder::new(&b[..i], 0); + assert!(matches!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )) } + let mut decoder = Decoder::new(b, 0); + assert_eq!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + -3.142_f32 + ); } - fn check_pending( - r: ValueDecodeStateAny, - stack: Vec<(Vec, ValueStateMeta)>, - ) -> Result, ProtocolError> { - match r { - ValueDecodeStateAny::Pending(p) => Ok(ValueDecodeStateAny::Pending(PendingValue::new( - p, None, stack, - ))), - ValueDecodeStateAny::Decoded(v) => Ok(ValueDecodeStateAny::Decoded(v)), + { + let b = b"1096\n"; + for i in 1..b.len() { + let mut decoder = Decoder::new(&b[..i], 0); + assert!(matches!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )) } + let mut decoder = Decoder::new(b, 0); + assert_eq!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + 1096u16 + ); + } + { + let b = b"-1032\n"; + for i in 1..b.len() { + let mut decoder = Decoder::new(&b[..i], 0); + assert!(matches!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )) + } + let mut decoder = Decoder::new(b, 0); + assert_eq!( + LfsValue::::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + -1032i16 + ); } } #[test] -fn t_mrow() { - const MROW_QUERY: &[u8] = b"\x133\n5\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n"; - for i in 1..MROW_QUERY.len() { - let mut decoder = Decoder::new(&MROW_QUERY[..i], 0); - if i == 1 { +fn decode_sp_object() { + { + let b = b"5\nhello"; + for i in 1..b.len() { + let mut decoder = Decoder::new(&b[..i], 0); assert!(matches!( - decoder.validate_response(RState::default()), - DecodeState::ChangeState(RState(_)) - )); - } else { + SpObjectState::>::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )) + } + let mut decoder = Decoder::new(b, 0); + assert_eq!( + SpObjectState::>::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + b"hello" + ); + } + { + let b = b"6\nworld!"; + for i in 1..b.len() { + let mut decoder = Decoder::new(&b[..i], 0); assert!(matches!( - decoder.validate_response(RState::default()), - DecodeState::ChangeState(RState(ResponseState::PMultiRow(_))) - )); + SpObjectState::::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )) } + let mut decoder = Decoder::new(b, 0); + assert_eq!( + SpObjectState::::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + "world!" + ); } - let mut decoder = Decoder::new(MROW_QUERY, 0); +} + +#[test] +fn decode_value_stream() { + // [null, bool, uint, sint, float, binary, string, [binary, string]] + const QUERY: &[u8] = b"8\n\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij"; + for i in 1..QUERY.len() { + let block = &QUERY[..i]; + let mut decoder = Decoder::new(block, 0); + assert!(matches!( + ValueStream::initialize(&decoder) + .complete(&mut decoder) + .unwrap(), + ProtocolObjectDecodeState::Pending(_) + )); + } + let mut decoder = Decoder::new(QUERY, 0); assert_eq!( - decoder.validate_response(RState::default()), - DecodeState::Completed(Response::Rows(vec![ - Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("sayan".into()), - Value::UInt8(20), - Value::List(vec![]) - ]), - Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("elana".into()), - Value::UInt8(21), - Value::List(vec![]) - ]), - Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("emily".into()), - Value::UInt8(22), - Value::List(vec![]) + ValueStream::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + vec![ + Value::Null, + Value::Bool(true), + Value::UInt64(u64::MAX), + Value::SInt64(i64::MIN), + Value::Float32(-3.141592654), + Value::Binary(b"abcde".to_vec()), + Value::String("fghij".to_string()), + Value::List(vec![ + Value::Binary(b"abcde".to_vec()), + Value::String("fghij".to_string()) ]) - ])) + ] ); } + #[test] -fn t_num() { - const NUM: &[u8] = b"1234\n"; - fn decoder(i: usize) -> Decoder<'static> { - Decoder::new(&NUM[..i], 0) - } - for (i, expected) in [1, 12, 123, 1234u64] - .iter() - .enumerate() - .map(|(a, b)| (a + 1, *b)) - { - assert_eq!( - decoder(i) - .__resume_decode(0u64, ValueStateMeta::zero()) +fn decode_multi_value_stream() { + let packet = [ + b"5\n".to_vec(), + "8\n\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij".repeat(5).into_bytes() + ].concat(); + for i in 1..packet.len() { + let mut decoder = Decoder::new(&packet[..i], 0); + assert!(matches!( + MultiValueStream::initialize(&decoder) + .complete(&mut decoder) .unwrap(), - ValueDecodeStateAny::Pending(ValueState::new( - Value::UInt64(expected), - ValueStateMeta::zero() - )) - ); + ProtocolObjectDecodeState::Pending(_) + )) } + let mut decoder = Decoder::new(&packet, 0); assert_eq!( - decoder(NUM.len()) - .__resume_decode(0u64, ValueStateMeta::zero()) - .unwrap(), - ValueDecodeStateAny::Decoded(Value::UInt64(1234)) + MultiValueStream::initialize(&decoder) + .complete(&mut decoder) + .unwrap() + .into_completed() + .unwrap() + .into_value(), + (0..5) + .map(|_| vec![ + Value::Null, + Value::Bool(true), + Value::UInt64(u64::MAX), + Value::SInt64(i64::MIN), + Value::Float32(-3.141592654), + Value::Binary(b"abcde".to_vec()), + Value::String("fghij".to_string()), + Value::List(vec![ + Value::Binary(b"abcde".to_vec()), + Value::String("fghij".to_string()) + ]) + ]) + .collect::>>() ); } diff --git a/src/protocol/pipe.rs b/src/protocol/pipe.rs index fd01fe6..992d054 100644 --- a/src/protocol/pipe.rs +++ b/src/protocol/pipe.rs @@ -15,10 +15,7 @@ */ use { - super::{ - state::{DecodeState, RState, ResponseState}, - Decoder, ProtocolError, - }, + super::{DecodeState, Decoder, ProtocolError, RState, ResponseState}, crate::response::Response, }; @@ -42,35 +39,38 @@ impl MRespState { fn except() -> PipelineResult { PipelineResult::Error(ProtocolError::InvalidPacket) } - fn step(mut self, decoder: &mut Decoder, expected: usize) -> PipelineResult { + fn step(mut self, mut decoder: Decoder, expected: usize) -> (PipelineResult, usize) { + let buf = decoder.b; loop { - if decoder._cursor_eof() { - return PipelineResult::Pending(self); + if decoder.eof() { + return (PipelineResult::Pending(self), decoder.position()); } - if decoder._cursor_value() == ILLEGAL_PACKET_ESCAPE { - return Self::except(); + if decoder.cursor_value() == ILLEGAL_PACKET_ESCAPE { + return (Self::except(), 0); } - match decoder.validate_response(RState( + let (_state, _position) = decoder.validate_response(RState( self.pending.take().unwrap_or(ResponseState::Initial), - )) { - DecodeState::ChangeState(RState(s)) => { - self.pending = Some(s); - return PipelineResult::Pending(self); - } - DecodeState::Completed(c) => { - self.processed.push(c); + )); + match _state { + DecodeState::Completed(resp) => { + self.processed.push(resp); if self.processed.len() == expected { - return PipelineResult::Completed(self.processed); + return (PipelineResult::Completed(self.processed), _position); } + decoder = Decoder::new(buf, _position); + } + DecodeState::ChangeState(RState(s)) => { + self.pending = Some(s); + return (PipelineResult::Pending(self), _position); } - DecodeState::Error(e) => return PipelineResult::Error(e), + DecodeState::Error(e) => return (PipelineResult::Error(e), _position), } } } } impl<'a> Decoder<'a> { - pub fn validate_pipe(&mut self, expected: usize, state: MRespState) -> PipelineResult { + pub fn validate_pipe(self, expected: usize, state: MRespState) -> (PipelineResult, usize) { state.step(self, expected) } } @@ -81,9 +81,9 @@ const QUERY: &[u8] = b"\x12\x10\xFF\xFF\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x #[test] fn t_pipe() { use crate::response::{Response, Row, Value}; - let mut decoder = Decoder::new(QUERY, 0); + let decoder = Decoder::new(QUERY, 0); assert_eq!( - decoder.validate_pipe(5, MRespState::default()), + decoder.validate_pipe(5, MRespState::default()).0, PipelineResult::Completed(vec![ Response::Empty, Response::Error(u16::MAX), @@ -115,15 +115,15 @@ fn t_pipe() { #[test] fn t_pipe_staged() { for i in Decoder::MIN_READBACK..QUERY.len() { - let mut dec = Decoder::new(&QUERY[..i], 0); + let dec = Decoder::new(&QUERY[..i], 0); if i < 3 { assert!(matches!( - dec.validate_pipe(5, MRespState::default()), + dec.validate_pipe(5, MRespState::default()).0, PipelineResult::Pending(_) )); } else { assert!(matches!( - dec.validate_pipe(5, MRespState::default()), + dec.validate_pipe(5, MRespState::default()).0, PipelineResult::Pending(_) )); } diff --git a/src/protocol/state.rs b/src/protocol/state.rs deleted file mode 100644 index 3e5753d..0000000 --- a/src/protocol/state.rs +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright 2024, Sayan Nandan - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -use { - super::{Decoder, ProtocolError, ProtocolResult}, - crate::response::{Response, Row, Value}, -}; - -pub type ValueDecodeStateRaw = ValueDecodeStateAny; -pub type ValueDecodeState = ValueDecodeStateAny; - -/* - pending value - --- - a stack is useful for recursive types -*/ - -#[derive(Debug, PartialEq)] -pub struct PendingValue { - pub(super) state: ValueState, - pub(super) tmp: Option, - pub(super) stack: Vec<(Vec, ValueStateMeta)>, -} - -impl PendingValue { - pub fn new( - state: ValueState, - tmp: Option, - stack: Vec<(Vec, ValueStateMeta)>, - ) -> Self { - Self { state, tmp, stack } - } -} - -/* - value state -*/ - -#[derive(Debug, PartialEq)] -pub enum ValueDecodeStateAny { - Pending(P), - Decoded(V), -} - -#[derive(Debug, PartialEq)] -pub struct ValueState { - pub(super) v: Value, - pub(super) meta: ValueStateMeta, -} - -impl ValueState { - pub fn new(v: Value, meta: ValueStateMeta) -> Self { - Self { v, meta } - } -} - -#[derive(Debug, PartialEq)] -pub struct ValueStateMeta { - pub(super) start: usize, - pub(super) md: MetaState, -} - -impl ValueStateMeta { - pub fn zero() -> Self { - Self { - start: 0, - md: MetaState::default(), - } - } - pub fn new(start: usize, md1: u64, md1_flag: bool) -> Self { - Self { - start, - md: MetaState::new(md1_flag, md1), - } - } -} - -/* - metadata init state -*/ - -#[derive(Debug, Default, PartialEq)] -pub struct MetaState { - completed: bool, - val: u64, -} - -impl MetaState { - pub fn new(completed: bool, val: u64) -> Self { - Self { completed, val } - } - #[inline(always)] - pub fn finished(&mut self, decoder: &mut Decoder) -> ProtocolResult { - self.finish_or_continue(decoder, || Ok(true), || Ok(false), |e| Err(e)) - } - #[inline(always)] - pub fn finish_or_continue( - &mut self, - decoder: &mut Decoder, - if_completed: impl FnOnce() -> T, - if_pending: impl FnOnce() -> T, - if_err: impl FnOnce(ProtocolError) -> T, - ) -> T { - Self::try_finish_or_continue( - self.completed, - &mut self.val, - decoder, - if_completed, - if_pending, - if_err, - ) - } - #[inline(always)] - pub fn try_finish( - decoder: &mut Decoder, - completed: bool, - val: &mut u64, - ) -> ProtocolResult { - Self::try_finish_or_continue( - completed, - val, - decoder, - || Ok(true), - || Ok(false), - |e| Err(e), - ) - } - #[inline(always)] - pub fn try_finish_or_continue( - completed: bool, - val: &mut u64, - decoder: &mut Decoder, - if_completed: impl FnOnce() -> T, - if_pending: impl FnOnce() -> T, - if_err: impl FnOnce(ProtocolError) -> T, - ) -> T { - if completed { - if_completed() - } else { - match decoder.__resume_decode(*val, ValueStateMeta::zero()) { - Ok(vs) => match vs { - ValueDecodeStateAny::Pending(ValueState { v, .. }) => { - *val = v.u64(); - if_pending() - } - ValueDecodeStateAny::Decoded(v) => { - *val = v.u64(); - if_completed() - } - }, - Err(e) => if_err(e), - } - } - } - #[inline(always)] - pub fn val(&self) -> u64 { - self.val - } -} - -/* - row state -*/ - -#[derive(Debug, PartialEq)] -pub struct RowState { - pub(super) meta: ValueStateMeta, - pub(super) row: Vec, - pub(super) tmp: Option, -} - -impl RowState { - pub fn new(meta: ValueStateMeta, row: Vec, tmp: Option) -> Self { - Self { meta, row, tmp } - } -} - -/* - multi row state -*/ - -#[derive(Debug, PartialEq)] -pub struct MultiRowState { - pub(super) c_row: Option, - pub(super) rows: Vec, - pub(super) md_state: u8, - pub(super) md1_target: u64, - pub(super) md2_col_cnt: u64, -} - -impl Default for MultiRowState { - fn default() -> Self { - Self::new(None, vec![], 0, 0, 0) - } -} - -impl MultiRowState { - pub fn new( - c_row: Option, - rows: Vec, - md_s: u8, - md_cnt: u64, - md_target: u64, - ) -> Self { - Self { - c_row, - rows, - md_state: md_s, - md1_target: md_target, - md2_col_cnt: md_cnt, - } - } -} - -/* - response state -*/ - -#[derive(Debug, PartialEq)] -pub enum ResponseState { - Initial, - PValue(PendingValue), - PError, - PRow(RowState), - PMultiRow(MultiRowState), -} - -#[derive(Debug, PartialEq)] -pub enum DecodeState { - ChangeState(RState), - Completed(Response), - Error(ProtocolError), -} - -#[derive(Debug, PartialEq)] -pub struct RState(pub(super) ResponseState); -impl Default for RState { - fn default() -> Self { - RState(ResponseState::Initial) - } -} From 61484871f1f1b5d41ea81e0b0ce4d9a7f04ccefb Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 23 Jun 2024 17:58:40 +0530 Subject: [PATCH 2/3] response: Add `FromResponse` for multi-row responses --- CHANGELOG.md | 30 ++++++++++++++++++++---------- src/protocol/mod.rs | 2 +- src/response.rs | 9 +++++++++ 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4b3c03..3c4b888 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,31 +2,41 @@ All changes in this project will be noted in this file. -### 0.8.7 (unreleased) +## 0.8.8 + +### Fixes + +- Fixed response decoder and handling issues + +### Additions + +- Added `FromResponse` for `Vec` + +## 0.8.7 > - **Field change warnings**: > - The `Config` struct now has one additional field. This is not a breaking change because the functionality of the library remains unchanged -#### Additions +### Additions - Added support for pipelines - Added `Response::parse` to convert a response into compatible types -### 0.8.6 +## 0.8.6 Reduced allocations in `Query`. -### 0.8.5 +## 0.8.5 Fixed bugs with the derive macros. -### 0.8.4 +## 0.8.4 > **Yanked version** Fixed an issue with single-item struct derives when using the `Response` macro. -### 0.8.3 +## 0.8.3 Added the following implementations: - `FromResponse` for `Row` @@ -34,22 +44,22 @@ Added the following implementations: - Added the `Value::parse` and `Value::parse_cloned` member methods - Added `Row::into_first` and `Row::into_first_as` member methods -### 0.8.2 +## 0.8.2 Support deriving queries and responses. -### 0.8.1 +## 0.8.1 Fixed issues with documentation ## 0.8.0 -#### New features +### New features - Completely up to date for Skyhash 2.0 - New query API interface for Skytable Octave (completely breaking!) - No longer depends on OpenSSL -#### Breaking changes +### Breaking changes The enter query interface as changed and is incompatible with previous driver versions. Please consider reading the Skytable Octave upgrade guide. diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index c36b00c..84178fc 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -884,6 +884,6 @@ fn decode_multi_value_stream() { Value::String("fghij".to_string()) ]) ]) - .collect::>>() + .collect::>() ); } diff --git a/src/response.rs b/src/response.rs index 93bd5cf..9483df4 100644 --- a/src/response.rs +++ b/src/response.rs @@ -314,3 +314,12 @@ impl FromResponse for Row { } } } + +impl FromResponse for Vec { + fn from_response(resp: Response) -> ClientResult { + match resp { + Response::Rows(rows) => Ok(rows), + _ => Err(Error::ParseError(ParseError::ResponseMismatch)), + } + } +} From 0e6add51f92154ae43b026dea190650f8d80edfb Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 23 Jun 2024 18:14:31 +0530 Subject: [PATCH 3/3] response: Add SQParam for `&Vec` --- CHANGELOG.md | 1 + src/query.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c4b888..6b00246 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All changes in this project will be noted in this file. ### Additions - Added `FromResponse` for `Vec` +- Added `SQParam` impl for `&Vec` ## 0.8.7 diff --git a/src/query.rs b/src/query.rs index e8445cd..3254c9b 100644 --- a/src/query.rs +++ b/src/query.rs @@ -366,6 +366,11 @@ impl SQParam for Vec { 1 } } +impl<'a> SQParam for &'a Vec { + fn append_param(&self, q: &mut Vec) -> usize { + self.as_slice().append_param(q) + } +} // str impl<'a> SQParam for &'a str { fn append_param(&self, buf: &mut Vec) -> usize {