Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Once;

cfg_replication!(
use http::uri::InvalidUri;
use crate::database::EncryptionConfig;
use libsql_replication::frame::FrameNo;

Expand Down Expand Up @@ -90,13 +91,16 @@ impl Database {

let endpoint = coerce_url_scheme(endpoint);
let remote = crate::replication::client::Client::new(
connector,
endpoint.as_str().try_into().unwrap(),
auth_token,
connector.clone(),
endpoint
.as_str()
.try_into()
.map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?,
auth_token.clone(),
version.as_deref(),
http_request_callback,
http_request_callback.clone(),
)
.unwrap();
.map_err(|e| crate::Error::Replication(e.into()))?;
let path = PathBuf::from(db_path);
let client = RemoteClient::new(remote.clone(), &path)
.await
Expand Down Expand Up @@ -164,7 +168,10 @@ impl Database {
let endpoint = coerce_url_scheme(endpoint);
let remote = crate::replication::client::Client::new(
connector,
endpoint.as_str().try_into().unwrap(),
endpoint
.as_str()
.try_into()
.map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?,
auth_token,
version.as_deref(),
http_request_callback,
Expand Down
60 changes: 44 additions & 16 deletions libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, SESSION_TOKEN_KEY,
verify_session_token, Frames, HelloRequest, LogOffset, SESSION_TOKEN_KEY,
};
use tokio_stream::Stream;
use tonic::metadata::AsciiMetadataValue;
use tonic::{Response, Status};
use zerocopy::FromBytes;

/// A remote replicator client, that pulls frames over RPC
Expand All @@ -22,6 +23,7 @@ pub struct RemoteClient {
last_handshake_replication_index: Option<FrameNo>,
// the replication log is dirty, reset the meta on next handshake
dirty: bool,
prefetched_batch_log_entries: Option<Result<Response<Frames>, Status>>,
}

impl RemoteClient {
Expand All @@ -34,6 +36,7 @@ impl RemoteClient {
session_token: None,
dirty: false,
last_handshake_replication_index: None,
prefetched_batch_log_entries: None,
})
}

Expand Down Expand Up @@ -68,16 +71,31 @@ impl ReplicatorClient for RemoteClient {
/// Perform handshake with remote
async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest::new());
let resp = self.remote.replication.hello(req).await?;
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.session_token = Some(hello.session_token.clone());
if self.dirty {
self.prefetched_batch_log_entries = None;
self.meta.reset();
self.last_received = self.meta.current_frame_no();
self.dirty = false;
}
let hello_req = self.make_request(HelloRequest::new());
let log_offset_req = self.make_request(LogOffset {
next_offset: self.next_offset(),
});
let mut client_clone = self.remote.clone();
let hello_fut = self.remote.replication.hello(hello_req);
let (hello, frames) = if self.session_token.is_some() {
let (hello, frames) = tokio::join!(
hello_fut,
client_clone.replication.batch_log_entries(log_offset_req)
);
(hello, Some(frames))
} else {
(hello_fut.await, None)
};
let hello = hello?.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
let new_session = self.session_token != Some(hello.session_token.clone());
self.session_token = Some(hello.session_token.clone());
let current_replication_index = hello.current_replication_index;
if let Err(e) = self.meta.init_from_hello(hello) {
// set the meta as dirty. The caller should catch the error and clean the db
Expand All @@ -91,22 +109,32 @@ impl ReplicatorClient for RemoteClient {
}
self.last_handshake_replication_index = current_replication_index;
self.meta.flush().await?;
self.prefetched_batch_log_entries = if new_session {
tracing::warn!("Frames prefetching failed because of new session token returned by handshake");
None
} else {
frames
};

Ok(())
}

/// Return a stream of frames to apply to the database
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
let req = self.make_request(LogOffset {
next_offset: self.next_offset(),
});
let frames = self
.remote
.replication
.batch_log_entries(req)
.await?
.into_inner()
.frames;
let frames = match self.prefetched_batch_log_entries.take() {
Some(frames) => frames,
None => {
let req = self.make_request(LogOffset {
next_offset: self.next_offset(),
});
self
.remote
.replication
.batch_log_entries(req)
.await
}
};
let frames = frames?.into_inner().frames;

if let Some(f) = frames.last() {
let header: FrameHeader = FrameHeader::read_from_prefix(&f.data)
Expand Down