Skip to content

Commit b7cab0a

Browse files
committed
Fix a potential deadlock on fork
If a fork happens while some thread is sending metrics and holds one of the locks, the lock remains locked in the child process and the child deadlocks, either in the post_fork handler (when the post_fork hook tries to close the socket) or later (when user code tries to send a metric). Work around the issue by resetting the socket and buffer locks in the child process. If those were locked in the parent at the time of the fork, the internal client state may be inconsistent, so we reset it as well. With the config lock, we cannot reset the state to some known good state, so to avoid problems when fork is called while a thread modifies the client configuration, the config lock is held across the fork. Both the child and the parent can safely unlock it afterwards.
1 parent cca8ac7 commit b7cab0a

File tree

4 files changed

+95
-21
lines changed

4 files changed

+95
-21
lines changed

datadog/dogstatsd/base.py

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,26 @@ def pre_fork():
105105
c.pre_fork()
106106

107107

108-
def post_fork():
108+
def post_fork_parent():
109109
"""Restore all client instances after a fork.
110110
111111
If SUPPORTS_FORKING is true, this will be called automatically after os.fork().
112112
"""
113113
for c in _instances:
114-
c.post_fork()
114+
c.post_fork_parent()
115+
116+
117+
def post_fork_child():
    """Restore all client instances in the child process after a fork.

    If SUPPORTS_FORKING is true, this is registered as the after_in_child
    hook of os.register_at_fork().
    """
    for client in _instances:
        client.post_fork_child()
115120

116121

117122
if SUPPORTS_FORKING:
118-
os.register_at_fork(before=pre_fork, after_in_child=post_fork, after_in_parent=post_fork) # type: ignore
123+
os.register_at_fork( # type: ignore
124+
before=pre_fork,
125+
after_in_child=post_fork_child,
126+
after_in_parent=post_fork_parent,
127+
)
119128

120129

121130
# pylint: disable=useless-object-inheritance,too-many-instance-attributes
@@ -473,7 +482,8 @@ def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
473482
Applications should call stop() before exiting to make sure all pending payloads are sent.
474483
475484
Compatible with os.fork() starting with Python 3.7. On earlier versions, compatible if applications
476-
arrange to call pre_fork() and post_fork() module functions around calls to os.fork().
485+
arrange to call pre_fork(), post_fork_parent() and post_fork_child() module functions around calls
486+
to os.fork().
477487
478488
:param sender_queue_size: Set the maximum number of packets to queue for the sender.
479489
How many packets to queue before blocking or dropping the packet if the packet queue is already full.
@@ -1397,29 +1407,46 @@ def wait_for_pending(self):
13971407
def pre_fork(self):
13981408
"""Prepare client for a process fork.
13991409
1400-
Flush any pending payloads, stop all background threads and
1401-
close the connection. Once the function returns.
1410+
Flush any pending payloads and stop all background threads.
14021411
14031412
The client should not be used from this point until
1404-
post_fork() is called.
1413+
state is restored by calling post_fork_parent() or
1414+
post_fork_child().
14051415
"""
1406-
log.debug("[%d] pre_fork for %s", os.getpid(), self)
14071416

1408-
self._forking = True
1417+
# Hold the config lock across fork. This will make sure that
1418+
# we don't fork in the middle of the concurrent modification
1419+
# of the client's settings. Data protected by other locks may
1420+
# be left in inconsistent state in the child process, which we
1421+
# will clean up in post_fork_child.
14091422

1410-
with self._config_lock:
1411-
self._stop_flush_thread()
1412-
self._stop_sender_thread()
1413-
self.close_socket()
1423+
self._config_lock.acquire()
1424+
self._stop_flush_thread()
1425+
self._stop_sender_thread()
14141426

1415-
def post_fork(self):
1416-
"""Restore the client state after a fork."""
1427+
def post_fork_parent(self):
    """Restore the client state after a fork in the parent process.

    Calls _start_flush_thread() with the stored flush interval and
    _start_sender_thread(), then releases the config lock that
    pre_fork() acquired and kept held across the fork.
    """
    try:
        self._start_flush_thread(self._flush_interval)
        self._start_sender_thread()
    finally:
        # Always hand the config lock back, even if restarting a thread
        # raised; otherwise it would stay held in the parent process.
        self._config_lock.release()
14171432

