From 6fcb29fd6966037b41cd5051228f5710677b47b4 Mon Sep 17 00:00:00 2001 From: YihaoChen Date: Tue, 10 Aug 2021 21:37:36 +0800 Subject: [PATCH 1/2] WIP Try support HTTP protocol Signed-off-by: YihaoChen --- skywalking/agent/protocol/http.py | 29 +++++++++++++++++++++++++++-- skywalking/client/http.py | 22 ++++++++++++++++++++-- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index f5b1147d..2ca8bac4 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -15,13 +15,14 @@ # limitations under the License. # -from skywalking.loggings import logger from queue import Queue, Empty from time import time from skywalking import config from skywalking.agent import Protocol -from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService +from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService, HttpLogDataReportService +from skywalking.loggings import logger +from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.trace.segment import Segment @@ -30,6 +31,7 @@ def __init__(self): self.properties_sent = False self.service_management = HttpServiceManagementClient() self.traces_reporter = HttpTraceSegmentReportService() + self.log_reporter = HttpLogDataReportService() def fork_after_in_child(self): self.service_management.fork_after_in_child() @@ -64,3 +66,26 @@ def generator(): self.traces_reporter.report(generator=generator()) except Exception: pass + + def report_log(self, queue: Queue, block: bool = True): + start = time() + + def generator(): + while True: + try: + timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int + if timeout <= 0: + return + log_data = queue.get(block=block, timeout=timeout) # type: LogData + except Empty: + return + queue.task_done() + + logger.debug('Reporting Log') + + yield log_data + + try: + self.log_reporter.report(generator=generator()) + except Exception: + pass diff --git a/skywalking/client/http.py b/skywalking/client/http.py index 7408fedc..a2664e88 100644 --- a/skywalking/client/http.py +++ b/skywalking/client/http.py @@ -14,13 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -from skywalking.loggings import logger +import json import requests +from google.protobuf import json_format from skywalking import config from skywalking.client import ServiceManagementClient, TraceSegmentReportService +from skywalking.loggings import logger class HttpServiceManagementClient(ServiceManagementClient): @@ -109,3 +110,20 @@ def report(self, generator): } for span in segment.spans] }) logger.debug('report traces response: %s', res) + + +class HttpLogDataReportService(TraceSegmentReportService): + def __init__(self): + proto = 'https://' if config.force_tls else 'http://' + self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs' + self.session = requests.Session() + + def fork_after_in_child(self): + self.session.close() + self.session = requests.Session() + + def report(self, generator): + for log_data in generator: + json_string = json_format.MessageToJson(log_data) + res = self.session.post(self.url_report, json=[json.loads(json_string)]) + logger.debug('report log response: %s', res) From 1b7fabd3fecb2d45191f37866a04b0b61ad10809 Mon Sep 17 00:00:00 2001 From: YihaoChen Date: Thu, 12 Aug 2021 13:40:00 +0800 Subject: [PATCH 2/2] Support batch reporting Signed-off-by: YihaoChen --- docs/LogReporter.md | 4 +++- skywalking/client/http.py | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/LogReporter.md b/docs/LogReporter.md index 9fc0b979..e4774ed9 100644 --- a/docs/LogReporter.md +++ b/docs/LogReporter.md @@ -1,4 +1,4 @@ -# Python agent gRPC log reporter +# Python agent log reporter This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module). @@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv agent.start() ``` +Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint. + `log_reporter_active=True` - Enables the log reporter. `log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. diff --git a/skywalking/client/http.py b/skywalking/client/http.py index a2664e88..c6e26d68 100644 --- a/skywalking/client/http.py +++ b/skywalking/client/http.py @@ -123,7 +123,7 @@ def fork_after_in_child(self): self.session = requests.Session() def report(self, generator): - for log_data in generator: - json_string = json_format.MessageToJson(log_data) - res = self.session.post(self.url_report, json=[json.loads(json_string)]) - logger.debug('report log response: %s', res) + log_batch = [json.loads(json_format.MessageToJson(log_data)) for log_data in generator] + if log_batch: # prevent empty batches + res = self.session.post(self.url_report, json=log_batch) + logger.debug('report batch log response: %s', res)