From 1ae81b9f32d5c339a76c09a0f0cbe7181347722d Mon Sep 17 00:00:00 2001 From: Tomasz Pytel Date: Fri, 11 Dec 2020 11:03:06 -0300 Subject: [PATCH] Fix: queue.get(block=False) can raise Empty --- skywalking/agent/protocol/grpc.py | 7 +++++-- skywalking/agent/protocol/http.py | 7 +++++-- skywalking/agent/protocol/kafka.py | 7 +++++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index f5468a44..42070702 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -18,7 +18,7 @@ import logging from skywalking.loggings import logger import traceback -from queue import Queue +from queue import Queue, Empty import grpc @@ -70,7 +70,10 @@ def on_error(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get(block=block) # type: Segment + try: + segment = queue.get(block=block) # type: Segment + except Empty: + return logger.debug('reporting segment %s', segment) diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index 331f71a9..89d43bfb 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -16,7 +16,7 @@ # from skywalking.loggings import logger -from queue import Queue +from queue import Queue, Empty from skywalking.agent import Protocol from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService @@ -41,7 +41,10 @@ def connected(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get(block=block) # type: Segment + try: + segment = queue.get(block=block) # type: Segment + except Empty: + return logger.debug('reporting segment %s', segment) diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index 8e6e75af..3eabfdbb 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -17,7 +17,7 @@ import logging from skywalking.loggings import logger, getLogger -from queue import Queue +from queue import Queue, Empty from skywalking import config from skywalking.agent import Protocol @@ -45,7 +45,10 @@ def heartbeat(self): def report(self, queue: Queue, block: bool = True): def generator(): while True: - segment = queue.get(block=block) # type: Segment + try: + segment = queue.get(block=block) # type: Segment + except Empty: + return logger.debug('reporting segment %s', segment)