Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,6 @@ pub(crate) enum PollAt {
Ingress,
}

impl PollAt {
#[cfg(feature = "socket-tcp")]
fn is_ingress(&self) -> bool {
match *self {
PollAt::Ingress => true,
_ => false,
}
}
}

/// A network socket.
///
/// This enumeration abstracts the various types of sockets based on the IP protocol.
Expand Down
198 changes: 192 additions & 6 deletions src/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ enum Timer {
}
}

const ACK_DELAY_DEFAULT: Duration = Duration { millis: 10 };
const CLOSE_DELAY: Duration = Duration { millis: 10_000 };

impl Default for Timer {
Expand Down Expand Up @@ -341,6 +342,12 @@ pub struct TcpSocket<'a> {
/// each other which have the same ACK number.
local_rx_dup_acks: u8,

/// Duration for Delayed ACK. If None no ACKs will be delayed.
ack_delay: Option<Duration>,
/// Delayed ack timer. If set, packets containing exclusively
/// ACK or window updates (ie, no data) won't be sent until expiry.
ack_delay_until: Option<Instant>,

#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
Expand Down Expand Up @@ -397,6 +404,8 @@ impl<'a> TcpSocket<'a> {
local_rx_last_ack: None,
local_rx_last_seq: None,
local_rx_dup_acks: 0,
ack_delay: Some(ACK_DELAY_DEFAULT),
ack_delay_until: None,

#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
Expand Down Expand Up @@ -453,6 +462,13 @@ impl<'a> TcpSocket<'a> {
self.timeout
}

/// Return the ACK delay duration.
///
/// See also the [set_ack_delay](#method.set_ack_delay) method.
pub fn ack_delay(&self) -> Option<Duration> {
self.ack_delay
}

/// Return the current window field value, including scaling according to RFC 1323.
///
/// Used in internal calculations as well as packet generation.
Expand All @@ -478,6 +494,13 @@ impl<'a> TcpSocket<'a> {
self.timeout = duration
}

/// Set the ACK delay duration.
///
/// By default, the ACK delay is set to 10ms.
pub fn set_ack_delay(&mut self, duration: Option<Duration>) {
self.ack_delay = duration
}

/// Return the keep-alive interval.
///
/// See also the [set_keep_alive](#method.set_keep_alive) method.
Expand Down Expand Up @@ -578,6 +601,8 @@ impl<'a> TcpSocket<'a> {
self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
self.remote_mss = DEFAULT_MSS;
self.remote_last_ts = None;
self.ack_delay = Some(ACK_DELAY_DEFAULT);
self.ack_delay_until = None;

#[cfg(feature = "async")]
{
Expand Down Expand Up @@ -1541,6 +1566,30 @@ impl<'a> TcpSocket<'a> {
self.assembler);
}

// Handle delayed acks
if let Some(ack_delay) = self.ack_delay {
if self.ack_to_transmit() || self.window_to_update() {
self.ack_delay_until = match self.ack_delay_until {
None => {
net_trace!("{}:{}:{}: starting delayed ack timer",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);

Some(timestamp + ack_delay)
}
// RFC1122 says "in a stream of full-sized segments there SHOULD be an ACK
// for at least every second segment".
// For now, we send an ACK every second received packet, full-sized or not.
Some(_) => {
net_trace!("{}:{}:{}: delayed ack timer already started, forcing expiry",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);
None
}
};
}
}

// Per RFC 5681, we should send an immediate ACK when either:
// 1) an out-of-order segment is received, or
// 2) a segment arrives that fills in all or part of a gap in sequence space.
Expand Down Expand Up @@ -1590,6 +1639,13 @@ impl<'a> TcpSocket<'a> {
can_data || can_fin
}

fn delayed_ack_expired(&self, timestamp: Instant) -> bool {
match self.ack_delay_until {
None => true,
Some(t) => t <= timestamp,
}
}

fn ack_to_transmit(&self) -> bool {
if let Some(remote_last_ack) = self.remote_last_ack {
remote_last_ack < self.remote_seq_no + self.rx_buffer.len()
Expand Down Expand Up @@ -1644,11 +1700,11 @@ impl<'a> TcpSocket<'a> {
// If we have data to transmit and it fits into partner's window, do it.
net_trace!("{}:{}:{}: outgoing segment will send data or flags",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
} else if self.ack_to_transmit() {
} else if self.ack_to_transmit() && self.delayed_ack_expired(timestamp) {
// If we have data to acknowledge, do it.
net_trace!("{}:{}:{}: outgoing segment will acknowledge",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
} else if self.window_to_update() {
} else if self.window_to_update() && self.delayed_ack_expired(timestamp) {
// If we have window length increase to advertise, do it.
net_trace!("{}:{}:{}: outgoing segment will update window",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
Expand Down Expand Up @@ -1812,6 +1868,15 @@ impl<'a> TcpSocket<'a> {
// the keep-alive timer.
self.timer.rewind_keep_alive(timestamp, self.keep_alive);

// Reset delayed-ack timer
if self.ack_delay_until.is_some() {
net_trace!("{}:{}:{}: stop delayed ack timer",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);

self.ack_delay_until = None;
}

// Leave the rest of the state intact if sending a keep-alive packet, since those
// carry a fake segment.
if is_keep_alive { return Ok(()) }
Expand Down Expand Up @@ -1851,10 +1916,17 @@ impl<'a> TcpSocket<'a> {
} else if self.state == State::Closed {
// Socket was aborted, we have an RST packet to transmit.
PollAt::Now
} else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() {
} else if self.seq_to_transmit() {
// We have a data or flag packet to transmit.
PollAt::Now
} else {
let want_ack = self.ack_to_transmit() || self.window_to_update();
let delayed_ack_poll_at = match (want_ack, self.ack_delay_until) {
(false, _) => PollAt::Ingress,
(true, None) => PollAt::Now,
(true, Some(t)) => PollAt::Time(t),
};

let timeout_poll_at = match (self.remote_last_ts, self.timeout) {
// If we're transmitting or retransmitting data, we need to poll at the moment
// when the timeout would expire.
Expand All @@ -1864,9 +1936,8 @@ impl<'a> TcpSocket<'a> {
};

// We wait for the earliest of our timers to fire.
*[self.timer.poll_at(), timeout_poll_at]
*[self.timer.poll_at(), timeout_poll_at, delayed_ack_poll_at]
.iter()
.filter(|x| !x.is_ingress())
.min().unwrap_or(&PollAt::Ingress)
}
}
Expand Down Expand Up @@ -2076,7 +2147,9 @@ mod test {

let rx_buffer = SocketBuffer::new(vec![0; rx_len]);
let tx_buffer = SocketBuffer::new(vec![0; tx_len]);
TcpSocket::new(rx_buffer, tx_buffer)
let mut socket = TcpSocket::new(rx_buffer, tx_buffer);
socket.set_ack_delay(None);
socket
}

fn socket_syn_received_with_buffer_sizes(
Expand Down Expand Up @@ -5084,6 +5157,119 @@ mod test {
assert_eq!(s.recv(|_| (0, ())), Err(Error::Illegal));
}

// =========================================================================================//
// Tests for delayed ACK
// =========================================================================================//

#[test]
fn test_delayed_ack() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});

// No ACK is immediately sent.
recv!(s, Err(Error::Exhausted));

// After 10ms, it is sent.
recv!(s, time 11, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
window_len: 61,
..RECV_TEMPL
}));
}

#[test]
fn test_delayed_ack_win() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});

// Reading the data off the buffer should cause a window update.
s.recv(|data| {
assert_eq!(data, b"abc");
(3, ())
}).unwrap();

// However, no ACK or window update is immediately sent.
recv!(s, Err(Error::Exhausted));

// After 10ms, it is sent.
recv!(s, time 11, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
..RECV_TEMPL
}));
}

#[test]
fn test_delayed_ack_reply() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});

s.recv(|data| {
assert_eq!(data, b"abc");
(3, ())
}).unwrap();

s.send_slice(&b"xyz"[..]).unwrap();

// Writing data to the socket causes ACK to not be delayed,
// because it is immediately sent with the data.
recv!(s, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
payload: &b"xyz"[..],
..RECV_TEMPL
}));
}

#[test]
fn test_delayed_ack_every_second_packet() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});

// No ACK is immediately sent.
recv!(s, Err(Error::Exhausted));

send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1 + 3,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"def"[..],
..SEND_TEMPL
});

// Every 2nd packet, ACK is sent without delay.
recv!(s, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 6),
window_len: 58,
..RECV_TEMPL
}));
}

// =========================================================================================//
// Tests for packet filtering.
// =========================================================================================//
Expand Down