From 4dda0befc972a8c325d657c18fa123f6f73bb7c2 Mon Sep 17 00:00:00 2001 From: Tomasz Pytel Date: Thu, 10 Dec 2020 17:44:01 -0300 Subject: [PATCH 1/6] [Bugfix] allow pending data to send before exit --- skywalking/agent/__init__.py | 12 ++++++++---- skywalking/agent/protocol/__init__.py | 2 +- skywalking/agent/protocol/grpc.py | 12 +++++++++--- skywalking/agent/protocol/http.py | 11 ++++++++--- skywalking/agent/protocol/kafka.py | 11 ++++++++--- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 3a6b001a..e494b30e 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -15,12 +15,13 @@ # limitations under the License. # -from skywalking.loggings import logger +import atexit from queue import Queue from threading import Thread, Event from typing import TYPE_CHECKING -from skywalking import config, plugins +from skywalking import config, plugins, loggings +from skywalking.loggings import logger from skywalking.agent.protocol import Protocol if TYPE_CHECKING: @@ -38,7 +39,8 @@ def __heartbeat(): def __report(): while not __finished.is_set(): if connected(): - __protocol.report(__queue) # is blocking actually + while __protocol.report(__queue): # blocking but has timeout + pass __finished.wait(1) @@ -70,16 +72,18 @@ def start(): global __started if __started: raise RuntimeError('the agent can only be started once') - from skywalking import loggings loggings.init() config.finalize() __started = True __init() __heartbeat_thread.start() __report_thread.start() + atexit.register(__protocol.report, (__queue, False)) def stop(): + atexit.unregister(__protocol.report) + __protocol.report(__queue, False) __finished.set() diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py index 8237f327..0f6e62e5 100644 --- a/skywalking/agent/protocol/__init__.py +++ b/skywalking/agent/protocol/__init__.py @@ -26,5 +26,5 @@ def connected(self): def heartbeat(self): raise NotImplementedError() - def report(self, queue: Queue): + def report(self, queue: Queue, block: bool = True): raise NotImplementedError() diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 0a7da16f..3b14b2a7 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -18,7 +18,7 @@ import logging from skywalking.loggings import logger import traceback -from queue import Queue +from queue import Queue, Empty import grpc @@ -67,10 +67,13 @@ def on_error(self): self.channel.unsubscribe(self._cb) self.channel.subscribe(self._cb, try_to_connect=True) - def report(self, queue: Queue): + def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get() # type: Segment + try: + segment = queue.get(block=block, timeout=0.5) # type: Segment + except Empty: + break logger.debug('reporting segment %s', segment) @@ -117,5 +120,8 @@ def generator(): try: self.traces_reporter.report(generator()) + + return True + except grpc.RpcError: self.on_error() diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index e97cb227..9a0a1256 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -16,7 +16,7 @@ # from skywalking.loggings import logger -from queue import Queue +from queue import Queue, Empty from skywalking.agent import Protocol from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService @@ -38,10 +38,13 @@ def heartbeat(self): def connected(self): return True - def report(self, queue: Queue): + def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get() # type: Segment + try: + segment = queue.get(block=block, timeout=0.5) # type: Segment + except Empty: + break logger.debug('reporting segment %s', segment) @@ -50,3 +53,5 @@ def generator(): queue.task_done() self.traces_reporter.report(generator=generator()) + + return True diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index 405a7288..35342ccb 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -17,7 +17,7 @@ import logging from skywalking.loggings import logger, getLogger -from queue import Queue +from queue import Queue, Empty from skywalking import config from skywalking.agent import Protocol @@ -42,10 +42,13 @@ def connected(self): def heartbeat(self): self.service_management.send_heart_beat() - def report(self, queue: Queue): + def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get() # type: Segment + try: + segment = queue.get(block=block, timeout=0.5) # type: Segment + except Empty: + break logger.debug('reporting segment %s', segment) @@ -91,3 +94,5 @@ def generator(): queue.task_done() self.traces_reporter.report(generator()) + + return True From ae9205d250cf2ac3bbeace228dee8707645c2297 Mon Sep 17 00:00:00 2001 From: Tomasz Pytel Date: Thu, 10 Dec 2020 19:47:12 -0300 Subject: [PATCH 2/6] this should be 100% reliable instead of 99.999999% --- skywalking/agent/__init__.py | 11 ++++++++--- skywalking/agent/protocol/grpc.py | 7 ++----- skywalking/agent/protocol/http.py | 7 ++----- skywalking/agent/protocol/kafka.py | 7 ++----- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index e494b30e..ac2b18f7 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -68,6 +68,11 @@ def __init(): plugins.install() +def __fini(): + __protocol.report(__queue, False) + __queue.join() + + def start(): global __started if __started: @@ -78,12 +83,12 @@ def start(): __init() __heartbeat_thread.start() __report_thread.start() - atexit.register(__protocol.report, (__queue, False)) + atexit.register(__fini) def stop(): - atexit.unregister(__protocol.report) - __protocol.report(__queue, False) + atexit.unregister(__fini) + __fini() __finished.set() diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 3b14b2a7..024097d3 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -18,7 +18,7 @@ import logging from skywalking.loggings import logger import traceback -from queue import Queue, Empty +from queue import Queue import grpc @@ -70,10 +70,7 @@ def on_error(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - try: - segment = queue.get(block=block, timeout=0.5) # type: Segment - except Empty: - break + segment = queue.get(block=block) # type: Segment logger.debug('reporting segment %s', segment) diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index 9a0a1256..bf0dcc8d 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -16,7 +16,7 @@ # from skywalking.loggings import logger -from queue import Queue, Empty +from queue import Queue from skywalking.agent import Protocol from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService @@ -41,10 +41,7 @@ def connected(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - try: - segment = queue.get(block=block, timeout=0.5) # type: Segment - except Empty: - break + segment = queue.get(block=block) # type: Segment logger.debug('reporting segment %s', segment) diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index 35342ccb..3acd7a9b 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -17,7 +17,7 @@ import logging from skywalking.loggings import logger, getLogger -from queue import Queue, Empty +from queue import Queue from skywalking import config from skywalking.agent import Protocol @@ -45,10 +45,7 @@ def heartbeat(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - try: - segment = queue.get(block=block, timeout=0.5) # type: Segment - except Empty: - break + segment = queue.get(block=block) # type: Segment logger.debug('reporting segment %s', segment) From fe826d156c091b9afd9289a26bdc608750a79df0 Mon Sep 17 00:00:00 2001 From: Zhenxu Ke Date: Fri, 11 Dec 2020 11:36:36 +0800 Subject: [PATCH 3/6] Update kafka.py --- skywalking/agent/protocol/kafka.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index 3acd7a9b..8e6e75af 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -91,5 +91,3 @@ def generator(): queue.task_done() self.traces_reporter.report(generator()) - - return True From d5ca6f9cbfc54d7772bf00bbbb0e93776393968a Mon Sep 17 00:00:00 2001 From: Zhenxu Ke Date: Fri, 11 Dec 2020 11:36:56 +0800 Subject: [PATCH 4/6] Update http.py --- skywalking/agent/protocol/http.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index bf0dcc8d..331f71a9 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -50,5 +50,3 @@ def generator(): queue.task_done() self.traces_reporter.report(generator=generator()) - - return True From f19407176148bd35d110054b19008159d1f4fba6 Mon Sep 17 00:00:00 2001 From: Zhenxu Ke Date: Fri, 11 Dec 2020 11:37:19 +0800 Subject: [PATCH 5/6] Update grpc.py --- skywalking/agent/protocol/grpc.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 024097d3..f5468a44 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -117,8 +117,5 @@ def generator(): try: self.traces_reporter.report(generator()) - - return True - except grpc.RpcError: self.on_error() From eda7288e624127c44153c3a020d42f69dc483101 Mon Sep 17 00:00:00 2001 From: Zhenxu Ke Date: Fri, 11 Dec 2020 11:37:56 +0800 Subject: [PATCH 6/6] Update __init__.py --- skywalking/agent/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index ac2b18f7..e8544dc2 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -39,8 +39,7 @@ def __heartbeat(): def __report(): while not __finished.is_set(): if connected(): - while __protocol.report(__queue): # blocking but has timeout - pass + __protocol.report(__queue) # is blocking actually __finished.wait(1)