|
7 | 7 | import logging |
8 | 8 | import os |
9 | 9 | import socket |
| 10 | +from threading import Lock |
10 | 11 |
|
11 | 12 | # datadog |
12 | 13 | from datadog.dogstatsd.context import TimedContextManagerDecorator |
@@ -59,6 +60,8 @@ def __init__(self, host='localhost', port=8125, max_buffer_size=50, namespace=No |
59 | 60 | :type socket_path: string |
60 | 61 | """ |
61 | 62 |
|
| 63 | + self.lock = Lock() |
| 64 | + |
62 | 65 | # Connection |
63 | 66 | if socket_path is not None: |
64 | 67 | self.socket_path = socket_path |
@@ -114,16 +117,17 @@ def get_socket(self): |
114 | 117 | Note: connect the socket before assigning it to the class instance to |
115 | 118 | avoid bad thread race conditions. |
116 | 119 | """ |
117 | | - if not self.socket: |
118 | | - if self.socket_path is not None: |
119 | | - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
120 | | - sock.connect(self.socket_path) |
121 | | - sock.setblocking(0) |
122 | | - self.socket = sock |
123 | | - else: |
124 | | - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
125 | | - sock.connect((self.host, self.port)) |
126 | | - self.socket = sock |
| 120 | + with self.lock: |
| 121 | + if not self.socket: |
| 122 | + if self.socket_path is not None: |
| 123 | + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
| 124 | + sock.connect(self.socket_path) |
| 125 | + sock.setblocking(0) |
| 126 | + self.socket = sock |
| 127 | + else: |
| 128 | + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 129 | + sock.connect((self.host, self.port)) |
| 130 | + self.socket = sock |
127 | 131 |
|
128 | 132 | return self.socket |
129 | 133 |
|
@@ -289,9 +293,12 @@ def _send_to_server(self, packet): |
289 | 293 | except socket.timeout: |
290 | 294 | # dogstatsd is overflowing, drop the packets (mimicks the UDP behaviour) |
291 | 295 | return |
292 | | - except socket.error: |
293 | | - log.info("Error submitting packet, dropping the packet and closing the socket") |
| 296 | + except (socket.error, socket.herror, socket.gaierror) as se: |
| 297 | + log.warning("Error submitting packet: {}, dropping the packet and closing the socket".format(se)) |
294 | 298 | self.close_socket() |
| 299 | + except Exception as e: |
| 300 | + log.error("Unexpected error: %s", str(e)) |
| 301 | + return |
295 | 302 |
|
296 | 303 | def _send_to_buffer(self, packet): |
297 | 304 | self.buffer.append(packet) |
|
0 commit comments