diff --git a/docs/LogReporter.md b/docs/LogReporter.md index 9fc0b979..e4774ed9 100644 --- a/docs/LogReporter.md +++ b/docs/LogReporter.md @@ -1,4 +1,4 @@ -# Python agent gRPC log reporter +# Python agent log reporter This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module). @@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv agent.start() ``` +Note: if the `HTTP` protocol is chosen instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint. + `log_reporter_active=True` - Enables the log reporter. `log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py index f5b1147d..2ca8bac4 100644 --- a/skywalking/agent/protocol/http.py +++ b/skywalking/agent/protocol/http.py @@ -15,13 +15,14 @@ # limitations under the License. 
def report_log(self, queue: Queue, block: bool = True):
    """Drain ``queue`` and pass the collected log records to the HTTP log reporter.

    Records are pulled lazily by a generator that stops as soon as the queue
    is empty or the ``config.QUEUE_TIMEOUT`` budget, measured from the start
    of this call, is used up.

    :param queue: queue holding the log records to report.
    :param block: forwarded to ``queue.get``; when false the queue is read
        without waiting for new records.
    """
    start = time()

    def generator():
        while True:
            try:
                # Remaining budget: configured timeout minus the whole seconds
                # already elapsed since report_log() was entered.
                timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
                if timeout <= 0:
                    return
                log_data = queue.get(block=block, timeout=timeout)  # type: LogData
            except Empty:
                return
            # The record is marked done as soon as it leaves the queue,
            # before it is handed to the reporter.
            queue.task_done()

            logger.debug('Reporting Log')

            yield log_data

    try:
        self.log_reporter.report(generator=generator())
    except Exception:
        # Reporting stays best-effort (the failure is not propagated), but it
        # is no longer discarded without a trace. Debug level matches the
        # other messages emitted on this path.
        # NOTE(review): consider a higher level once it is confirmed that the
        # agent's own logger is not captured by the log reporter.
        logger.debug('Failed to report log batch', exc_info=True)
class HttpLogDataReportService(TraceSegmentReportService):
    """Send log records to the collector's ``/v3/logs`` REST endpoint in batches."""

    def __init__(self):
        # The scheme follows config.force_tls; a trailing slash on the
        # configured collector address is dropped before the path is appended.
        proto = 'https://' if config.force_tls else 'http://'
        self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
        self.session = requests.Session()

    def fork_after_in_child(self):
        """Close the current HTTP session and replace it with a fresh one."""
        self.session.close()
        self.session = requests.Session()

    def report(self, generator):
        """Post every record yielded by ``generator`` as one JSON array.

        :param generator: iterable of protobuf log messages; it is consumed
            completely before anything is sent.
        """
        # MessageToDict builds the JSON-compatible dict directly, instead of
        # rendering a JSON string and parsing it straight back.
        log_batch = [json_format.MessageToDict(log_data) for log_data in generator]
        if log_batch:  # prevent empty batches
            res = self.session.post(self.url_report, json=log_batch)
            logger.debug('report batch log response: %s', res)