1418-
log.debug("[%d] post_fork for %s", os.getpid(), self)
1433+
def post_fork_child(self):
1434+
"""Restore the client state after a fork in the child process."""
1435+
self._config_lock.release()
14191436

1420-
self.close_socket()
1437+
# Discard the locks that could have been locked at the time
1438+
# when we forked. This may cause inconsistent internal state,
1439+
# which we will fix in the next steps.
1440+
self._socket_lock = Lock()
1441+
self._buffer_lock = RLock()
14211442

1422-
self._forking = False
1443+
# Reset the buffer so we don't send metrics from the parent
1444+
# process. Also makes sure buffer properties are consistent.
1445+
self._reset_buffer()
1446+
# Execute the socket_path setter to reconcile transport and
1447+
# payload size properties in respect to socket_path value.
1448+
self.socket_path = self.socket_path
1449+
self.close_socket()
14231450

14241451
with self._config_lock:
14251452
self._start_flush_thread(self._flush_interval)

tests/integration/dogstatsd/test_statsd_fork.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import itertools
33
import socket
4+
import threading
45

56
import pytest
67

@@ -31,7 +32,7 @@ def inner(*args, **kwargs):
3132
return inner
3233

3334
statsd.pre_fork = track(statsd.pre_fork)
34-
statsd.post_fork = track(statsd.post_fork)
35+
statsd.post_fork_parent = track(statsd.post_fork_parent)
3536

3637
pid = os.fork()
3738
if pid == 0:
@@ -41,3 +42,49 @@ def inner(*args, **kwargs):
4142
os.waitpid(pid, 0)
4243

4344
assert len(tracker) == 2
45+
46+
47+
def sender_a(statsd, running):
    """Send the "spam" gauge in a loop until running[0] becomes falsy."""
    while True:
        if not running[0]:
            break
        statsd.gauge("spam", 1)
50+
51+
52+
def sender_b(statsd, signal):
    """Send the "spam" gauge inside the client's context manager until stopped.

    :param statsd: client used as a context manager around each gauge() call.
    :param signal: one-element list; the loop runs while signal[0] is truthy.
    """
    # Read the stop flag through the parameter that is actually passed in;
    # no `running` name is defined in this function's scope.
    while signal[0]:
        with statsd:
            statsd.gauge("spam", 1)
56+
57+
@pytest.mark.parametrize(
    "disable_background_sender, disable_buffering, sender",
    list(itertools.product([True, False], [True, False], [sender_a, sender_b])),
)
def test_fork_with_thread(disable_background_sender, disable_buffering, sender):
    """Fork while a background thread keeps sending metrics.

    The child exits immediately with status 42; the parent waits for it and
    checks that status, so a child stuck during the fork would hang the test.
    """
    if not SUPPORTS_FORKING:
        pytest.skip("os.register_at_fork is required for this test")

    statsd = DogStatsd(
        telemetry_min_flush_interval=0,
        disable_background_sender=disable_background_sender,
        disable_buffering=disable_buffering,
    )

    sender_running = [True]
    # Keep the thread in its own variable: rebinding `sender` here would
    # discard the parametrized sender function before it is used as target.
    sender_thread = None
    try:
        sender_thread = threading.Thread(target=sender, args=(statsd, sender_running))
        sender_thread.daemon = True
        sender_thread.start()

        pid = os.fork()
        if pid == 0:
            # Child: leave right away with a recognizable exit status.
            os._exit(42)

        assert pid > 0
        (_, status) = os.waitpid(pid, 0)

        assert os.WEXITSTATUS(status) == 42
    finally:
        statsd.stop()
        if sender_thread:
            sender_running[0] = False
            sender_thread.join()

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def test_fork_hooks(disable_background_sender, disable_buffering):
7676
assert statsd._queue is None or statsd._queue.empty()
7777
assert len(statsd._buffer) == 0
7878

79-
statsd.post_fork()
79+
statsd.post_fork_parent()
8080

8181
assert disable_buffering or statsd._flush_thread.is_alive()
8282
assert disable_background_sender or statsd._sender_thread.is_alive()

tests/unit/dogstatsd/test_statsd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2021,7 +2021,7 @@ def inner():
20212021
# Statsd should survive this sequence of events
20222022
statsd.pre_fork()
20232023
statsd.get_socket()
2024-
statsd.post_fork()
2024+
statsd.post_fork_parent()
20252025
t = Thread(target=inner)
20262026
t.daemon = True
20272027
t.start()

0 commit comments

Comments
 (0)