LLT-7053: Raw DNS forwarder #1709
Conversation
Add component that forwards raw DNS queries to upstream resolvers over UDP socket
/// Errors returned when forwarding a DNS query
#[derive(Error, Debug)]
pub enum ForwardError {
    /// Failed upstream socket bind operation
    #[error("Failed socket bind operation: {0}")]
    SocketBind(#[from] io::Error),
    /// Failed to send a DNS query to the upstream resolver
    #[error("Failed to send DNS query: {0}")]
    Send(io::Error),
    /// No upstreams configured
    #[error("No upstream resolvers configured")]
    NoUpstreams,
    /// The upstream resolvers did not respond within the configured timeout
    #[error("DNS query timed out")]
    Timeout,
    /// The forwarder channel was closed
    #[error("Forwarder channel closed")]
    ChannelClosed,
    /// Too many concurrent requests
    #[error("Too many concurrent requests in flight")]
    TooManyRequests,
    /// The DNS packet is too short
    #[error("DNS packet too short")]
    PacketTooShort,
}
nit: SocketBind and Send read like operation names, while the rest of the variants (NoUpstreams, Timeout, ChannelClosed, TooManyRequests, PacketTooShort) describe an error state. Consider renaming them for consistency.
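For illustration, one shape the rename could take (a sketch only; the new variant names are suggestions, not from this PR):

use std::io;
use thiserror::Error;

/// Errors returned when forwarding a DNS query
#[derive(Error, Debug)]
pub enum ForwardError {
    /// The upstream socket could not be bound
    #[error("Failed socket bind operation: {0}")]
    SocketBindFailed(#[from] io::Error),
    /// The DNS query could not be sent to the upstream resolver
    #[error("Failed to send DNS query: {0}")]
    SendFailed(io::Error),
    // ...remaining variants unchanged: NoUpstreams, Timeout,
    // ChannelClosed, TooManyRequests, PacketTooShort
}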
let first_upstream = {
    let locked = upstreams.lock().await;
    locked.first().cloned()
};

let upstream_addr = match first_upstream {
    Some(addr) => addr,
    None => {
        send_channel_response!(msg.respond_to, Err(ForwardError::NoUpstreams));
        return;
    }
};
nit: Consider this:
let Some(upstream_addr) = upstreams.lock().await.first().cloned() else {
    send_channel_response!(msg.respond_to, Err(ForwardError::NoUpstreams));
    return;
};
Would this still hold the lock on upstreams while doing send_channel_response?
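One way to make the lock scope unambiguous regardless of how let-else handles the scrutinee's temporaries (a sketch, reusing the PR's names): bind the cloned value in its own statement, so the guard is dropped before the fallible branch runs.

let first_upstream = upstreams.lock().await.first().cloned();
// The MutexGuard is a temporary of the statement above, so it is
// dropped here, before any response can be sent.
let Some(upstream_addr) = first_upstream else {
    send_channel_response!(msg.respond_to, Err(ForwardError::NoUpstreams));
    return;
};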
}

/// Handle new query
async fn handle_new_query(
nit: Consider extracting the body into a Result-returning helper (e.g. prepare_query) and dispatching the result once at the end. The current shape repeats the match { Ok(v) => v, Err(e) => { send_channel_response!(...); return; } } pattern four times; collapsing them with ? would significantly shorten the function. It is readable as it is, though, so this is just a nitpick, and it is not certain the extracted version would read better.
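A minimal sketch of the pattern (QueryMsg, prepare_query, and the error type here are illustrative stand-ins, not the PR's actual signatures): every fallible step becomes a ? inside the helper, and the caller dispatches the error exactly once.

use tokio::sync::oneshot;

struct QueryMsg {
    respond_to: oneshot::Sender<Result<Vec<u8>, String>>,
    query: Vec<u8>,
}

async fn handle_new_query(msg: QueryMsg) {
    // Single dispatch point: every `?` inside the helper funnels here.
    if let Err(e) = prepare_query(&msg.query).await {
        let _ = msg.respond_to.send(Err(e));
    }
    // On Ok the query stays pending; the real response is sent later.
}

async fn prepare_query(query: &[u8]) -> Result<(), String> {
    let _id = query
        .first()
        .ok_or_else(|| "packet too short".to_string())?; // was: match + early return
    // ...each of the remaining formerly-matched steps collapses the same way.
    Ok(())
}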
let expired_ids: Vec<(u16, bool)> = pending
    .iter()
    .filter(|(_, entry)| entry.deadline <= now || entry.respond_to.is_closed())
    .map(|(&id, entry)| (id, entry.respond_to.is_closed()))
    .collect();
nit: is_closed() is called twice per entry — once in filter, once in map. A single-pass filter_map is possible:
let expired_ids: Vec<(u16, bool)> = pending
    .iter()
    .filter_map(|(&id, entry)| {
        let closed = entry.respond_to.is_closed();
        (entry.deadline <= now || closed).then_some((id, closed))
    })
    .collect();

let is_known_upstream = {
    let locked = upstreams.lock().await;
    locked.iter().any(|u| u.ip() == src.ip())
};
if !is_known_upstream {
    telio_log_warn!("Received DNS response from unknown source: {src}, ignoring");
    continue;
}
nit: this branch has no test
It's not so straightforward, since the check only looks at the IP address, and in unit tests all the injected responses would come from localhost 🤔 It's probably better tested in nat-lab.
Unless we change the check to include the source port as well, which would be stricter. But I don't know how often servers might use ephemeral ports for their responses (for example, for load balancing); this probably needs more investigation.
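For reference, the stricter variant under discussion is a one-line change against the snippet above (a sketch; whether exact-port matching is safe for load-balanced resolvers is exactly the open question):

let is_known_upstream = {
    let locked = upstreams.lock().await;
    // Compare the full SocketAddr (IP and port) instead of only the IP.
    locked.iter().any(|u| *u == src)
};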
fn allocate_id(pending: &HashMap<u16, PendingQuery>, next_id: &mut u16) -> Option<u16> {
    for _ in 0..DNS_ID_SPACE {
        let candidate = *next_id;
        *next_id = next_id.wrapping_add(1);
        if !pending.contains_key(&candidate) {
            return Some(candidate);
        }
    }
    None
}
allocate_id walks pending linearly and bails out after a full sweep — correctness depends on IDs being released back into the pool once a query is delivered or times out. That contract is not directly tested anywhere; a regression here (e.g. an entry forgotten in pending on some error path) would show up as a slow leak, not a unit test failure.
Easy smoke test: drive the forwarder through more queries than DNS_ID_SPACE and assert no TooManyRequests. The looping echo stub used by spawn_multi_stub is a good base — spawn_stub exits after one packet so it cannot serve enough queries here.
#[tokio::test]
async fn ids_are_released_after_response() {
    let (addr, _h) = spawn_multi_stub(DNS_ID_SPACE as usize + 1000).await;
    let forwarder = RawForwarder::new().await.unwrap();
    forwarder.set_upstreams(vec![addr]).await;
    for i in 0..(DNS_ID_SPACE as u32 + 1000) {
        let req = make_dns_packet(i as u16, b"x");
        forwarder.query(&req).await.expect("id pool exhausted — leak in `pending`");
    }
}

/// Handle expired queries
async fn handle_timeouts(
    socket: &UdpSocket,
    upstreams: &Arc<Mutex<Vec<SocketAddr>>>,
    timeout: &Arc<Mutex<Duration>>,
    pending: &mut HashMap<u16, PendingQuery>,
) {
    let now = Instant::now();
    let expired_ids: Vec<(u16, bool)> = pending
        .iter()
        .filter(|(_, entry)| entry.deadline <= now || entry.respond_to.is_closed())
        .map(|(&id, entry)| (id, entry.respond_to.is_closed()))
        .collect();

    if expired_ids.is_empty() {
        return;
    }

    let current_upstreams = {
        let locked = upstreams.lock().await;
        locked.clone()
    };

    for (internal_id, is_closed) in expired_ids {
        let mut entry = match pending.remove(&internal_id) {
            Some(e) => e,
            None => continue,
        };

        if is_closed {
            telio_log_warn!("Caller dropped for: {internal_id}");
            continue;
        }

        let next_index = entry.upstream_index + 1;
        match current_upstreams.get(next_index) {
            Some(&next_upstream) => {
                telio_log_debug!(
                    "Upstream timed out for request: {internal_id}, trying next: {next_upstream}"
                );
                entry.upstream_index = next_index;
                entry.deadline = Instant::now() + *timeout.lock().await;

                if let Err(e) = socket.send_to(&entry.query_bytes, next_upstream).await {
                    send_channel_response!(entry.respond_to, Err(ForwardError::Send(e)));
                    continue;
                }

                pending.insert(internal_id, entry);
            }
            None => {
                telio_log_warn!("All upstreams exhausted for request: {internal_id}");
                send_channel_response!(entry.respond_to, Err(ForwardError::Timeout));
            }
        }
    }
}
Bug: upstream_index is captured at submit time but resolved here against current_upstreams, which may have been replaced via set_upstreams while this query was pending. After a swap, entry.upstream_index + 1 can point at an unrelated resolver — possibly the same one we just timed out on.
Suggestion: store a snapshot of the upstream list (or the next SocketAddr directly) inside PendingQuery so retry behavior is independent of mutations to the shared list.
Regression test that fails on the current implementation with count_a == 2 (the retry hits the same blackhole that just timed out):
// Demonstrates that `PendingQuery::upstream_index` is unstable across
// `set_upstreams`: the index is captured at submit time but resolved later
// against the *current* upstream list, so a reorder/replace mid-flight makes
// the retry land on the wrong resolver — possibly the same one we just
// timed out on.
//
// Sequence:
// 1. upstreams = [addr_a]; submit one query → goes to addr_a (index 0).
// 2. mid-flight, swap upstreams to [echo_addr, addr_a].
// 3. timeout fires; retry uses next_index = upstream_index + 1 = 1.
// 4. current_upstreams[1] is addr_a → blackhole gets a SECOND packet.
//
// The unambiguous bug signal is `count_a == 2`. Whether the query should
// ultimately succeed via echo or fail with Timeout depends on the eventual
// fix design (snapshot at submit time vs. "use latest list, skip already-tried").
#[tokio::test]
async fn retry_targets_wrong_upstream_after_list_reorder() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Counting blackhole: records every datagram but never sends a reply.
    let socket_a = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let addr_a = socket_a.local_addr().unwrap();
    let count_a = Arc::new(AtomicUsize::new(0));
    let count_a_task = count_a.clone();
    let _stub_a = tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        loop {
            if socket_a.recv_from(&mut buf).await.is_ok() {
                count_a_task.fetch_add(1, Ordering::SeqCst);
            }
        }
    });

    let (echo_addr, _echo) = spawn_stub(StubBehavior::Echo).await;
    let forwarder = RawForwarder::new().await.unwrap();
    forwarder.set_upstreams(vec![addr_a]).await;
    forwarder.set_timeout(Duration::from_millis(100)).await;

    let f = forwarder.clone();
    let query_handle = tokio::spawn(async move {
        let request = make_dns_packet(TEST_PACKET_ID, TEST_DNS_PAYLOAD);
        f.query(&request).await
    });

    tokio::time::sleep(Duration::from_millis(20)).await;
    assert_eq!(count_a.load(Ordering::SeqCst), 1, "initial query did not reach addr_a yet");

    // Reorder upstreams BEFORE timeout fires:
    //   index 0 -> echo_addr
    //   index 1 -> addr_a   <-- this is what handle_timeouts will pick
    forwarder.set_upstreams(vec![echo_addr, addr_a]).await;

    let _result = query_handle.await.unwrap();
    assert_eq!(
        count_a.load(Ordering::SeqCst),
        1,
        "addr_a was retried after upstream list reorder — got {} packets, expected 1",
        count_a.load(Ordering::SeqCst)
    );
}

/// Extract the 16-bit transaction ID of a DNS packet
fn get_dns_id(packet: &[u8]) -> Result<u16, ForwardError> {
    if packet.len() < DNS_HEADER_OFFSET {
        return Err(ForwardError::PacketTooShort);
    }

    // This is ok because the size is checked above
    #[allow(clippy::indexing_slicing)]
    Ok(u16::from_be_bytes([packet[0], packet[1]]))
}

/// Overwrite the 16-bit transaction ID of a DNS packet
fn set_dns_id(packet: &mut [u8], id: u16) -> Result<(), ForwardError> {
    if packet.len() < DNS_HEADER_OFFSET {
        return Err(ForwardError::PacketTooShort);
    }
    let bytes = id.to_be_bytes();

    // This is ok because the size is checked above
    #[allow(clippy::indexing_slicing)]
    {
        packet[0] = bytes[0];
        packet[1] = bytes[1];
    }
    Ok(())
}
Larger refactor option (only worth it if more header inspection is on the horizon — get_flags, is_response, rcode, qdcount, etc.): split validation from access. A validate_dns_header gate validates once and returns a typed reference; getters/setters take the already-validated &[u8; 12], do not check length, do not return Result, do not need #[allow].
fn validate_dns_header(packet: &[u8]) -> Result<&[u8; DNS_HEADER_OFFSET], ForwardError> {
    packet.first_chunk().ok_or(ForwardError::PacketTooShort)
}

fn validate_dns_header_mut(
    packet: &mut [u8],
) -> Result<&mut [u8; DNS_HEADER_OFFSET], ForwardError> {
    packet.first_chunk_mut().ok_or(ForwardError::PacketTooShort)
}

fn get_dns_id(header: &[u8; DNS_HEADER_OFFSET]) -> u16 {
    u16::from_be_bytes([header[0], header[1]])
}

fn set_dns_id(header: &mut [u8; DNS_HEADER_OFFSET], id: u16) {
    let bytes = id.to_be_bytes();
    header[0] = bytes[0];
    header[1] = bytes[1];
}

Indexing [0]/[1] on &[u8; 12] is compile-time safe (the type guarantees the length), so clippy stays quiet without #[allow]. Call sites validate once on entry to handle_new_query / handle_response and then pass the typed header around.
Trade-off: two layers instead of one for the current ID-only use case. If ID is all you will ever read from the header, this is over-engineered. If more header fields show up later, this scales without duplicating length checks across every getter.
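A possible call-site shape under that refactor (rewrite_query_id is a hypothetical helper, not from this PR): validate once at the boundary, then use the typed header without further checks.

fn rewrite_query_id(packet: &mut [u8], new_id: u16) -> Result<u16, ForwardError> {
    // Validate once; everything past this line is infallible.
    let header = validate_dns_header_mut(packet)?;
    let old_id = get_dns_id(header);
    set_dns_id(header, new_id);
    Ok(old_id)
}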
Problem

Non-.nord DNS queries are forwarded through hickory-server's ForwardAuthority zone. We want to remove hickory dependencies, as hickory adds unnecessary overhead, requires additional maintenance, and was a source of bugs.

Solution

Add a RawForwarder that sends DNS queries directly to upstream resolvers as UDP packets. The forwarder will be integrated in a subsequent PR.
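Inferred from the unit tests quoted in the review above (the upstream address, timeout, and packet here are placeholders), the intended call pattern looks roughly like:

let forwarder = RawForwarder::new().await?;
forwarder.set_upstreams(vec!["203.0.113.53:53".parse()?]).await;
forwarder.set_timeout(Duration::from_secs(2)).await;
// `query` takes a raw DNS packet; per the tests it resolves once an
// upstream answers, or with a ForwardError on timeout/exhaustion.
let response = forwarder.query(&raw_dns_query).await?;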
☑️ Definition of Done checklist