1+ use bitreq:: Client ;
12use prost:: Message ;
2- use reqwest:: header:: CONTENT_TYPE ;
3- use reqwest:: Client ;
43use std:: collections:: HashMap ;
54use std:: default:: Default ;
65use std:: sync:: Arc ;
76
87use log:: trace;
98
109use crate :: error:: VssError ;
11- use crate :: headers:: { get_headermap , FixedHeaders , VssHeaderProvider } ;
10+ use crate :: headers:: { FixedHeaders , VssHeaderProvider } ;
1211use crate :: types:: {
1312 DeleteObjectRequest , DeleteObjectResponse , GetObjectRequest , GetObjectResponse ,
1413 ListKeyVersionsRequest , ListKeyVersionsResponse , PutObjectRequest , PutObjectResponse ,
@@ -17,7 +16,10 @@ use crate::util::retry::{retry, RetryPolicy};
1716use crate :: util:: KeyValueVecKeyPrinter ;
1817
1918const APPLICATION_OCTET_STREAM : & str = "application/octet-stream" ;
20- const DEFAULT_TIMEOUT : std:: time:: Duration = std:: time:: Duration :: from_secs ( 10 ) ;
19+ const CONTENT_TYPE : & str = "content-type" ;
20+ const DEFAULT_TIMEOUT_SECS : u64 = 10 ;
21+ const MAX_RESPONSE_BODY_SIZE : usize = 1024 * 1024 * 1024 ; // 1GB
22+ const DEFAULT_CLIENT_CAPACITY : usize = 10 ;
2123
2224/// Thin-client to access a hosted instance of Versioned Storage Service (VSS).
2325/// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API.
@@ -35,11 +37,11 @@ where
3537impl < R : RetryPolicy < E = VssError > > VssClient < R > {
3638 /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
3739 pub fn new ( base_url : String , retry_policy : R ) -> Self {
38- let client = build_client ( ) ;
40+ let client = Client :: new ( DEFAULT_CLIENT_CAPACITY ) ;
3941 Self :: from_client ( base_url, client, retry_policy)
4042 }
4143
42- /// Constructs a [`VssClient`] from a given [`reqwest ::Client`], using `base_url` as the VSS server endpoint.
44+ /// Constructs a [`VssClient`] from a given [`bitreq ::Client`], using `base_url` as the VSS server endpoint.
4345 pub fn from_client ( base_url : String , client : Client , retry_policy : R ) -> Self {
4446 Self {
4547 base_url,
@@ -49,7 +51,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
4951 }
5052 }
5153
52- /// Constructs a [`VssClient`] from a given [`reqwest ::Client`], using `base_url` as the VSS server endpoint.
54+ /// Constructs a [`VssClient`] from a given [`bitreq ::Client`], using `base_url` as the VSS server endpoint.
5355 ///
5456 /// HTTP headers will be provided by the given `header_provider`.
5557 pub fn from_client_and_headers (
@@ -65,7 +67,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
6567 pub fn new_with_headers (
6668 base_url : String , retry_policy : R , header_provider : Arc < dyn VssHeaderProvider > ,
6769 ) -> Self {
68- let client = build_client ( ) ;
70+ let client = Client :: new ( DEFAULT_CLIENT_CAPACITY ) ;
6971 Self { base_url, client, retry_policy, header_provider }
7072 }
7173
@@ -85,15 +87,17 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
8587 let res = retry (
8688 || async {
8789 let url = format ! ( "{}/getObject" , self . base_url) ;
88- self . post_request ( request, & url) . await . and_then ( |response : GetObjectResponse | {
89- if response. value . is_none ( ) {
90- Err ( VssError :: InternalServerError (
91- "VSS Server API Violation, expected value in GetObjectResponse but found none" . to_string ( ) ,
92- ) )
93- } else {
94- Ok ( response)
95- }
96- } )
90+ self . post_request ( request, & url, true ) . await . and_then (
91+ |response : GetObjectResponse | {
92+ if response. value . is_none ( ) {
93+ Err ( VssError :: InternalServerError (
94+ "VSS Server API Violation, expected value in GetObjectResponse but found none" . to_string ( ) ,
95+ ) )
96+ } else {
97+ Ok ( response)
98+ }
99+ } ,
100+ )
97101 } ,
98102 & self . retry_policy ,
99103 )
@@ -121,7 +125,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
121125 let res = retry (
122126 || async {
123127 let url = format ! ( "{}/putObjects" , self . base_url) ;
124- self . post_request ( request, & url) . await
128+ self . post_request ( request, & url, false ) . await
125129 } ,
126130 & self . retry_policy ,
127131 )
@@ -147,7 +151,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
147151 let res = retry (
148152 || async {
149153 let url = format ! ( "{}/deleteObject" , self . base_url) ;
150- self . post_request ( request, & url) . await
154+ self . post_request ( request, & url, true ) . await
151155 } ,
152156 & self . retry_policy ,
153157 )
@@ -175,7 +179,7 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
175179 let res = retry (
176180 || async {
177181 let url = format ! ( "{}/listKeyVersions" , self . base_url) ;
178- self . post_request ( request, & url) . await
182+ self . post_request ( request, & url, true ) . await
179183 } ,
180184 & self . retry_policy ,
181185 )
@@ -187,40 +191,36 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
187191 }
188192
189193 async fn post_request < Rq : Message , Rs : Message + Default > (
190- & self , request : & Rq , url : & str ,
194+ & self , request : & Rq , url : & str , enable_pipelining : bool ,
191195 ) -> Result < Rs , VssError > {
192196 let request_body = request. encode_to_vec ( ) ;
193- let headermap = self
197+ let headers = self
194198 . header_provider
195199 . get_headers ( & request_body)
196200 . await
197- . and_then ( |h| get_headermap ( & h) )
198201 . map_err ( |e| VssError :: AuthError ( e. to_string ( ) ) ) ?;
199- let response_raw = self
200- . client
201- . post ( url)
202- . header ( CONTENT_TYPE , APPLICATION_OCTET_STREAM )
203- . headers ( headermap)
204- . body ( request_body)
205- . send ( )
206- . await ?;
207- let status = response_raw. status ( ) ;
208- let payload = response_raw. bytes ( ) . await ?;
209-
210- if status. is_success ( ) {
202+
203+ let mut http_request = bitreq:: post ( url)
204+ . with_header ( CONTENT_TYPE , APPLICATION_OCTET_STREAM )
205+ . with_headers ( headers)
206+ . with_body ( request_body)
207+ . with_timeout ( DEFAULT_TIMEOUT_SECS )
208+ . with_max_body_size ( Some ( MAX_RESPONSE_BODY_SIZE ) ) ;
209+
210+ if enable_pipelining {
211+ http_request = http_request. with_pipelining ( ) ;
212+ }
213+
214+ let response = self . client . send_async ( http_request) . await ?;
215+
216+ let status_code = response. status_code ;
217+ let payload = response. into_bytes ( ) ;
218+
219+ if ( 200 ..300 ) . contains ( & status_code) {
211220 let response = Rs :: decode ( & payload[ ..] ) ?;
212221 Ok ( response)
213222 } else {
214- Err ( VssError :: new ( status , payload) )
223+ Err ( VssError :: new ( status_code , payload) )
215224 }
216225 }
217226}
218-
219- fn build_client ( ) -> Client {
220- Client :: builder ( )
221- . timeout ( DEFAULT_TIMEOUT )
222- . connect_timeout ( DEFAULT_TIMEOUT )
223- . read_timeout ( DEFAULT_TIMEOUT )
224- . build ( )
225- . unwrap ( )
226- }
0 commit comments