Skip to content

Commit 3d676fb

Browse files
chewiseanmonstar
authored andcommitted
feat(client): Add connect timeout to HttpConnector
This takes the same strategy as golang, where the timeout value is divided equally between the candidate socket addresses. If happy eyeballs is enabled, the division takes place "below" the IPv4/IPv6 partitioning. Backported to 0.12 from master.
1 parent 90c1e8f commit 3d676fb

File tree

2 files changed

+62
-15
lines changed

2 files changed

+62
-15
lines changed

src/client/connect/dns.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ impl IpAddrs {
261261
pub(super) fn is_empty(&self) -> bool {
262262
self.iter.as_slice().is_empty()
263263
}
264+
265+
pub(super) fn len(&self) -> usize {
266+
self.iter.as_slice().len()
267+
}
264268
}
265269

266270
impl Iterator for IpAddrs {

src/client/connect/http.rs

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use http::uri::Scheme;
1212
use net2::TcpBuilder;
1313
use tokio_reactor::Handle;
1414
use tokio_tcp::{TcpStream, ConnectFuture};
15-
use tokio_timer::Delay;
15+
use tokio_timer::{Delay, Timeout};
1616

1717
use super::{Connect, Connected, Destination};
1818
use super::dns::{self, GaiResolver, Resolve, TokioThreadpoolGaiResolver};
@@ -29,6 +29,7 @@ use super::dns::{self, GaiResolver, Resolve, TokioThreadpoolGaiResolver};
2929
pub struct HttpConnector<R = GaiResolver> {
3030
enforce_http: bool,
3131
handle: Option<Handle>,
32+
connect_timeout: Option<Duration>,
3233
happy_eyeballs_timeout: Option<Duration>,
3334
keep_alive_timeout: Option<Duration>,
3435
local_address: Option<IpAddr>,
@@ -120,6 +121,7 @@ impl<R> HttpConnector<R> {
120121
HttpConnector {
121122
enforce_http: true,
122123
handle: None,
124+
connect_timeout: None,
123125
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
124126
keep_alive_timeout: None,
125127
local_address: None,
@@ -187,6 +189,17 @@ impl<R> HttpConnector<R> {
187189
self.local_address = addr;
188190
}
189191

192+
/// Set the connect timeout.
193+
///
194+
/// If a domain resolves to multiple IP addresses, the timeout will be
195+
/// evenly divided across them.
196+
///
197+
/// Default is `None`.
198+
#[inline]
199+
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
200+
self.connect_timeout = dur;
201+
}
202+
190203
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
191204
///
192205
/// If hostname resolves to both IPv4 and IPv6 addresses and connection
@@ -259,6 +272,7 @@ where
259272
HttpConnecting {
260273
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
261274
handle: self.handle.clone(),
275+
connect_timeout: self.connect_timeout,
262276
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
263277
keep_alive_timeout: self.keep_alive_timeout,
264278
nodelay: self.nodelay,
@@ -285,6 +299,7 @@ fn invalid_url<R: Resolve>(err: InvalidUrl, handle: &Option<Handle>) -> HttpConn
285299
keep_alive_timeout: None,
286300
nodelay: false,
287301
port: 0,
302+
connect_timeout: None,
288303
happy_eyeballs_timeout: None,
289304
reuse_address: false,
290305
send_buffer_size: None,
@@ -319,6 +334,7 @@ impl StdError for InvalidUrl {
319334
pub struct HttpConnecting<R: Resolve = GaiResolver> {
320335
state: State<R>,
321336
handle: Option<Handle>,
337+
connect_timeout: Option<Duration>,
322338
happy_eyeballs_timeout: Option<Duration>,
323339
keep_alive_timeout: Option<Duration>,
324340
nodelay: bool,
@@ -348,7 +364,7 @@ impl<R: Resolve> Future for HttpConnecting<R> {
348364
// skip resolving the dns and start connecting right away.
349365
if let Some(addrs) = dns::IpAddrs::try_parse(host, self.port) {
350366
state = State::Connecting(ConnectingTcp::new(
351-
local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
367+
local_addr, addrs, self.connect_timeout, self.happy_eyeballs_timeout, self.reuse_address));
352368
} else {
353369
let name = dns::Name::new(mem::replace(host, String::new()));
354370
state = State::Resolving(resolver.resolve(name), local_addr);
@@ -364,7 +380,7 @@ impl<R: Resolve> Future for HttpConnecting<R> {
364380
.collect();
365381
let addrs = dns::IpAddrs::new(addrs);
366382
state = State::Connecting(ConnectingTcp::new(
367-
local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
383+
local_addr, addrs, self.connect_timeout, self.happy_eyeballs_timeout, self.reuse_address));
368384
}
369385
};
370386
},
@@ -417,6 +433,7 @@ impl ConnectingTcp {
417433
fn new(
418434
local_addr: Option<IpAddr>,
419435
remote_addrs: dns::IpAddrs,
436+
connect_timeout: Option<Duration>,
420437
fallback_timeout: Option<Duration>,
421438
reuse_address: bool,
422439
) -> ConnectingTcp {
@@ -425,25 +442,25 @@ impl ConnectingTcp {
425442
if fallback_addrs.is_empty() {
426443
return ConnectingTcp {
427444
local_addr,
428-
preferred: ConnectingTcpRemote::new(preferred_addrs),
445+
preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
429446
fallback: None,
430447
reuse_address,
431448
};
432449
}
433450

434451
ConnectingTcp {
435452
local_addr,
436-
preferred: ConnectingTcpRemote::new(preferred_addrs),
453+
preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout),
437454
fallback: Some(ConnectingTcpFallback {
438455
delay: Delay::new(Instant::now() + fallback_timeout),
439-
remote: ConnectingTcpRemote::new(fallback_addrs),
456+
remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout),
440457
}),
441458
reuse_address,
442459
}
443460
} else {
444461
ConnectingTcp {
445462
local_addr,
446-
preferred: ConnectingTcpRemote::new(remote_addrs),
463+
preferred: ConnectingTcpRemote::new(remote_addrs, connect_timeout),
447464
fallback: None,
448465
reuse_address,
449466
}
@@ -458,13 +475,17 @@ struct ConnectingTcpFallback {
458475

459476
struct ConnectingTcpRemote {
460477
addrs: dns::IpAddrs,
461-
current: Option<ConnectFuture>,
478+
connect_timeout: Option<Duration>,
479+
current: Option<MaybeTimedConnectFuture>,
462480
}
463481

464482
impl ConnectingTcpRemote {
465-
fn new(addrs: dns::IpAddrs) -> Self {
483+
fn new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self {
484+
let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
485+
466486
Self {
467487
addrs,
488+
connect_timeout,
468489
current: None,
469490
}
470491
}
@@ -481,7 +502,18 @@ impl ConnectingTcpRemote {
481502
let mut err = None;
482503
loop {
483504
if let Some(ref mut current) = self.current {
484-
match current.poll() {
505+
let poll: Poll<TcpStream, io::Error> = match current {
506+
MaybeTimedConnectFuture::Timed(future) => match future.poll() {
507+
Ok(tcp) => Ok(tcp),
508+
Err(err) => if err.is_inner() {
509+
Err(err.into_inner().unwrap())
510+
} else {
511+
Err(io::Error::new(io::ErrorKind::TimedOut, err.description()))
512+
}
513+
},
514+
MaybeTimedConnectFuture::Untimed(future) => future.poll(),
515+
};
516+
match poll {
485517
Ok(Async::Ready(tcp)) => {
486518
debug!("connected to {:?}", tcp.peer_addr().ok());
487519
return Ok(Async::Ready(tcp));
@@ -492,14 +524,14 @@ impl ConnectingTcpRemote {
492524
err = Some(e);
493525
if let Some(addr) = self.addrs.next() {
494526
debug!("connecting to {}", addr);
495-
*current = connect(&addr, local_addr, handle, reuse_address)?;
527+
*current = connect(&addr, local_addr, handle, reuse_address, self.connect_timeout)?;
496528
continue;
497529
}
498530
}
499531
}
500532
} else if let Some(addr) = self.addrs.next() {
501533
debug!("connecting to {}", addr);
502-
self.current = Some(connect(&addr, local_addr, handle, reuse_address)?);
534+
self.current = Some(connect(&addr, local_addr, handle, reuse_address, self.connect_timeout)?);
503535
continue;
504536
}
505537

@@ -508,7 +540,12 @@ impl ConnectingTcpRemote {
508540
}
509541
}
510542

511-
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>, reuse_address: bool) -> io::Result<ConnectFuture> {
543+
enum MaybeTimedConnectFuture {
544+
Timed(Timeout<ConnectFuture>),
545+
Untimed(ConnectFuture),
546+
}
547+
548+
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>, reuse_address: bool, connect_timeout: Option<Duration>) -> io::Result<MaybeTimedConnectFuture> {
512549
let builder = match addr {
513550
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
514551
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
@@ -540,7 +577,13 @@ fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handl
540577
None => Cow::Owned(Handle::default()),
541578
};
542579

543-
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle))
580+
let stream = TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle);
581+
582+
if let Some(timeout) = connect_timeout {
583+
Ok(MaybeTimedConnectFuture::Timed(Timeout::new(stream, timeout)))
584+
} else {
585+
Ok(MaybeTimedConnectFuture::Untimed(stream))
586+
}
544587
}
545588

546589
impl ConnectingTcp {
@@ -706,7 +749,7 @@ mod tests {
706749
}
707750

708751
let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect();
709-
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false);
752+
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), None, Some(fallback_timeout), false);
710753
let fut = ConnectingTcpFuture(connecting_tcp);
711754

712755
let start = Instant::now();

0 commit comments

Comments
 (0)