Skip to content

Commit ea0985f

Browse files
committed
PROTON-2812: Implement async name lookup with c-ares
* TODO: It would be useful to add a dedicated async name lookup call that signals completion through its own proactor event, for use by code that only provides us with fds.
1 parent e5d5c2b commit ea0985f

File tree

10 files changed

+706
-57
lines changed

10 files changed

+706
-57
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ jobs:
4545
- name: Install Linux dependencies
4646
if: runner.os == 'Linux'
4747
run: |
48-
sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev softhsm2 opensc
48+
sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev libc-ares-dev softhsm2 opensc
4949
- name: Install Windows dependencies
5050
if: runner.os == 'Windows'
5151
run: |

azure-pipelines/azure-pipelines.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
steps:
3737
- script: |
3838
sudo apt-get update
39-
sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
39+
sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev libc-ares-dev
4040
name: InstallExtraStuff
4141
- template: steps.yml
4242
- job: MacOS

c/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,14 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
368368
set (PROACTOR_OK epoll)
369369
set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c ${qpid-proton-proactor-common})
370370
set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
371+
find_package(c-ares 1.16 CONFIG)
372+
option(ENABLE_ASYNC_DNS "Enable async DNS lookups (Using c-ares)" ${c-ares_FOUND})
373+
if (ENABLE_ASYNC_DNS)
374+
list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_async.c)
375+
list(APPEND PROACTOR_LIBS c-ares::cares)
376+
else()
377+
list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_sync.c)
378+
endif()
371379
endif()
372380
endif()
373381

c/src/proactor/epoll-internal.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ typedef enum {
6161
LISTENER_IO,
6262
PCONNECTION_IO,
6363
RAW_CONNECTION_IO,
64-
TIMER
64+
TIMER,
65+
NAME_LOOKUP_EPOLL /* Inner epoll for async name lookup (e.g. c-ares) */
6566
} epoll_type_t;
6667

6768
// Data to use with epoll.
@@ -78,7 +79,8 @@ typedef enum {
7879
PCONNECTION,
7980
LISTENER,
8081
RAW_CONNECTION,
81-
TIMER_MANAGER
82+
TIMER_MANAGER,
83+
NAME_LOOKUP
8284
} task_type_t;
8385

8486
typedef struct task_t {
@@ -142,9 +144,16 @@ typedef struct pni_timer_manager_t {
142144
bool sched_timeout;
143145
} pni_timer_manager_t;
144146

147+
typedef struct pname_lookup_t {
148+
task_t task;
149+
epoll_extended_t epoll_name_lookup;
150+
void *impl; /* NULL for sync; for async: implementation context */
151+
} pname_lookup_t;
152+
145153
struct pn_proactor_t {
146154
task_t task;
147155
pni_timer_manager_t timer_manager;
156+
pname_lookup_t name_lookup;
148157
epoll_extended_t epoll_schedule; /* ready list */
149158
epoll_extended_t epoll_interrupt;
150159
pn_event_batch_t batch;
@@ -364,7 +373,7 @@ bool proactor_remove(task_t *tsk);
364373
bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread);
365374

366375
void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p);
367-
static void task_finalize(task_t* tsk) {
376+
static inline void task_finalize(task_t* tsk) {
368377
pmutex_finalize(&tsk->mutex);
369378
}
370379

@@ -377,7 +386,6 @@ bool start_polling(epoll_extended_t *ee, int epollfd);
377386
void stop_polling(epoll_extended_t *ee, int epollfd);
378387
void rearm_polling(epoll_extended_t *ee, int epollfd);
379388

380-
int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res);
381389
void configure_socket(int sock);
382390

383391
accepted_t *listener_accepted_next(pn_listener_t *listener);

c/src/proactor/epoll.c

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
#undef _GNU_SOURCE
6464

6565
#include "epoll-internal.h"
66+
#include "epoll_name_lookup.h"
6667
#include "proactor-internal.h"
6768
#include "core/engine-internal.h"
6869
#include "core/logger_private.h"
@@ -632,6 +633,10 @@ static inline pn_listener_t *task_listener(task_t *t) {
632633
return t->type == LISTENER ? containerof(t, pn_listener_t, task) : NULL;
633634
}
634635

636+
static inline pname_lookup_t *task_name_lookup(task_t *t) {
637+
return t->type == NAME_LOOKUP ? containerof(t, pname_lookup_t, task) : NULL;
638+
}
639+
635640
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
636641
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
637642
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
@@ -1408,15 +1413,6 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {
14081413
pc->disconnected = true;
14091414
}
14101415

1411-
int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
1412-
{
1413-
// NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
1414-
struct addrinfo hints = { 0 };
1415-
hints.ai_family = AF_UNSPEC;
1416-
hints.ai_socktype = SOCK_STREAM;
1417-
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
1418-
return getaddrinfo(host, port, &hints, res);
1419-
}
14201416

14211417
static inline bool is_inactive(pn_proactor_t *p) {
14221418
return (!p->tasks && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
@@ -1431,23 +1427,39 @@ bool schedule_if_inactive(pn_proactor_t *p) {
14311427
return false;
14321428
}
14331429

1430+
/* Called when connection name lookup completes (from name_lookup done_cb). Call with task lock held. */
1431+
static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) {
1432+
pn_proactor_t *p = pc->task.proactor;
1433+
bool notify = false;
1434+
if (gai_error) {
1435+
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
1436+
} else if (ai) {
1437+
pc->addrinfo = ai;
1438+
pc->ai = ai;
1439+
pconnection_maybe_connect_lh(pc);
1440+
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) {
1441+
return;
1442+
}
1443+
}
1444+
notify = schedule(&pc->task);
1445+
if (notify) notify_poller(p);
1446+
}
1447+
1448+
static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
1449+
pconnection_t *pc = (pconnection_t *)user_data;
1450+
lock(&pc->task.mutex);
1451+
connection_lookup_done_lh(pc, ai, gai_error);
1452+
unlock(&pc->task.mutex);
1453+
}
1454+
14341455
// Call from pconnection_process with task lock held.
14351456
// Return true if the socket is connecting and there are no Proton events to deliver.
14361457
static bool pconnection_first_connect_lh(pconnection_t *pc) {
1458+
pn_proactor_t *p = pc->task.proactor;
14371459
unlock(&pc->task.mutex);
1438-
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
1439-
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
1460+
bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb);
14401461
lock(&pc->task.mutex);
1441-
1442-
if (!gai_error) {
1443-
pc->ai = pc->addrinfo;
1444-
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
1445-
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task))
1446-
return true;
1447-
} else {
1448-
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
1449-
}
1450-
return false;
1462+
return rc;
14511463
}
14521464

14531465
void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
@@ -1579,7 +1591,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
15791591
pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);
15801592

15811593
struct addrinfo *addrinfo = NULL;
1582-
int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
1594+
int gai_err = pni_name_lookup_blocking(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
15831595
if (!gai_err) {
15841596
/* Count addresses, allocate enough space for sockets */
15851597
size_t len = 0;
@@ -2021,23 +2033,27 @@ pn_proactor_t *pn_proactor(void) {
20212033
if ((p->epollfd = epoll_create(1)) >= 0) {
20222034
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
20232035
if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
2024-
if (pni_timer_manager_init(&p->timer_manager))
2025-
if ((p->collector = pn_collector()) != NULL) {
2026-
p->batch.next_event = &proactor_batch_next;
2027-
start_polling(&p->timer_manager.epoll_timer, p->epollfd); // TODO: check for error
2028-
epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true);
2029-
epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
2030-
p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
2031-
grow_poller_bufs(p);
2032-
p->ready_list_generation = 1;
2033-
return p;
2036+
if (pni_timer_manager_init(&p->timer_manager)) {
2037+
if (pni_name_lookup_init(&p->name_lookup, p)) {
2038+
if ((p->collector = pn_collector()) != NULL) {
2039+
p->batch.next_event = &proactor_batch_next;
2040+
start_polling(&p->timer_manager.epoll_timer, p->epollfd); // TODO: check for error
2041+
epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true);
2042+
epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
2043+
p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
2044+
grow_poller_bufs(p);
2045+
p->ready_list_generation = 1;
2046+
return p;
2047+
}
20342048
}
2049+
}
20352050
}
20362051
}
20372052
}
20382053
if (p->epollfd >= 0) close(p->epollfd);
20392054
if (p->eventfd >= 0) close(p->eventfd);
20402055
if (p->interruptfd >= 0) close(p->interruptfd);
2056+
pni_name_lookup_cleanup(&p->name_lookup, p);
20412057
pni_timer_manager_finalize(&p->timer_manager);
20422058
pmutex_finalize(&p->timeout_mutex);
20432059
pmutex_finalize(&p->tslot_mutex);
@@ -2071,11 +2087,15 @@ void pn_proactor_free(pn_proactor_t *p) {
20712087
case RAW_CONNECTION:
20722088
pni_raw_connection_forced_shutdown(pni_task_raw_connection(tsk));
20732089
break;
2090+
case NAME_LOOKUP:
2091+
pni_name_lookup_forced_shutdown(task_name_lookup(tsk));
2092+
break;
20742093
default:
20752094
break;
20762095
}
20772096
}
20782097

2098+
pni_name_lookup_cleanup(&p->name_lookup, p);
20792099
pni_timer_manager_finalize(&p->timer_manager);
20802100
pn_collector_free(p->collector);
20812101
pmutex_finalize(&p->timeout_mutex);
@@ -2309,6 +2329,11 @@ static pn_event_batch_t *process(task_t *tsk) {
23092329
batch = pni_timer_manager_process(tm, timeout, tsk_ready);
23102330
break;
23112331
}
2332+
case NAME_LOOKUP:
2333+
unlock(&p->sched_mutex);
2334+
pni_name_lookup_process_events(&p->name_lookup);
2335+
batch = NULL;
2336+
break;
23122337
default:
23132338
assert(NULL);
23142339
}
@@ -2350,6 +2375,11 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
23502375
}
23512376
// else if (ee->fd == p->eventfd)... schedule_ready_list already performed by poller task.
23522377
break;
2378+
case NAME_LOOKUP_EPOLL: {
2379+
tsk = &p->name_lookup.task;
2380+
tsk->sched_pending = true;
2381+
break;
2382+
}
23532383
case PCONNECTION_IO: {
23542384
psocket_t *ps = containerof(ee, psocket_t, epoll_io);
23552385
pconnection_t *pc = psocket_pconnection(ps);
@@ -2581,11 +2611,10 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
25812611
unlock(&p->eventfd_mutex);
25822612
}
25832613

2584-
int timeout = (epoll_immediate) ? 0 : -1;
2585-
p->poller_suspended = (timeout == -1);
2614+
p->poller_suspended = !epoll_immediate;
25862615
unlock(&p->sched_mutex);
25872616

2588-
n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
2617+
n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, epoll_immediate ? 0 : -1);
25892618

25902619
lock(&p->sched_mutex);
25912620
p->poller_suspended = false;
@@ -2612,8 +2641,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
26122641
unlock(&p->eventfd_mutex);
26132642

26142643
if (n_events < 0) {
2615-
if (errno != EINTR)
2644+
if (errno != EINTR) {
26162645
perror("epoll_wait"); // TODO: proper log
2646+
}
26172647
if (!can_block && !unpolled_work)
26182648
return true;
26192649
else
@@ -2622,8 +2652,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
26222652
if (!can_block && !unpolled_work)
26232653
return true;
26242654
else {
2625-
if (!epoll_immediate)
2655+
if (!epoll_immediate) {
26262656
perror("epoll_wait unexpected timeout"); // TODO: proper log
2657+
}
26272658
if (!unpolled_work)
26282659
continue;
26292660
}

c/src/proactor/epoll_name_lookup.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
*/
20+
21+
#ifndef PROACTOR_NAME_LOOKUP_H
22+
#define PROACTOR_NAME_LOOKUP_H
23+
24+
#include "epoll-internal.h"
25+
26+
#ifdef __cplusplus
27+
extern "C" {
28+
#endif
29+
30+
/* Blocking name lookup - used by listeners and as fallback for connections.
31+
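* Returns a getaddrinfo-style error code (0 on success); unlike getaddrinfo, takes extra ai_flags as an int rather than a hints struct.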
32+
*/
33+
int pni_name_lookup_blocking(const char *host, const char *port, int flags, struct addrinfo **res);
34+
35+
/* Initialize name lookup subsystem. Returns false if error. */
36+
bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p);
37+
38+
/* Cleanup name lookup subsystem. */
39+
void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p);
40+
41+
/* Callback when lookup completes. Called with (user_data, addrinfo or NULL, gai_error). */
42+
typedef void (*pni_nl_done_cb)(void *user_data, struct addrinfo *ai, int gai_error);
43+
44+
/* Start a name lookup. Returns true if started successfully. */
45+
bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char *port, void *user_data, pni_nl_done_cb done_cb);
46+
47+
/* Clear lookup state when forcing shutdown. */
48+
void pni_name_lookup_forced_shutdown(pname_lookup_t *nl);
49+
50+
/* Process events. */
51+
void pni_name_lookup_process_events(pname_lookup_t *nl);
52+
53+
#ifdef __cplusplus
54+
}
55+
#endif
56+
57+
#endif /* PROACTOR_NAME_LOOKUP_H */

0 commit comments

Comments
 (0)