Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/LogReporter.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Python agent gRPC log reporter
# Python agent log reporter

This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module).

Expand All @@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv
agent.start()
```

Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.

`log_reporter_active=True` - Enables the log reporter.

`log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
Expand Down
29 changes: 27 additions & 2 deletions skywalking/agent/protocol/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# limitations under the License.
#

from skywalking.loggings import logger
from queue import Queue, Empty
from time import time

from skywalking import config
from skywalking.agent import Protocol
from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService, HttpLogDataReportService
from skywalking.loggings import logger
from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.trace.segment import Segment


Expand All @@ -30,6 +31,7 @@ def __init__(self):
self.properties_sent = False
self.service_management = HttpServiceManagementClient()
self.traces_reporter = HttpTraceSegmentReportService()
self.log_reporter = HttpLogDataReportService()

def fork_after_in_child(self):
self.service_management.fork_after_in_child()
Expand Down Expand Up @@ -64,3 +66,26 @@ def generator():
self.traces_reporter.report(generator=generator())
except Exception:
pass

def report_log(self, queue: Queue, block: bool = True):
start = time()

def generator():
while True:
try:
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
if timeout <= 0:
return
log_data = queue.get(block=block, timeout=timeout) # type: LogData
except Empty:
return
queue.task_done()

logger.debug('Reporting Log')

yield log_data

try:
self.log_reporter.report(generator=generator())
except Exception:
pass
22 changes: 20 additions & 2 deletions skywalking/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from skywalking.loggings import logger
import json

import requests
from google.protobuf import json_format

from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
from skywalking.loggings import logger


class HttpServiceManagementClient(ServiceManagementClient):
Expand Down Expand Up @@ -109,3 +110,20 @@ def report(self, generator):
} for span in segment.spans]
})
logger.debug('report traces response: %s', res)


class HttpLogDataReportService(TraceSegmentReportService):
def __init__(self):
proto = 'https://' if config.force_tls else 'http://'
self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
self.session = requests.Session()

def fork_after_in_child(self):
self.session.close()
self.session = requests.Session()

def report(self, generator):
log_batch = [json.loads(json_format.MessageToJson(log_data)) for log_data in generator]
if log_batch: # prevent empty batches
res = self.session.post(self.url_report, json=log_batch)
logger.debug('report batch log response: %s', res)