From b159534d5f190591eba2413d54479897bd9ac121 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Fri, 10 Feb 2023 19:13:29 +0000 Subject: [PATCH 01/26] Refactoring --- CHANGELOG.md | 1 + docs/en/setup/Configuration.md | 2 +- docs/en/setup/faq/How-to-use-with-uwsgi.md | 4 +- skywalking/agent/__init__.py | 537 +++++++++++-------- skywalking/bootstrap/loader/sitecustomize.py | 2 +- skywalking/config.py | 6 +- skywalking/log/sw_logging.py | 3 +- skywalking/loggings.py | 2 +- skywalking/meter/__init__.py | 8 +- skywalking/meter/meter.py | 11 +- skywalking/meter/meter_service.py | 9 +- skywalking/plugins/sw_loguru.py | 3 +- skywalking/profile/profile_context.py | 2 +- skywalking/profile/profile_service.py | 4 +- skywalking/trace/context.py | 8 +- skywalking/utils/singleton.py | 35 ++ tests/unit/test_meter.py | 5 +- 17 files changed, 375 insertions(+), 267 deletions(-) create mode 100644 skywalking/utils/singleton.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 35106d69..216c3396 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - Add support for the tags of Virtual Cache for Redis (#263) - Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277) - Add log reporter support for loguru (#276) + - Add **experimental** support for explicit os.fork(), restarts agent in new process (#278) - Plugins: - Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage) diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md index d9f28d95..2e0ec2cb 100644 --- a/docs/en/setup/Configuration.md +++ b/docs/en/setup/Configuration.md @@ -38,7 +38,7 @@ export SW_AGENT_YourConfiguration=YourValue | heartbeat_period | SW_AGENT_HEARTBEAT_PERIOD | | 30 | The agent will exchange heartbeat message with SkyWalking OAP backend every `period` seconds | | collector_properties_report_period_factor | SW_AGENT_COLLECTOR_PROPERTIES_REPORT_PERIOD_FACTOR | | 10 | The agent will report service instance properties every `factor * heartbeat period` seconds default: 10*30 = 300 seconds | | instance_properties_json | SW_AGENT_INSTANCE_PROPERTIES_JSON | | | A custom JSON string to be reported as service instance properties, e.g. `{"key": "value"}` | -| experimental_fork_support | SW_AGENT_EXPERIMENTAL_FORK_SUPPORT | | False | The agent will try to restart itself in any os.fork()-ed child process. Important Note: it's not suitable for short-lived processes as each one will introduce overhead and create a new instance in SkyWalking dashboard in format of `service_instance-child-` (TODO) | +| experimental_fork_support | SW_AGENT_EXPERIMENTAL_FORK_SUPPORT | | False | **Experimental** The agent will try to restart itself in any os.fork()-ed child process. Important Note: it's not suitable for short-lived processes as each one will introduce overhead and create a new instance in SkyWalking dashboard in format of `service_instance-child-` | | queue_timeout | SW_AGENT_QUEUE_TIMEOUT | | 1 | DANGEROUS - This option controls the interval of each bulk report from telemetry data queues Do not modify unless you have evaluated its impact given your service load. | ### SW_PYTHON Auto Instrumentation CLI | Configuration | Environment Variable | Type | Default Value | Description | diff --git a/docs/en/setup/faq/How-to-use-with-uwsgi.md b/docs/en/setup/faq/How-to-use-with-uwsgi.md index f051c97a..48c596e8 100644 --- a/docs/en/setup/faq/How-to-use-with-uwsgi.md +++ b/docs/en/setup/faq/How-to-use-with-uwsgi.md @@ -13,10 +13,12 @@ The following is an example of the use of uWSGI and flask, the initialization pa ```python # main.py from uwsgidecorators import postfork -from skywalking import agent, config @postfork def init_tracing(): + # Note: it's highly recommended to only import skywalking modules within the @postfork method, + # as gRPC/protobuf dependency import actions may not work properly unless being imported exactly once in each new process + from skywalking import agent, config config.init(collector_address='127.0.0.1:11800', service_name='your awesome service') agent.start() diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 9d61487b..c2f69875 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -16,9 +16,10 @@ # import atexit +import os from queue import Queue, Full from threading import Thread, Event -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from skywalking import config, plugins from skywalking import loggings @@ -31,255 +32,315 @@ from skywalking.profile.snapshot import TracingThreadSnapshot from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.protocol.language_agent.Meter_pb2 import MeterData +from skywalking.utils.singleton import Singleton +import functools if TYPE_CHECKING: from skywalking.trace.context import Segment -__started = False -__protocol = None # type: Protocol -__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \ - = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __meter_queue = __finished = None - -def __heartbeat(): - wait = base = 30 - - while not __finished.is_set(): - try: - __protocol.heartbeat() - wait = base # reset to base wait time on success - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum - - __finished.wait(wait) - - -def __report(): - wait = base = 0 - - while not __finished.is_set(): +def report_with_backoff(init_wait): + """ + An exponential backoff for retrying reporters. + """ + + def backoff_decorator(func): + @functools.wraps(func) + def backoff_wrapper(self, *args, **kwargs): + wait = base = init_wait + while not self._finished.is_set(): + try: + func(self, *args, **kwargs) + wait = base # reset to base wait time on success + except Exception: # noqa + wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum + logger.exception(f'Exception in reporter in pid {os.getpid()}, retry in {wait} seconds') + + self._finished.wait(wait) + logger.info('finished reporter thread') + + return backoff_wrapper + + return backoff_decorator + + +class SkyWalkingAgent(Singleton): + """ + The main singleton class and entrypoint of SkyWalking Python Agent. + Upon fork(), original instance rebuild everything (queues, threads, instrumentation) by + calling the fork handlers in the class instance. + """ + __started: bool = False # shared by all instances + + def __init__(self): + """ + Protocol is one of gRPC, HTTP and Kafka that + provides clients to reporters to communicate with OAP backend. + """ + self.started_pid = None + self.__protocol: Optional[Protocol] = None + self._finished: Optional[Event] = None + + def __bootstrap(self): + # when forking, already instrumented modules must not be instrumented again + # otherwise it will cause double instrumentation! (we should provide an un-instrument method) + if config.protocol == 'grpc': + from skywalking.agent.protocol.grpc import GrpcProtocol + self.__protocol = GrpcProtocol() + elif config.protocol == 'http': + from skywalking.agent.protocol.http import HttpProtocol + self.__protocol = HttpProtocol() + elif config.protocol == 'kafka': + from skywalking.agent.protocol.kafka import KafkaProtocol + self.__protocol = KafkaProtocol() + + # Initialize queues for segment, log, meter and profiling snapshots + self.__segment_queue: Optional[Queue] = None + self.__log_queue: Optional[Queue] = None + self.__meter_queue: Optional[Queue] = None + self.__snapshot_queue: Optional[Queue] = None + + # Start reporter threads and register queues + self.__init_threading() + + def __init_threading(self) -> None: + """ + This method initializes all the queues and threads for the agent and reporters. + Upon os.fork(), callback will reinitialize threads and queues by calling this method + + Heartbeat thread is started by default. + Segment reporter thread and segment queue is created by default. + All other queues and threads depends on user configuration. + """ + self._finished = Event() + + __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True) + __heartbeat_thread.start() + + self.__segment_queue = Queue(maxsize=config.trace_reporter_max_buffer_size) + __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True) + __segment_report_thread.start() + + if config.meter_reporter_active: + self.__meter_queue = Queue(maxsize=config.meter_reporter_max_buffer_size) + __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True) + __meter_report_thread.start() + + if config.pvm_meter_reporter_active: + from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource + from skywalking.meter.pvm.gc_data import GCDataSource + from skywalking.meter.pvm.mem_usage import MEMUsageDataSource + from skywalking.meter.pvm.thread_data import ThreadDataSource + + MEMUsageDataSource().register() + CPUUsageDataSource().register() + GCDataSource().register() + ThreadDataSource().register() + + if config.log_reporter_active: + self.__log_queue = Queue(maxsize=config.log_reporter_max_buffer_size) + __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True) + __log_report_thread.start() + + if config.profiler_active: + # Now only profiler receives commands from OAP + __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch, + daemon=True) + __command_dispatch_thread.start() + + self.__snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size) + + __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command, + daemon=True) + __query_profile_thread.start() + + __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=self.__send_profile_snapshot, + daemon=True) + __send_profile_thread.start() + + def __fork_before(self) -> None: + """ + This handles explicit fork() calls. The child process will not have a running thread, so we need to + revive all of them. The parent process will continue to run as normal. + + This does not affect pre-forking server support, which are handled separately. + """ + # possible deadlock would be introduced if some queue is in use when fork() is called and + # therefore child process will inherit a locked queue. To avoid this and have side benefit + # of a clean queue in child process (prevent duplicated reporting), we simply restart the agent and + # reinitialize all queues and threads. + logger.warning('SkyWalking Python agent fork support is currently experimental, ' + 'please report issues if you encounter any.') + + def __fork_after_in_parent(self) -> None: + """ + Something to do after fork() in parent process + """ + ... + + def __fork_after_in_child(self) -> None: + """ + Simply restart the agent after we detect a fork() call + """ + self.start() + + def start(self) -> None: + """ + Start would be called by user or os.register_at_fork() callback + Start will proceed if and only if the agent is not started in the + current process. + + When os.fork(), the service instance should be changed to a new one by appending pid. + """ + # export grpc fork support env + # This is required for grpcio to work with fork() + # https://github.com/grpc/grpc/blob/master/doc/fork_support.md + if config.protocol == 'grpc': + os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' + os.environ['GRPC_POLL_STRATEGY'] = 'poll' + + if not self.__started: + # if not already started, start the agent + self.__started = True + # Install logging plugins + # TODO - Add support for printing traceID/ context in logs + if config.log_reporter_active: + from skywalking import log + log.install() + # Here we install all other lib plugins on first time start (parent process) + plugins.install() + elif self.__started and os.getpid() == self.started_pid: + # if already started, and this is the same process, raise an error + raise RuntimeError('SkyWalking Python agent has already been started in this process') + else: + # otherwise we assume a fork() happened, give it a new service instance name + logger.info('New process detected, re-initializing SkyWalking Python agent') + # Note: this is for experimental change, default config should never reach here + # Fork support is controlled by config.agent_fork_support :default: False + # Important: This does not impact pre-forking server support (uwsgi, gunicorn, etc...) + # This is only for explicit long-running fork() calls. + config.service_instance = f'{config.service_instance}-child-{os.getpid()}' + + self.started_pid = os.getpid() + + flag = False try: - __protocol.report_segment(__queue) # is blocking actually, blocks for max config.queue_timeout seconds - wait = base - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) - - __finished.wait(wait) - - -def __report_log(): - wait = base = 0 - - while not __finished.is_set(): + from gevent import monkey + flag = monkey.is_module_patched('socket') + except ModuleNotFoundError: + logger.debug("it was found that no gevent was used, if you don't use, please ignore.") + if flag: + import grpc.experimental.gevent as grpc_gevent + grpc_gevent.init_gevent() + + loggings.init() + config.finalize() + profile.init() + meter.init(force=True) # force re-init after fork() + + self.__bootstrap() # calls init_threading + + atexit.register(self.__fini) + + if config.experimental_fork_support: + if hasattr(os, 'register_at_fork'): + os.register_at_fork(before=self.__fork_before, after_in_parent=self.__fork_after_in_parent, + after_in_child=self.__fork_after_in_child) + + def __fini(self): + """ + This method is called when the agent is shutting down. + Clean up all the queues and threads. + """ + self.__protocol.report_segment(self.__segment_queue, False) + self.__segment_queue.join() + + if config.log_reporter_active: + self.__protocol.report_log(self.__log_queue, False) + self.__log_queue.join() + + if config.profiler_active: + self.__protocol.report_snapshot(self.__snapshot_queue, False) + self.__snapshot_queue.join() + + if config.meter_reporter_active: + self.__protocol.report_meter(self.__meter_queue, False) + self.__meter_queue.join() + + self._finished.set() + + def stop(self): + atexit.unregister(self.__fini) + self.__fini() + self.__started = False + + @report_with_backoff(init_wait=config.heartbeat_period) + def __heartbeat(self): + self.__protocol.heartbeat() + + @report_with_backoff(init_wait=0) + def __report_segment(self): + if not self.__segment_queue.empty(): + self.__protocol.report_segment(self.__segment_queue) + + @report_with_backoff(init_wait=0) + def __report_log(self): + if not self.__log_queue.empty(): + self.__protocol.report_log(self.__log_queue) + + @report_with_backoff(init_wait=config.meter_reporter_period) + def __report_meter(self): + if not self.__meter_queue.empty(): + self.__protocol.report_meter(self.__meter_queue) + + @report_with_backoff(init_wait=0.5) + def __send_profile_snapshot(self): + if not self.__snapshot_queue.empty(): + self.__protocol.report_snapshot(self.__snapshot_queue) + + @report_with_backoff(init_wait=config.get_profile_task_interval) + def __query_profile_command(self): + self.__protocol.query_profile_commands() + + @staticmethod + def __command_dispatch(): + # command dispatch will stuck when there are no commands + command_service.dispatch() + + def is_segment_queue_full(self): + return self.__segment_queue.full() + + def archive_segment(self, segment: 'Segment'): + try: # unlike checking __queue.full() then inserting, this is atomic + self.__segment_queue.put(segment, block=False) + except Full: + logger.warning('the queue is full, the segment will be abandoned') + + def archive_log(self, log_data: 'LogData'): try: - __protocol.report_log(__log_queue) - wait = base - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) - - __finished.wait(wait) + self.__log_queue.put(log_data, block=False) + except Full: + logger.warning('the queue is full, the log will be abandoned') - -def __send_profile_snapshot(): - wait = base = 0.5 - - while not __finished.is_set(): + def archive_meter(self, meter_data: 'MeterData'): try: - __protocol.report_snapshot(__snapshot_queue) - wait = base - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) - - __finished.wait(wait) - + self.__meter_queue.put(meter_data, block=False) + except Full: + logger.warning('the queue is full, the meter will be abandoned') -def __query_profile_command(): - wait = base = config.get_profile_task_interval - - while not __finished.is_set(): + def add_profiling_snapshot(self, snapshot: TracingThreadSnapshot): try: - __protocol.query_profile_commands() - wait = base - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) - - __finished.wait(wait) - - -def __report_meter(): - wait = base = 1 + self.__snapshot_queue.put(snapshot) + except Full: + logger.warning('the snapshot queue is full, the snapshot will be abandoned') - while not __finished.is_set(): + def notify_profile_finish(self, task: ProfileTask): try: - __protocol.report_meter(__meter_queue) # is blocking actually, blocks for max config.queue_timeout seconds - wait = base - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) - - __finished.wait(wait) - - -def __command_dispatch(): - # command dispatch will stuck when there are no commands - command_service.dispatch() - - -def __init_threading(): - global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \ - __command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __meter_queue, __finished - - __queue = Queue(maxsize=config.trace_reporter_max_buffer_size) - __finished = Event() - __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True) - __report_thread = Thread(name='ReportThread', target=__report, daemon=True) - __command_dispatch_thread = Thread(name='CommandDispatchThread', target=__command_dispatch, daemon=True) - - __heartbeat_thread.start() - __report_thread.start() - __command_dispatch_thread.start() - - if config.meter_reporter_active: - __meter_queue = Queue(maxsize=config.meter_reporter_max_buffer_size) - __meter_report_thread = Thread(name='MeterReportThread', target=__report_meter, daemon=True) - __meter_report_thread.start() - - if config.pvm_meter_reporter_active: - from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource - from skywalking.meter.pvm.gc_data import GCDataSource - from skywalking.meter.pvm.mem_usage import MEMUsageDataSource - from skywalking.meter.pvm.thread_data import ThreadDataSource - - MEMUsageDataSource().registry() - CPUUsageDataSource().registry() - GCDataSource().registry() - ThreadDataSource().registry() - - - if config.log_reporter_active: - __log_queue = Queue(maxsize=config.log_reporter_max_buffer_size) - __log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True) - __log_report_thread.start() - - if config.profiler_active: - __snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size) - - __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True) - __query_profile_thread.start() - - __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True) - __send_profile_thread.start() - - -def __init(): - global __protocol - if config.protocol == 'grpc': - from skywalking.agent.protocol.grpc import GrpcProtocol - __protocol = GrpcProtocol() - elif config.protocol == 'http': - from skywalking.agent.protocol.http import HttpProtocol - __protocol = HttpProtocol() - elif config.protocol == 'kafka': - from skywalking.agent.protocol.kafka import KafkaProtocol - __protocol = KafkaProtocol() - - plugins.install() - if config.log_reporter_active: # todo - Add support for printing traceID/ context in logs - from skywalking import log - log.install() - - __init_threading() - - -def __fini(): - __protocol.report_segment(__queue, False) - __queue.join() - - if config.log_reporter_active: - __protocol.report_log(__log_queue, False) - __log_queue.join() - - if config.profiler_active: - __protocol.report_snapshot(__snapshot_queue, False) - __snapshot_queue.join() - - __finished.set() - - -def start(): - global __started - if __started: - return - __started = True - - flag = False - try: - from gevent import monkey - flag = monkey.is_module_patched('socket') - except ModuleNotFoundError: - logger.debug("it was found that no gevent was used, if you don't use, please ignore.") - if flag: - import grpc.experimental.gevent as grpc_gevent - grpc_gevent.init_gevent() - - loggings.init() - config.finalize() - profile.init() - meter.init() - - __init() - - atexit.register(__fini) - - -def stop(): - atexit.unregister(__fini) - __fini() - - -def started(): - return __started - - -def isfull(): - return __queue.full() - - -def archive(segment: 'Segment'): - try: # unlike checking __queue.full() then inserting, this is atomic - __queue.put(segment, block=False) - except Full: - logger.warning('the queue is full, the segment will be abandoned') - - -def archive_log(log_data: 'LogData'): - try: - __log_queue.put(log_data, block=False) - except Full: - logger.warning('the queue is full, the log will be abandoned') - - -def archive_meter(meterdata: 'MeterData'): - try: - __meter_queue.put(meterdata, block=False) - except Full: - logger.warning('the queue is full, the meter will be abandoned') - - -def add_profiling_snapshot(snapshot: TracingThreadSnapshot): - try: - __snapshot_queue.put(snapshot) - except Full: - logger.warning('the snapshot queue is full, the snapshot will be abandoned') + self.__protocol.notify_profile_task_finish(task) + except Exception as e: + logger.error(f'notify profile task finish to backend fail. {str(e)}') -def notify_profile_finish(task: ProfileTask): - try: - __protocol.notify_profile_task_finish(task) - except Exception as e: - logger.error(f'notify profile task finish to backend fail. {str(e)}') +# Export for user (backwards compatibility) +agent = SkyWalkingAgent() +start = agent.start diff --git a/skywalking/bootstrap/loader/sitecustomize.py b/skywalking/bootstrap/loader/sitecustomize.py index 46f0772e..620f87dd 100644 --- a/skywalking/bootstrap/loader/sitecustomize.py +++ b/skywalking/bootstrap/loader/sitecustomize.py @@ -41,7 +41,7 @@ def _get_sw_loader_logger(): from logging import getLogger logger = getLogger('skywalking-loader') ch = logging.StreamHandler() - formatter = logging.Formatter('%(name)s [%(threadName)s] [%(levelname)s] %(message)s') + formatter = logging.Formatter('%(name)s [pid:%(process)d] [%(threadName)s] [%(levelname)s] %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) logger.propagate = False diff --git a/skywalking/config.py b/skywalking/config.py index 4cc97e02..7b56af78 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -91,9 +91,9 @@ collector_properties_report_period_factor = int(os.getenv('SW_AGENT_COLLECTOR_PROPERTIES_REPORT_PERIOD_FACTOR', '10')) # A custom JSON string to be reported as service instance properties, e.g. `{"key": "value"}` instance_properties_json: str = os.getenv('SW_INSTANCE_PROPERTIES_JSON', '') -# The agent will try to restart itself in any os.fork()-ed child process. Important Note: it's not suitable for -# short-lived processes as each one will introduce overhead and create a new instance in SkyWalking dashboard -# in format of `service_instance-child-` (TODO) +# **Experimental** The agent will try to restart itself in any os.fork()-ed child process. +# Important Note: it's not suitable for short-lived processes as each one will introduce overhead +# and create a new instance in SkyWalking dashboard in format of `service_instance-child-` experimental_fork_support: bool = os.getenv('SW_AGENT_EXPERIMENTAL_FORK_SUPPORT', '').lower() == 'true' # DANGEROUS - This option controls the interval of each bulk report from telemetry data queues # Do not modify unless you have evaluated its impact given your service load. diff --git a/skywalking/log/sw_logging.py b/skywalking/log/sw_logging.py index 9564366b..b8bb613d 100644 --- a/skywalking/log/sw_logging.py +++ b/skywalking/log/sw_logging.py @@ -17,7 +17,8 @@ import logging -from skywalking import config, agent +from skywalking import config +from skywalking.agent import agent from skywalking.protocol.common.Common_pb2 import KeyStringValuePair from skywalking.protocol.logging.Logging_pb2 import LogData, LogDataBody, TraceContext, LogTags, TextLog from skywalking.trace.context import get_context diff --git a/skywalking/loggings.py b/skywalking/loggings.py index 93e1bdcf..f9d472ad 100644 --- a/skywalking/loggings.py +++ b/skywalking/loggings.py @@ -25,7 +25,7 @@ def getLogger(name=None): # noqa logger = logging.getLogger(name) ch = logging.StreamHandler() - formatter = logging.Formatter('%(name)s [%(threadName)s] [%(levelname)s] %(message)s') + formatter = logging.Formatter('%(name)s [pid:%(process)d] [%(threadName)s] [%(levelname)s] %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) logger.propagate = False diff --git a/skywalking/meter/__init__.py b/skywalking/meter/__init__.py index eacfe5ff..413f7395 100644 --- a/skywalking/meter/__init__.py +++ b/skywalking/meter/__init__.py @@ -18,11 +18,15 @@ _meter_service = None -def init(): +def init(force: bool = False): + """ + If the meter service is not initialized, initialize it. + if force, we are in a fork(), we force re-initialization + """ from skywalking.meter.meter_service import MeterService global _meter_service - if _meter_service: + if _meter_service and not force: return _meter_service = MeterService() diff --git a/skywalking/meter/meter.py b/skywalking/meter/meter.py index f8eacd11..9f04b5e1 100644 --- a/skywalking/meter/meter.py +++ b/skywalking/meter/meter.py @@ -14,9 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # - from abc import ABC, abstractmethod from enum import Enum +from typing import Optional + from skywalking.protocol.language_agent.Meter_pb2 import Label import skywalking.meter as meter @@ -82,9 +83,10 @@ class BaseMeter(ABC): meter_service = None def __init__(self, name: str, tags=None): - if BaseMeter.meter_service is None: - BaseMeter.meter_service = meter._meter_service - + # Should always override to use the correct meter service. + # Otherwise, forked process will inherit the original + # meter_service in parent. We want a new one in child. + BaseMeter.meter_service = meter._meter_service self.meterId = MeterId(name, self.get_type(), tags) def get_name(self): @@ -110,6 +112,7 @@ class Builder(ABC): def __init__(self, name: str, tags=None): # Derived Builder should instantiate its corresponding meter here. # self.meter = BaseMeter(name, tags) + self.meter: Optional[BaseMeter] = None pass def tag(self, name: str, value): diff --git a/skywalking/meter/meter_service.py b/skywalking/meter/meter_service.py index fc7608f6..8afc4d43 100644 --- a/skywalking/meter/meter_service.py +++ b/skywalking/meter/meter_service.py @@ -14,20 +14,21 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import time from concurrent.futures import ThreadPoolExecutor from threading import Thread from skywalking import config -from skywalking import agent +from skywalking.agent import agent from skywalking.meter.meter import BaseMeter from skywalking.utils.time import current_milli_time from skywalking.config import meter_reporter_period +from skywalking.loggings import logger class MeterService(Thread): def __init__(self): - super().__init__(daemon=True) + super().__init__(name='meterService', daemon=True) + logger.debug('Started meter service') self.meter_map = {} def register(self, meter: BaseMeter): @@ -45,7 +46,7 @@ def archive(meterdata): meterdata.timestamp = current_milli_time() agent.archive_meter(meterdata) - with ThreadPoolExecutor(max_workers=1) as executor: + with ThreadPoolExecutor(thread_name_prefix='meter_service_pool_worker', max_workers=1) as executor: executor.map(archive, self.meter_map.values()) def run(self): diff --git a/skywalking/plugins/sw_loguru.py b/skywalking/plugins/sw_loguru.py index f3a94d27..8ebab5f2 100644 --- a/skywalking/plugins/sw_loguru.py +++ b/skywalking/plugins/sw_loguru.py @@ -22,7 +22,8 @@ from os.path import basename, splitext from threading import current_thread -from skywalking import config, agent +from skywalking import config +from skywalking.agent import agent from skywalking.protocol.common.Common_pb2 import KeyStringValuePair from skywalking.protocol.logging.Logging_pb2 import LogData, LogDataBody, TraceContext, LogTags, TextLog from skywalking.trace.context import get_context diff --git a/skywalking/profile/profile_context.py b/skywalking/profile/profile_context.py index 29ae4610..0db7556e 100644 --- a/skywalking/profile/profile_context.py +++ b/skywalking/profile/profile_context.py @@ -23,7 +23,7 @@ from threading import Thread, Event, current_thread from typing import Optional -from skywalking import agent +from skywalking.agent import agent from skywalking import config from skywalking import profile from skywalking.loggings import logger diff --git a/skywalking/profile/profile_service.py b/skywalking/profile/profile_service.py index bc444809..5caf0ee3 100644 --- a/skywalking/profile/profile_service.py +++ b/skywalking/profile/profile_service.py @@ -20,7 +20,7 @@ from threading import Timer, RLock, Lock from typing import Tuple -from skywalking import agent +from skywalking.agent import agent from skywalking.loggings import logger, logger_debug_enabled from skywalking.profile.profile_constants import ProfileConstants from skywalking.profile.profile_context import ProfileTaskExecutionContext @@ -55,7 +55,7 @@ def __init__(self): self._last_command_create_time = -1 # type: int # single thread executor - self.profile_executor = ThreadPoolExecutor(max_workers=1) + self.profile_executor = ThreadPoolExecutor(thread_name_prefix='profile-executor', max_workers=1) self.task_execution_context = AtomicRef(None) self.profile_task_scheduler = Scheduler() diff --git a/skywalking/trace/context.py b/skywalking/trace/context.py index 53dafc2c..baabec30 100644 --- a/skywalking/trace/context.py +++ b/skywalking/trace/context.py @@ -16,9 +16,9 @@ # from typing import Optional -from skywalking import Component, agent, config +from skywalking import Component, config from skywalking import profile -from skywalking.agent import isfull +from skywalking.agent import agent from skywalking.profile.profile_status import ProfileStatusReference from skywalking.trace import ID from skywalking.trace.carrier import Carrier @@ -104,7 +104,7 @@ def __init__(self): @staticmethod def ignore_check(op: str, kind: Kind, carrier: Optional[Carrier] = None): - if config.RE_IGNORE_PATH.match(op) or isfull() or (carrier is not None and carrier.is_suppressed): + if config.RE_IGNORE_PATH.match(op) or agent.is_segment_queue_full() or (carrier is not None and carrier.is_suppressed): return NoopSpan(context=NoopContext()) return None @@ -219,7 +219,7 @@ def stop(self, span: Span) -> bool: self._nspans -= 1 if self._nspans == 0: - agent.archive(self.segment) + agent.archive_segment(self.segment) return True return False diff --git a/skywalking/utils/singleton.py b/skywalking/utils/singleton.py new file mode 100644 index 00000000..f38e3abb --- /dev/null +++ b/skywalking/utils/singleton.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +class Singleton(object): + """ + This is to ensure a single process can only have one instance of agent. + Written by Guido van Rossum to implement a singleton pattern. + https://www.python.org/download/releases/2.2/descrintro/#__new__ + Classes that inherit from this class will be singletons. + """ + def __new__(cls, *args, **kwds): + it = cls.__dict__.get('__it__') + if it is not None: + return it + cls.__it__ = it = object.__new__(cls) + it.init(*args, **kwds) + return it + + def init(self, *args, **kwds): + pass diff --git a/tests/unit/test_meter.py b/tests/unit/test_meter.py index 4447abcd..cb9ae190 100644 --- a/tests/unit/test_meter.py +++ b/tests/unit/test_meter.py @@ -23,6 +23,7 @@ from skywalking.meter.histogram import Histogram from skywalking.meter.gauge import Gauge from skywalking.meter.meter import BaseMeter +from skywalking import meter class MockMeterService(): @@ -44,7 +45,7 @@ def transform(self, meter): meter_service = MockMeterService() -BaseMeter.meter_service = meter_service +meter._meter_service = meter_service # picked empirically tolerance = 5e-2 @@ -85,7 +86,6 @@ def test_counter_with_satement(self): self.assertLess(abs(i - (meterdata.singleValue.value - pre)), tolerance) pre = meterdata.singleValue.value - def test_counter_increase_decarator(self): builder = Counter.Builder('c3', CounterMode.INCREMENT) c = builder.build() @@ -168,7 +168,6 @@ def test_histogram_with_satement(self): meterdata = meter_service.transform(h) self.assertEqual(repeat, meterdata.histogram.values[idx].count) - def test_gauge(self): ls = list(range(1, 10)) random.shuffle(ls) From a31b182315f263944bdbeee38befedcc9e8ce56e Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Fri, 10 Feb 2023 19:41:57 +0000 Subject: [PATCH 02/26] Speed up e2e --- skywalking/meter/pvm/data_source.py | 2 +- tests/e2e/script/prepare/install-swctl.sh | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/skywalking/meter/pvm/data_source.py b/skywalking/meter/pvm/data_source.py index 988886ef..3f97c781 100644 --- a/skywalking/meter/pvm/data_source.py +++ b/skywalking/meter/pvm/data_source.py @@ -19,7 +19,7 @@ class DataSource: - def registry(self): + def register(self): for name in dir(self): if name.endswith('generator'): generator = getattr(self, name)() diff --git a/tests/e2e/script/prepare/install-swctl.sh b/tests/e2e/script/prepare/install-swctl.sh index c0c2bc95..a4b46bb0 100644 --- a/tests/e2e/script/prepare/install-swctl.sh +++ b/tests/e2e/script/prepare/install-swctl.sh @@ -25,9 +25,5 @@ BIN_DIR=$2 set -ex if ! command -v swctl &> /dev/null; then - mkdir -p $BASE_DIR/swctl && cd $BASE_DIR/swctl - curl -kLo skywalking-cli.tar.gz https://github.com/apache/skywalking-cli/archive/${SW_CTL_COMMIT}.tar.gz - tar -zxf skywalking-cli.tar.gz --strip=1 - utype=$(uname | awk '{print tolower($0)}') - make $utype-amd64 && mv bin/swctl-*-$utype-amd64 $BIN_DIR/swctl + bash -c "$(curl -fsSL https://raw.githubusercontent.com/apache/skywalking-cli/${SW_CTL_COMMIT}/scripts/install.sh)" fi From eb72dd487d3238798d94d791d579765b31228adf Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Fri, 10 Feb 2023 19:45:02 +0000 Subject: [PATCH 03/26] Speed up yq --- tests/e2e/script/prepare/install-yq.sh | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/e2e/script/prepare/install-yq.sh b/tests/e2e/script/prepare/install-yq.sh index 2f69839f..cc103baf 100644 --- a/tests/e2e/script/prepare/install-yq.sh +++ b/tests/e2e/script/prepare/install-yq.sh @@ -23,8 +23,5 @@ BASE_DIR=$1 BIN_DIR=$2 if ! command -v yq &> /dev/null; then - mkdir -p $BASE_DIR/yq && cd $BASE_DIR/yq - curl -kLo yq.tar.gz https://github.com/mikefarah/yq/archive/v4.29.2.tar.gz - tar -zxf yq.tar.gz --strip=1 - go install && go build -ldflags -s && cp yq $BIN_DIR/ + snap install yq fi From 59f90b8d54d754a292d48f91a97c85f1d4e1b17d Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Fri, 10 Feb 2023 20:46:27 +0000 Subject: [PATCH 04/26] Fix flaky --- .github/workflows/CI.yaml | 2 +- tests/plugin/base.py | 6 ++++++ tests/plugin/web/sw_sanic/services/provider.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 70996d02..53935c20 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -156,7 +156,7 @@ jobs: ( always() && ! cancelled() ) && ((github.event_name == 'schedule' && github.repository == 'apache/skywalking-python') || needs.changes.outputs.agent == 'true') runs-on: ubuntu-latest - timeout-minutes: 30 + timeout-minutes: 20 strategy: matrix: python-version: [ "3.7", "3.8", "3.9", "3.10" ] diff --git a/tests/plugin/base.py b/tests/plugin/base.py index b61ece0b..5d1c2826 100644 --- a/tests/plugin/base.py +++ b/tests/plugin/base.py @@ -57,6 +57,12 @@ def validate(self, expected_file_name=None): )) print('diff list: ') + print('----------') + print('-actual') + print(actual_data.splitlines(keepends=True)) + print('----------') + print('-expected') + print(yaml.dump(yaml.load(expected_data, Loader=Loader)).splitlines(keepends=True)) sys.stdout.writelines(diff_list) assert response.status_code == 200 diff --git a/tests/plugin/web/sw_sanic/services/provider.py b/tests/plugin/web/sw_sanic/services/provider.py index 2cc7ba92..1669a21f 100644 --- a/tests/plugin/web/sw_sanic/services/provider.py +++ b/tests/plugin/web/sw_sanic/services/provider.py @@ -30,4 +30,4 @@ async def application(req): ) PORT = 9091 - app.run(host='0.0.0.0', port=PORT, debug=True) + app.run(host='0.0.0.0', port=PORT) From 95828d986cc45ac5442b1ade2d57142203091348 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Sat, 11 Feb 2023 04:43:05 +0000 Subject: [PATCH 05/26] Fix flaky --- .github/workflows/CI.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 53935c20..a36168bd 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -156,7 +156,7 @@ jobs: ( always() && ! cancelled() ) && ((github.event_name == 'schedule' && github.repository == 'apache/skywalking-python') || needs.changes.outputs.agent == 'true') runs-on: ubuntu-latest - timeout-minutes: 20 + timeout-minutes: 15 strategy: matrix: python-version: [ "3.7", "3.8", "3.9", "3.10" ] @@ -254,7 +254,7 @@ jobs: - name: Load docker images run: find docker-images -name "*.tar" -exec docker load -i {} \; - name: Run E2E Tests - uses: apache/skywalking-infra-e2e@964ede199fe199e166920169dc5f8c9b214cfac5 + uses: superskyyy/skywalking-infra-e2e@main with: log-dir: /tmp/e2e-logs e2e-file: ${{ matrix.case.path }} From 50e8cc148aa0c41921bdb7d865372cf10c3c585f Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Sat, 11 Feb 2023 04:45:02 +0000 Subject: [PATCH 06/26] ffast --- .github/workflows/CI.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index a36168bd..bf513046 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -184,7 +184,7 @@ jobs: - name: Run unit tests run: | make env - poetry run pytest -v ${{ matrix.test-path }} + poetry run pytest -x -v ${{ matrix.test-path }} docker-e2e: # build docker image for E2E tests, single Python version for now. From 99de99022ac09dd3ca629e992f142fdd6e23ccde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Sun, 12 Feb 2023 18:45:13 -0500 Subject: [PATCH 07/26] Update skywalking/agent/__init__.py --- skywalking/agent/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index c2f69875..59d284bf 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -197,7 +197,6 @@ def start(self) -> None: # https://github.com/grpc/grpc/blob/master/doc/fork_support.md if config.protocol == 'grpc': os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - os.environ['GRPC_POLL_STRATEGY'] = 'poll' if not self.__started: # if not already started, start the agent From 99a6ae17cf4a9b59e5205eed08f30da8b18e978d Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Mon, 13 Feb 2023 00:44:16 +0000 Subject: [PATCH 08/26] fix --- tests/e2e/script/prepare/install-swctl.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/e2e/script/prepare/install-swctl.sh b/tests/e2e/script/prepare/install-swctl.sh index a4b46bb0..c0c2bc95 100644 --- a/tests/e2e/script/prepare/install-swctl.sh +++ b/tests/e2e/script/prepare/install-swctl.sh @@ -25,5 +25,9 @@ BIN_DIR=$2 set -ex if ! command -v swctl &> /dev/null; then - bash -c "$(curl -fsSL https://raw.githubusercontent.com/apache/skywalking-cli/${SW_CTL_COMMIT}/scripts/install.sh)" + mkdir -p $BASE_DIR/swctl && cd $BASE_DIR/swctl + curl -kLo skywalking-cli.tar.gz https://github.com/apache/skywalking-cli/archive/${SW_CTL_COMMIT}.tar.gz + tar -zxf skywalking-cli.tar.gz --strip=1 + utype=$(uname | awk '{print tolower($0)}') + make $utype-amd64 && mv bin/swctl-*-$utype-amd64 $BIN_DIR/swctl fi From f23b3bcc68f1b1a0b115e059d11eefc6fddabf25 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Mon, 13 Feb 2023 01:13:53 +0000 Subject: [PATCH 09/26] fix --- skywalking/agent/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 59d284bf..455e41fb 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -195,9 +195,9 @@ def start(self) -> None: # export grpc fork support env # This is required for grpcio to work with fork() # https://github.com/grpc/grpc/blob/master/doc/fork_support.md - if config.protocol == 'grpc': - os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - + # if config.protocol == 'grpc': + # os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' + # print(os.getpid()) if not self.__started: # if not already started, start the agent self.__started = True From 28430878cb30d403f59f7ad765dea5de48df8bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Sun, 12 Feb 2023 22:08:32 -0500 Subject: [PATCH 10/26] Fix hanging --- skywalking/agent/__init__.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 455e41fb..065c4e48 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -192,12 +192,14 @@ def start(self) -> None: When os.fork(), the service instance should be changed to a new one by appending pid. """ - # export grpc fork support env # This is required for grpcio to work with fork() # https://github.com/grpc/grpc/blob/master/doc/fork_support.md - # if config.protocol == 'grpc': - # os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - # print(os.getpid()) + # This is not available in Python 3.7 due to hanging issue + # https://github.com/grpc/grpc/issues/18075 + if config.protocol == 'grpc' and config.experimental_fork_support: + os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' + os.environ['GRPC_POLL_STRATEGY'] = 'poll' + if not self.__started: # if not already started, start the agent self.__started = True From 78acdb7b19aefb1999c0e36e2c3353da34fb2fdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Wed, 15 Feb 2023 02:11:39 -0500 Subject: [PATCH 11/26] Fix logger --- skywalking/meter/meter_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skywalking/meter/meter_service.py b/skywalking/meter/meter_service.py index e51bb312..a56fbfe3 100644 --- a/skywalking/meter/meter_service.py +++ b/skywalking/meter/meter_service.py @@ -21,6 +21,7 @@ from skywalking.agent import agent from skywalking.meter.meter import BaseMeter from skywalking.utils.time import current_milli_time +from skywalking.loggings import logger class MeterService(Thread): From 0c1ccd8fcf914efd49eceb2e7c168d58c859f703 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 18:31:17 +0000 Subject: [PATCH 12/26] fix --- README.md | 2 +- docs/README.md | 2 +- docs/en/contribution/How-to-release.md | 2 +- docs/en/setup/CLI.md | 11 ++- docs/en/setup/Configuration.md | 2 +- docs/en/setup/Installation.md | 2 +- docs/en/setup/Intrusive.md | 21 ++-- docs/en/setup/Plugins.md | 1 + docs/en/setup/advanced/LogReporter.md | 21 ++-- docs/en/setup/advanced/MeterReporter.md | 11 ++- docs/en/setup/faq/How-to-use-with-uwsgi.md | 4 +- docs/menu.yml | 6 +- pyproject.toml | 2 +- skywalking/agent/__init__.py | 104 ++++++++++++-------- skywalking/config.py | 6 +- skywalking/plugins/sw_kafka.py | 2 +- skywalking/plugins/sw_redis.py | 109 +++------------------ skywalking/plugins/sw_requests.py | 2 +- skywalking/plugins/sw_tornado.py | 4 +- 19 files changed, 130 insertions(+), 184 deletions(-) diff --git a/README.md b/README.md index 2df7758d..c1b04987 100755 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Sky Walking logo -**SkyWalking-Python**: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects. +**SkyWalking-Python**: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects. **[SkyWalking](https://github.com/apache/skywalking)**: Application performance monitor tool for distributed systems, especially designed for microservices, cloud native and container-based (Kubernetes) architectures. diff --git a/docs/README.md b/docs/README.md index b28327f4..a31481fb 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,7 @@ **This is the official documentation of SkyWalking Python agent. Welcome to the SkyWalking community!** -The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects. +The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects. This documentation covers a number of ways to set up the Python agent for various use cases. diff --git a/docs/en/contribution/How-to-release.md b/docs/en/contribution/How-to-release.md index ea000a10..57181cdd 100644 --- a/docs/en/contribution/How-to-release.md +++ b/docs/en/contribution/How-to-release.md @@ -201,7 +201,7 @@ Vote result should follow these: On behalf of the SkyWalking Team, I’m glad to announce that SkyWalking Python $VERSION is now released. - SkyWalking Python: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging abilities for Python projects. + SkyWalking Python: The Python Agent for Apache SkyWalking provides the native tracing/metrics/logging/profiling abilities for Python projects. SkyWalking: APM (application performance monitor) tool for distributed systems, especially designed for microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures. diff --git a/docs/en/setup/CLI.md b/docs/en/setup/CLI.md index 9aa712da..51703a09 100644 --- a/docs/en/setup/CLI.md +++ b/docs/en/setup/CLI.md @@ -1,9 +1,12 @@ -# SkyWalking Python Agent Command-Line Interface(CLI) +# SkyWalking Python Agent Command-Line Interface (sw-python CLI) In releases before 0.7.0, you would at least need to add the following lines to your applications to get the agent attached and running. +This is the recommended way of running your application with Python agent. + ```python -from skywalking import agent +from skywalking import agent, config +config.init(SomeConfig) agent.start() ``` @@ -17,6 +20,8 @@ just like the [SkyWalking Java Agent](https://github.com/apache/skywalking-java) Upon successful [installation of the SkyWalking Python agent via pip](Installation.md#from-pypi), a command-line script `sw-python` is installed in your environment (virtual env preferred). +run `sw-python` to see if it is available. + ### The `run` option Currently, the `sw-python` CLI provides a `run` option, which you can use to execute your applications @@ -64,7 +69,7 @@ You would normally want to provide additional configurations other than the defa The currently supported method is to provide the environment variables listed and explained in the [Environment Variables List](Configuration.md). -#### Through a sw-config.yaml +#### Through a sw-config.yaml (TBD) Currently, only environment variable configuration is supported; an optional `yaml` configuration is to be implemented. diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md index fc947793..249ec021 100644 --- a/docs/en/setup/Configuration.md +++ b/docs/en/setup/Configuration.md @@ -18,7 +18,7 @@ export SW_AGENT_YourConfiguration=YourValue | Configuration | Environment Variable | Type | Default Value | Description | | :------------ | :------------ | :------------ | :------------ | :------------ | | agent_collector_backend_services | SW_AGENT_AGENT_COLLECTOR_BACKEND_SERVICES | | oap_host:oap_port | The backend OAP server address, 11800 is default OAP gRPC port, 12800 is HTTP, Kafka ignores this option and uses kafka_bootstrap_servers option. **This option should be changed accordingly with selected protocol** | -| protocol | SW_AGENT_PROTOCOL | | grpc | The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to the backend. | +| agent_protocol | SW_AGENT_AGENT_PROTOCOL | | grpc | The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to the backend. | | agent_name | SW_AGENT_AGENT_NAME | | Python Service Name | The name of your awesome Python service | | agent_instance_name | SW_AGENT_AGENT_INSTANCE_NAME | | str(uuid.uuid1()).replace('-', '') | The name of this particular awesome Python service instance | | agent_namespace | SW_AGENT_AGENT_NAMESPACE | | | The agent namespace of the Python service (available as tag and the suffix of service name) | diff --git a/docs/en/setup/Installation.md b/docs/en/setup/Installation.md index 0ef9ee1d..9a9c042a 100644 --- a/docs/en/setup/Installation.md +++ b/docs/en/setup/Installation.md @@ -6,7 +6,7 @@ You can install the SkyWalking Python agent via various ways described next. > **Already installed? Check out easy ways to start the agent in your application** -> [Non-intrusive](CLI.md) | [Intrusive ](Intrusive.md) | [Containerization](Container.md) +> [Non-intrusive ](CLI.md) | [Intrusive ](Intrusive.md) | [Containerization](Container.md) > **All available configurations are listed [here](Configuration.md)** diff --git a/docs/en/setup/Intrusive.md b/docs/en/setup/Intrusive.md index 743bb060..089579e7 100644 --- a/docs/en/setup/Intrusive.md +++ b/docs/en/setup/Intrusive.md @@ -7,52 +7,55 @@ which is by importing SkyWalking into your project and starting the agent. By default, SkyWalking Python agent uses gRPC protocol to report data to SkyWalking backend, in SkyWalking backend, the port of gRPC protocol is `11800`, and the port of HTTP protocol is `12800`, -You could configure `collector_address` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) -and set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to one of +See all default configuration values in the [Configuration Vocabulary](Configuration.md) + +You could configure `agent_collector_backend_services` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) +and set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to one of `gprc`, `http` or `kafka` according to the protocol you would like to use. ### Report data via gRPC protocol (Default) -For example, if you want to use gRPC protocol to report data, configure `collector_address` +For example, if you want to use gRPC protocol to report data, configure `agent_collector_backend_services` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) to `:11800`, such as `127.0.0.1:11800`: ```python from skywalking import agent, config -config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service') +config.init(agent_collector_backend_services='127.0.0.1:11800', agent_name='your awesome service', agent_instance_name='your-instance-name or ') agent.start() ``` ### Report data via HTTP protocol -However, if you want to use HTTP protocol to report data, configure `collector_address` +However, if you want to use HTTP protocol to report data, configure `agent_collector_backend_services` (or environment variable `SW_AGENT_COLLECTOR_BACKEND_SERVICES`) to `:12800`, -such as `127.0.0.1:12800`, further set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to `http`): +such as `127.0.0.1:12800`, further set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to `http`): > Remember you should install `skywalking-python` with extra requires `http`, `pip install "apache-skywalking[http]`. ```python from skywalking import agent, config -config.init(collector_address='127.0.0.1:12800', agent_name='your awesome service', protocol='http') +config.init(agent_collector_backend_services='127.0.0.1:12800', agent_name='your awesome service', agent_protocol='http', agent_instance_name='your-instance-name or ') agent.start() ``` ### Report data via Kafka protocol +**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`** Finally, if you want to use Kafka protocol to report data, configure `kafka_bootstrap_servers` (or environment variable `SW_KAFKA_BOOTSTRAP_SERVERS`) to `kafka-brokers`, -such as `127.0.0.1:9200`, further set `protocol` (or environment variable `SW_AGENT_PROTOCOL` to `kafka`): +such as `127.0.0.1:9200`, further set `agent_protocol` (or environment variable `SW_AGENT_PROTOCOL` to `kafka`): > Remember you should install `skywalking-python` with extra requires `kafka`, `pip install "apache-skywalking[kafka]"`. ```python from skywalking import agent, config -config.init(kafka_bootstrap_servers='127.0.0.1:9200', agent_name='your awesome service', protocol='kafka') +config.init(kafka_bootstrap_servers='127.0.0.1:9200', agent_name='your awesome service', agent_protocol='kafka', agent_instance_name='your-instance-name or ') agent.start() ``` diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index 4afa888a..06c8e6ec 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -50,6 +50,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) - The celery server running with "celery -A ..." should be run with the HTTP protocol as it uses multiprocessing by default which is not compatible with the gRPC protocol implementation in SkyWalking currently. Celery clients can use whatever protocol they want. +- Known incompatibility: Redis lib 4.0+ args length is no longer correct - The websocket instrumentation only traces client side connection handshake, the actual message exchange (send/recv) is not traced since injecting headers to socket message body is the only way to propagate the trace context, which requires customization of message structure diff --git a/docs/en/setup/advanced/LogReporter.md b/docs/en/setup/advanced/LogReporter.md index bbe2c440..53cd8733 100644 --- a/docs/en/setup/advanced/LogReporter.md +++ b/docs/en/setup/advanced/LogReporter.md @@ -2,18 +2,9 @@ This functionality reports logs collected from the Python logging module (in theory, also logging libraries depending on the core logging module) and loguru module. -To utilize this feature, you will need to add some new configurations to the agent initialization step. +From Python agent 1.0.0, the log reporter is automatically enabled and can be disabled through `agent_log_reporter_active=False` or `SW_AGENT_LOG_REPORTER_ACTIVE=False`. -## Enabling the feature -```Python -from skywalking import agent, config - -config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service', - log_reporter_active=True) # defaults to grpc protocol -agent.start() -``` - -Log reporter supports all three protocols including `grpc`, `http` and `kafka`, which shares the same config `protocol` with trace reporter. +Log reporter supports all three protocols including `grpc`, `http` and `kafka`, which shares the same config `agent_protocol` with trace reporter. If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`. @@ -21,12 +12,14 @@ If chosen `kafka` protocol, please make sure to config [kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/) on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers. -`log_reporter_active=True` - Enables the log reporter. +**Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`** + +`agent_log_reporter_active=True` - Enables the log reporter. -`log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. +`agent_log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. Alternatively, you can pass configurations through environment variables. -Please refer to the [Environment Variables List](../Configuration.md) for the list of environment variables associated with the log reporter. +Please refer to the [Configuration Vocabulary](../Configuration.md) for the list of environment variables associated with the log reporter. ## Specify a logging level Only the logs with a level equal to or higher than the specified will be collected and reported. diff --git a/docs/en/setup/advanced/MeterReporter.md b/docs/en/setup/advanced/MeterReporter.md index f6a6d850..5697e65e 100644 --- a/docs/en/setup/advanced/MeterReporter.md +++ b/docs/en/setup/advanced/MeterReporter.md @@ -1,12 +1,15 @@ # Python Agent Meter Reporter -To enable or disable this feature, you will need to set some environment variables. - **Important Note**: Meter reporter is currently available to send in `gRPC` and `Kafka` protocol, `HTTP` protocol is not implemented yet (requires additional handler on SkyWalking OAP side). -## Enabling the feature (default) +## Enabling the feature (default is enabled) +**PVM Reporter is also by default enabled, meaning useful Python metrics such as thread count/GC info will be shown in OAP General Services - Instance - PVM Tab)** +If you really don't need such a feature, disable them through `config.agent_pvm_meter_reporter_active` or `SW_AGENT_PVM_METER_REPORTER_ACTIVE` + ```Python +config.agent_meter_reporter_active = True +# Or os.environ['SW_AGENT_METER_REPORTER_ACTIVE'] = 'True' ``` or @@ -88,7 +91,7 @@ h = builder.build() builder = Histogram.Builder('h3', [i / 10 for i in range(10)]) h = builder.build() -# Histogram h will record the time the with-wrapped codes consumed +# Histogram h will record the time the with-wprapped codes consumed with h.create_timer(): # some codes may consume a certain time ``` diff --git a/docs/en/setup/faq/How-to-use-with-uwsgi.md b/docs/en/setup/faq/How-to-use-with-uwsgi.md index 6364c64c..a56ae416 100644 --- a/docs/en/setup/faq/How-to-use-with-uwsgi.md +++ b/docs/en/setup/faq/How-to-use-with-uwsgi.md @@ -1,4 +1,4 @@ -# How to use with uWSGI ? +# How to use with uWSGI? [uWSGI](https://uwsgi-docs.readthedocs.io/en/latest/) is popular in the Python ecosystem. It is a lightweight, fast, and easy-to-use web server. @@ -16,7 +16,7 @@ from uwsgidecorators import postfork @postfork def init_tracing(): - config.init(collector_address='127.0.0.1:11800', agent_name='your awesome service') + config.init(agent_collector_backend_services='127.0.0.1:11800', agent_name='your awesome service') agent.start() diff --git a/docs/menu.yml b/docs/menu.yml index 1017bd14..a5232d80 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -22,9 +22,9 @@ catalog: path: "/en/setup/installation" - name: "Integration" catalog: - - name: "Non-Intrusive Setup" + - name: "Non-Intrusive Setup (Recommended)" path: "/en/setup/CLI" - - name: "Legacy Setup" + - name: "Legacy Setup (Fallback)" path: "/en/setup/Intrusive" - name: "Containerized Setup" path: "/en/setup/Container" @@ -38,7 +38,7 @@ catalog: path: "/en/setup/advanced/MeterReporter" - name: "Manual Trace Instrumentation" path: "/en/setup/advanced/API" - - name: "Plugins" + - name: "Supported Plugins" catalog: - name: "Supported Libraries" path: "/en/setup/Plugins" diff --git a/pyproject.toml b/pyproject.toml index 80365c68..208ca480 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ [tool.poetry] name = "apache-skywalking" version = "0.8.0" -description = "The Python Agent for Apache SkyWalking, which provides the native tracing/metrics/logging abilities for Python projects." +description = "The Python Agent for Apache SkyWalking, which provides the native tracing/metrics/logging/profiling abilities for Python projects." license = "Apache-2.0" authors = ["Apache Software Foundation "] maintainers = ["Apache SkyWalking Community "] diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 065c4e48..ba1ff0b0 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -16,30 +16,31 @@ # import atexit +import functools import os +import sys from queue import Queue, Full from threading import Thread, Event from typing import TYPE_CHECKING, Optional from skywalking import config, plugins from skywalking import loggings -from skywalking import profile from skywalking import meter +from skywalking import profile from skywalking.agent.protocol import Protocol from skywalking.command import command_service from skywalking.loggings import logger from skywalking.profile.profile_task import ProfileTask from skywalking.profile.snapshot import TracingThreadSnapshot -from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.protocol.language_agent.Meter_pb2 import MeterData +from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.utils.singleton import Singleton -import functools if TYPE_CHECKING: from skywalking.trace.context import Segment -def report_with_backoff(init_wait): +def report_with_backoff(reporter_name, init_wait): """ An exponential backoff for retrying reporters. """ @@ -54,13 +55,11 @@ def backoff_wrapper(self, *args, **kwargs): wait = base # reset to base wait time on success except Exception: # noqa wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum - logger.exception(f'Exception in reporter in pid {os.getpid()}, retry in {wait} seconds') - + logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, ' + f'retry in {wait} seconds') self._finished.wait(wait) logger.info('finished reporter thread') - return backoff_wrapper - return backoff_decorator @@ -84,13 +83,13 @@ def __init__(self): def __bootstrap(self): # when forking, already instrumented modules must not be instrumented again # otherwise it will cause double instrumentation! (we should provide an un-instrument method) - if config.protocol == 'grpc': + if config.agent_protocol == 'grpc': from skywalking.agent.protocol.grpc import GrpcProtocol self.__protocol = GrpcProtocol() - elif config.protocol == 'http': + elif config.agent_protocol == 'http': from skywalking.agent.protocol.http import HttpProtocol self.__protocol = HttpProtocol() - elif config.protocol == 'kafka': + elif config.agent_protocol == 'kafka': from skywalking.agent.protocol.kafka import KafkaProtocol self.__protocol = KafkaProtocol() @@ -117,16 +116,16 @@ def __init_threading(self) -> None: __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True) __heartbeat_thread.start() - self.__segment_queue = Queue(maxsize=config.trace_reporter_max_buffer_size) + self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size) __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True) __segment_report_thread.start() - if config.meter_reporter_active: - self.__meter_queue = Queue(maxsize=config.meter_reporter_max_buffer_size) + if config.agent_meter_reporter_active: + self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size) __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True) __meter_report_thread.start() - if config.pvm_meter_reporter_active: + if config.agent_pvm_meter_reporter_active: from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource from skywalking.meter.pvm.gc_data import GCDataSource from skywalking.meter.pvm.mem_usage import MEMUsageDataSource @@ -137,18 +136,18 @@ def __init_threading(self) -> None: GCDataSource().register() ThreadDataSource().register() - if config.log_reporter_active: - self.__log_queue = Queue(maxsize=config.log_reporter_max_buffer_size) + if config.agent_log_reporter_active: + self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size) __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True) __log_report_thread.start() - if config.profiler_active: + if config.agent_profile_active: # Now only profiler receives commands from OAP __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch, daemon=True) __command_dispatch_thread.start() - self.__snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size) + self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size) __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command, daemon=True) @@ -158,7 +157,8 @@ def __init_threading(self) -> None: daemon=True) __send_profile_thread.start() - def __fork_before(self) -> None: + @staticmethod # for now + def __fork_before() -> None: """ This handles explicit fork() calls. The child process will not have a running thread, so we need to revive all of them. The parent process will continue to run as normal. @@ -172,6 +172,7 @@ def __fork_before(self) -> None: logger.warning('SkyWalking Python agent fork support is currently experimental, ' 'please report issues if you encounter any.') + @staticmethod # for now def __fork_after_in_parent(self) -> None: """ Something to do after fork() in parent process @@ -192,11 +193,27 @@ def start(self) -> None: When os.fork(), the service instance should be changed to a new one by appending pid. """ + python_version: tuple = sys.version_info[:2] + if python_version[0] < 3 and python_version[1] < 7: + # agent may or may not work for Python 3.6 and below + # since 3.6 is EOL, we will not officially support it + logger.warning('SkyWalking Python agent does not support Python 3.6 and below, ' + 'please upgrade to Python 3.7 or above.') + if python_version[0] == 3 and python_version[1] > 10: + logger.warning('SkyWalking Python agent does not support Python 3.11 and above, ' + 'though it is likely to work properly, official support is not offered yet.') # This is required for grpcio to work with fork() # https://github.com/grpc/grpc/blob/master/doc/fork_support.md # This is not available in Python 3.7 due to hanging issue # https://github.com/grpc/grpc/issues/18075 - if config.protocol == 'grpc' and config.experimental_fork_support: + if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support: + python_version: tuple = sys.version_info[:2] + if python_version[0] == 3 and python_version[1] == 7: + raise RuntimeError('gRPC fork support is not safe on Python 3.7 and can cause hanging. ' + 'See: https://github.com/grpc/grpc/issues/18075.' + 'Please either upgrade to Python 3.8+ (recommended), ' + 'or use HTTP/Kafka protocol, or disable experimental fork support.') + os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' os.environ['GRPC_POLL_STRATEGY'] = 'poll' @@ -205,7 +222,7 @@ def start(self) -> None: self.__started = True # Install logging plugins # TODO - Add support for printing traceID/ context in logs - if config.log_reporter_active: + if config.agent_log_reporter_active: from skywalking import log log.install() # Here we install all other lib plugins on first time start (parent process) @@ -220,7 +237,7 @@ def start(self) -> None: # Fork support is controlled by config.agent_fork_support :default: False # Important: This does not impact pre-forking server support (uwsgi, gunicorn, etc...) # This is only for explicit long-running fork() calls. - config.service_instance = f'{config.service_instance}-child-{os.getpid()}' + config.agent_instance_name = f'{config.agent_instance_name}-child-{os.getpid()}' self.started_pid = os.getpid() @@ -243,7 +260,7 @@ def start(self) -> None: atexit.register(self.__fini) - if config.experimental_fork_support: + if config.agent_experimental_fork_support: if hasattr(os, 'register_at_fork'): os.register_at_fork(before=self.__fork_before, after_in_parent=self.__fork_after_in_parent, after_in_child=self.__fork_after_in_child) @@ -256,55 +273,59 @@ def __fini(self): self.__protocol.report_segment(self.__segment_queue, False) self.__segment_queue.join() - if config.log_reporter_active: + if config.agent_log_reporter_active: self.__protocol.report_log(self.__log_queue, False) self.__log_queue.join() - if config.profiler_active: + if config.agent_profile_active: self.__protocol.report_snapshot(self.__snapshot_queue, False) self.__snapshot_queue.join() - if config.meter_reporter_active: + if config.agent_meter_reporter_active: self.__protocol.report_meter(self.__meter_queue, False) self.__meter_queue.join() self._finished.set() - def stop(self): + def stop(self) -> None: + """ + Stops the agent and reset the started flag. + """ atexit.unregister(self.__fini) self.__fini() self.__started = False - @report_with_backoff(init_wait=config.heartbeat_period) - def __heartbeat(self): + @report_with_backoff(reporter_name='heartbeat', init_wait=config.agent_collector_heartbeat_period) + def __heartbeat(self) -> None: self.__protocol.heartbeat() - @report_with_backoff(init_wait=0) - def __report_segment(self): + @report_with_backoff(reporter_name='segment', init_wait=0) + def __report_segment(self) -> None: if not self.__segment_queue.empty(): self.__protocol.report_segment(self.__segment_queue) - @report_with_backoff(init_wait=0) - def __report_log(self): + @report_with_backoff(reporter_name='log', init_wait=0) + def __report_log(self) -> None: if not self.__log_queue.empty(): self.__protocol.report_log(self.__log_queue) - @report_with_backoff(init_wait=config.meter_reporter_period) - def __report_meter(self): + @report_with_backoff(reporter_name='meter', init_wait=config.agent_meter_reporter_period) + def __report_meter(self) -> None: if not self.__meter_queue.empty(): self.__protocol.report_meter(self.__meter_queue) - @report_with_backoff(init_wait=0.5) - def __send_profile_snapshot(self): + @report_with_backoff(reporter_name='profile_snapshot', init_wait=0.5) + def __send_profile_snapshot(self) -> None: if not self.__snapshot_queue.empty(): self.__protocol.report_snapshot(self.__snapshot_queue) - @report_with_backoff(init_wait=config.get_profile_task_interval) - def __query_profile_command(self): + @report_with_backoff(reporter_name='query_profile_command', + init_wait=config.agent_collector_get_profile_task_interval) + def __query_profile_command(self) -> None: self.__protocol.query_profile_commands() @staticmethod - def __command_dispatch(): + def __command_dispatch() -> None: # command dispatch will stuck when there are no commands command_service.dispatch() @@ -343,5 +364,6 @@ def notify_profile_finish(self, task: ProfileTask): # Export for user (backwards compatibility) +# so users still use `from skywalking import agent` agent = SkyWalkingAgent() start = agent.start diff --git a/skywalking/config.py b/skywalking/config.py index 67c5ac60..a6ded2ac 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -49,7 +49,7 @@ # The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in # production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to # the backend. -protocol: str = os.getenv('SW_AGENT_PROTOCOL', 'grpc').lower() +agent_protocol: str = os.getenv('SW_AGENT_PROTOCOL', 'grpc').lower() # The name of your awesome Python service agent_name: str = os.getenv('SW_AGENT_NAME', 'Python Service Name') # The name of this particular awesome Python service instance @@ -233,12 +233,12 @@ def finalize_feature() -> None: """ global agent_profile_active, agent_meter_reporter_active - if protocol == 'http' and (agent_profile_active or agent_meter_reporter_active): + if agent_protocol == 'http' and (agent_profile_active or agent_meter_reporter_active): agent_profile_active = False agent_meter_reporter_active = False warnings.warn('HTTP protocol does not support meter reporter and profiler. Please use gRPC protocol if you ' 'would like to use both features.') - elif protocol == 'kafka' and agent_profile_active: + elif agent_protocol == 'kafka' and agent_profile_active: agent_profile_active = False warnings.warn('Kafka protocol does not support profiler. Please use gRPC protocol if you would like to use ' 'this feature.') diff --git a/skywalking/plugins/sw_kafka.py b/skywalking/plugins/sw_kafka.py index 60f3d9c0..da685caa 100644 --- a/skywalking/plugins/sw_kafka.py +++ b/skywalking/plugins/sw_kafka.py @@ -74,7 +74,7 @@ def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True): def _sw_send_func(_send): def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None): # ignore trace, log and meter reporter - skywalking self request - if config.protocol == 'kafka' and \ + if config.agent_protocol == 'kafka' and \ (config.kafka_topic_segment == topic or config.kafka_topic_log == topic or config.kafka_topic_management == topic diff --git a/skywalking/plugins/sw_redis.py b/skywalking/plugins/sw_redis.py index 0dc95dda..87b9aad6 100644 --- a/skywalking/plugins/sw_redis.py +++ b/skywalking/plugins/sw_redis.py @@ -22,105 +22,24 @@ link_vector = ['https://github.com/andymccurdy/redis-py/'] support_matrix = { 'redis': { - '>=3.7': ['3.5'] # "4.0" next, incompatible to current instrumentation + '>=3.7': ['3.5'] } } -note = """""" +note = """Known incompatibility: Redis lib 4.0+ args length is no longer correct""" -OPERATIONS_WRITE = set({'GETSET', - 'SET', - 'SETBIT', - 'SETEX ', - 'SETNX ', - 'SETRANGE', - 'STRLEN ', - 'MSET', - 'MSETNX ', - 'PSETEX', - 'INCR ', - 'INCRBY ', - 'INCRBYFLOAT', - 'DECR ', - 'DECRBY ', - 'APPEND ', - 'HMSET', - 'HSET', - 'HSETNX ', - 'HINCRBY', - 'HINCRBYFLOAT', - 'HDEL', - 'RPOPLPUSH', - 'RPUSH', - 'RPUSHX', - 'LPUSH', - 'LPUSHX', - 'LREM', - 'LTRIM', - 'LSET', - 'BRPOPLPUSH', - 'LINSERT', - 'SADD', - 'SDIFF', - 'SDIFFSTORE', - 'SINTERSTORE', - 'SISMEMBER', - 'SREM', - 'SUNION', - 'SUNIONSTORE', - 'SINTER', - 'ZADD', - 'ZINCRBY', - 'ZINTERSTORE', - 'ZRANGE', - 'ZRANGEBYLEX', - 'ZRANGEBYSCORE', - 'ZRANK', - 'ZREM', - 'ZREMRANGEBYLEX', - 'ZREMRANGEBYRANK', - 'ZREMRANGEBYSCORE', - 'ZREVRANGE', - 'ZREVRANGEBYSCORE', - 'ZREVRANK', - 'ZUNIONSTORE', - 'XADD', - 'XDEL', - 'DEL', - 'XTRIM'}) +OPERATIONS_WRITE = {'GETSET', 'SET', 'SETBIT', 'SETEX ', 'SETNX ', 'SETRANGE', 'STRLEN ', 'MSET', 'MSETNX ', 'PSETEX', + 'INCR ', 'INCRBY ', 'INCRBYFLOAT', 'DECR ', 'DECRBY ', 'APPEND ', 'HMSET', 'HSET', 'HSETNX ', + 'HINCRBY', 'HINCRBYFLOAT', 'HDEL', 'RPOPLPUSH', 'RPUSH', 'RPUSHX', 'LPUSH', 'LPUSHX', 'LREM', + 'LTRIM', 'LSET', 'BRPOPLPUSH', 'LINSERT', 'SADD', 'SDIFF', 'SDIFFSTORE', 'SINTERSTORE', 'SISMEMBER', + 'SREM', 'SUNION', 'SUNIONSTORE', 'SINTER', 'ZADD', 'ZINCRBY', 'ZINTERSTORE', 'ZRANGE', + 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZRANK', 'ZREM', 'ZREMRANGEBYLEX', 'ZREMRANGEBYRANK', + 'ZREMRANGEBYSCORE', 'ZREVRANGE', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZUNIONSTORE', 'XADD', 'XDEL', + 'DEL', 'XTRIM'} -OPERATIONS_READ = set({'GETRANGE', - 'GETBIT ', - 'MGET', - 'HVALS', - 'HKEYS', - 'HLEN', - 'HEXISTS', - 'HGET', - 'HGETALL', - 'HMGET', - 'BLPOP', - 'BRPOP', - 'LINDEX', - 'LLEN', - 'LPOP', - 'LRANGE', - 'RPOP', - 'SCARD', - 'SRANDMEMBER', - 'SPOP', - 'SSCAN', - 'SMOVE', - 'ZLEXCOUNT', - 'ZSCORE', - 'ZSCAN', - 'ZCARD', - 'ZCOUNT', - 'XGET', - 'GET', - 'XREAD', - 'XLEN', - 'XRANGE', - 'XREVRANGE'}) +OPERATIONS_READ = {'GETRANGE', 'GETBIT ', 'MGET', 'HVALS', 'HKEYS', 'HLEN', 'HEXISTS', 'HGET', 'HGETALL', 'HMGET', + 'BLPOP', 'BRPOP', 'LINDEX', 'LLEN', 'LPOP', 'LRANGE', 'RPOP', 'SCARD', 'SRANDMEMBER', 'SPOP', + 'SSCAN', 'SMOVE', 'ZLEXCOUNT', 'ZSCORE', 'ZSCAN', 'ZCARD', 'ZCOUNT', 'XGET', 'GET', 'XREAD', 'XLEN', + 'XRANGE', 'XREVRANGE'} def install(): diff --git a/skywalking/plugins/sw_requests.py b/skywalking/plugins/sw_requests.py index 743bb32b..6d8c4e18 100644 --- a/skywalking/plugins/sw_requests.py +++ b/skywalking/plugins/sw_requests.py @@ -43,7 +43,7 @@ def _sw_request(this: Session, method, url, url_param = sw_urlparse(url) # ignore trace skywalking self request - if config.protocol == 'http' and config.agent_collector_backend_services.rstrip('/').endswith(url_param.netloc): + if config.agent_protocol == 'http' and config.agent_collector_backend_services.rstrip('/').endswith(url_param.netloc): return _request(this, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, diff --git a/skywalking/plugins/sw_tornado.py b/skywalking/plugins/sw_tornado.py index fd3337ec..ccf5db94 100644 --- a/skywalking/plugins/sw_tornado.py +++ b/skywalking/plugins/sw_tornado.py @@ -84,7 +84,7 @@ async def _sw_get_response(self, *args, **kwargs): peer = '' span.tag(TagHttpMethod(method)) - span.tag(TagHttpURL(f'{request.protocol}://{request.host}{request.path}')) + span.tag(TagHttpURL(f'{request.agent_protocol}://{request.host}{request.path}')) result = old_execute(self, *args, **kwargs) if isawaitable(result): result = await result @@ -118,7 +118,7 @@ def _sw_get_response(self, *args, **kwargs): peer = '' span.tag(TagHttpMethod(method)) - span.tag(TagHttpURL(f'{request.protocol}://{request.host}{request.path}')) + span.tag(TagHttpURL(f'{request.agent_protocol}://{request.host}{request.path}')) result = yield from old_execute(self, *args, **kwargs) span.tag(TagHttpStatusCode(self._status_code)) if self._status_code >= 400: From 2f694fc8b734f3bea606ed0cc1116c9ddcb3db3c Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 19:00:30 +0000 Subject: [PATCH 13/26] Add more time to CI and fix wrong config --- .github/workflows/CI.yaml | 2 +- poetry.lock | 86 ++++++++++++++++---------------- skywalking/plugins/sw_tornado.py | 4 +- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index bf513046..608a9d18 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -156,7 +156,7 @@ jobs: ( always() && ! cancelled() ) && ((github.event_name == 'schedule' && github.repository == 'apache/skywalking-python') || needs.changes.outputs.agent == 'true') runs-on: ubuntu-latest - timeout-minutes: 15 + timeout-minutes: 20 strategy: matrix: python-version: [ "3.7", "3.8", "3.9", "3.10" ] diff --git a/poetry.lock b/poetry.lock index dbe6e3fa..8790512f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2250,48 +2250,48 @@ files = [ [[package]] name = "pydantic" -version = "1.10.4" +version = "1.10.5" description = "Data validation and settings management using python type hints" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "pydantic-1.10.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b5635de53e6686fe7a44b5cf25fcc419a0d5e5c1a1efe73d49d48fe7586db854"}, - {file = "pydantic-1.10.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6dc1cc241440ed7ca9ab59d9929075445da6b7c94ced281b3dd4cfe6c8cff817"}, - {file = "pydantic-1.10.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51bdeb10d2db0f288e71d49c9cefa609bca271720ecd0c58009bd7504a0c464c"}, - {file = "pydantic-1.10.4-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:78cec42b95dbb500a1f7120bdf95c401f6abb616bbe8785ef09887306792e66e"}, - {file = "pydantic-1.10.4-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:8775d4ef5e7299a2f4699501077a0defdaac5b6c4321173bcb0f3c496fbadf85"}, - {file = "pydantic-1.10.4-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:572066051eeac73d23f95ba9a71349c42a3e05999d0ee1572b7860235b850cc6"}, - {file = "pydantic-1.10.4-cp310-cp310-win_amd64.whl", hash = "sha256:7feb6a2d401f4d6863050f58325b8d99c1e56f4512d98b11ac64ad1751dc647d"}, - {file = "pydantic-1.10.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:39f4a73e5342b25c2959529f07f026ef58147249f9b7431e1ba8414a36761f53"}, - {file = "pydantic-1.10.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:983e720704431a6573d626b00662eb78a07148c9115129f9b4351091ec95ecc3"}, - {file = "pydantic-1.10.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:75d52162fe6b2b55964fbb0af2ee58e99791a3138588c482572bb6087953113a"}, - {file = "pydantic-1.10.4-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fdf8d759ef326962b4678d89e275ffc55b7ce59d917d9f72233762061fd04a2d"}, - {file = "pydantic-1.10.4-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:05a81b006be15655b2a1bae5faa4280cf7c81d0e09fcb49b342ebf826abe5a72"}, - {file = "pydantic-1.10.4-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d88c4c0e5c5dfd05092a4b271282ef0588e5f4aaf345778056fc5259ba098857"}, - {file = "pydantic-1.10.4-cp311-cp311-win_amd64.whl", hash = "sha256:6a05a9db1ef5be0fe63e988f9617ca2551013f55000289c671f71ec16f4985e3"}, - {file = "pydantic-1.10.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:887ca463c3bc47103c123bc06919c86720e80e1214aab79e9b779cda0ff92a00"}, - {file = "pydantic-1.10.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fdf88ab63c3ee282c76d652fc86518aacb737ff35796023fae56a65ced1a5978"}, - {file = "pydantic-1.10.4-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a48f1953c4a1d9bd0b5167ac50da9a79f6072c63c4cef4cf2a3736994903583e"}, - {file = "pydantic-1.10.4-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:a9f2de23bec87ff306aef658384b02aa7c32389766af3c5dee9ce33e80222dfa"}, - {file = "pydantic-1.10.4-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:cd8702c5142afda03dc2b1ee6bc358b62b3735b2cce53fc77b31ca9f728e4bc8"}, - {file = "pydantic-1.10.4-cp37-cp37m-win_amd64.whl", hash = "sha256:6e7124d6855b2780611d9f5e1e145e86667eaa3bd9459192c8dc1a097f5e9903"}, - {file = "pydantic-1.10.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0b53e1d41e97063d51a02821b80538053ee4608b9a181c1005441f1673c55423"}, - {file = "pydantic-1.10.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:55b1625899acd33229c4352ce0ae54038529b412bd51c4915349b49ca575258f"}, - {file = "pydantic-1.10.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:301d626a59edbe5dfb48fcae245896379a450d04baeed50ef40d8199f2733b06"}, - {file = "pydantic-1.10.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b6f9d649892a6f54a39ed56b8dfd5e08b5f3be5f893da430bed76975f3735d15"}, - {file = "pydantic-1.10.4-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:d7b5a3821225f5c43496c324b0d6875fde910a1c2933d726a743ce328fbb2a8c"}, - {file = "pydantic-1.10.4-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:f2f7eb6273dd12472d7f218e1fef6f7c7c2f00ac2e1ecde4db8824c457300416"}, - {file = "pydantic-1.10.4-cp38-cp38-win_amd64.whl", hash = "sha256:4b05697738e7d2040696b0a66d9f0a10bec0efa1883ca75ee9e55baf511909d6"}, - {file = "pydantic-1.10.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a9a6747cac06c2beb466064dda999a13176b23535e4c496c9d48e6406f92d42d"}, - {file = "pydantic-1.10.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:eb992a1ef739cc7b543576337bebfc62c0e6567434e522e97291b251a41dad7f"}, - {file = "pydantic-1.10.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:990406d226dea0e8f25f643b370224771878142155b879784ce89f633541a024"}, - {file = "pydantic-1.10.4-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e82a6d37a95e0b1b42b82ab340ada3963aea1317fd7f888bb6b9dfbf4fff57c"}, - {file = "pydantic-1.10.4-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:9193d4f4ee8feca58bc56c8306bcb820f5c7905fd919e0750acdeeeef0615b28"}, - {file = "pydantic-1.10.4-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2b3ce5f16deb45c472dde1a0ee05619298c864a20cded09c4edd820e1454129f"}, - {file = "pydantic-1.10.4-cp39-cp39-win_amd64.whl", hash = "sha256:9cbdc268a62d9a98c56e2452d6c41c0263d64a2009aac69246486f01b4f594c4"}, - {file = "pydantic-1.10.4-py3-none-any.whl", hash = "sha256:4948f264678c703f3877d1c8877c4e3b2e12e549c57795107f08cf70c6ec7774"}, - {file = "pydantic-1.10.4.tar.gz", hash = "sha256:b9a3859f24eb4e097502a3be1fb4b2abb79b6103dd9e2e0edb70613a4459a648"}, + {file = "pydantic-1.10.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5920824fe1e21cbb3e38cf0f3dd24857c8959801d1031ce1fac1d50857a03bfb"}, + {file = "pydantic-1.10.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:3bb99cf9655b377db1a9e47fa4479e3330ea96f4123c6c8200e482704bf1eda2"}, + {file = "pydantic-1.10.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2185a3b3d98ab4506a3f6707569802d2d92c3a7ba3a9a35683a7709ea6c2aaa2"}, + {file = "pydantic-1.10.5-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f582cac9d11c227c652d3ce8ee223d94eb06f4228b52a8adaafa9fa62e73d5c9"}, + {file = "pydantic-1.10.5-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c9e5b778b6842f135902e2d82624008c6a79710207e28e86966cd136c621bfee"}, + {file = "pydantic-1.10.5-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:72ef3783be8cbdef6bca034606a5de3862be6b72415dc5cb1fb8ddbac110049a"}, + {file = "pydantic-1.10.5-cp310-cp310-win_amd64.whl", hash = "sha256:45edea10b75d3da43cfda12f3792833a3fa70b6eee4db1ed6aed528cef17c74e"}, + {file = "pydantic-1.10.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:63200cd8af1af2c07964546b7bc8f217e8bda9d0a2ef0ee0c797b36353914984"}, + {file = "pydantic-1.10.5-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:305d0376c516b0dfa1dbefeae8c21042b57b496892d721905a6ec6b79494a66d"}, + {file = "pydantic-1.10.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1fd326aff5d6c36f05735c7c9b3d5b0e933b4ca52ad0b6e4b38038d82703d35b"}, + {file = "pydantic-1.10.5-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6bb0452d7b8516178c969d305d9630a3c9b8cf16fcf4713261c9ebd465af0d73"}, + {file = "pydantic-1.10.5-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:9a9d9155e2a9f38b2eb9374c88f02fd4d6851ae17b65ee786a87d032f87008f8"}, + {file = "pydantic-1.10.5-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:f836444b4c5ece128b23ec36a446c9ab7f9b0f7981d0d27e13a7c366ee163f8a"}, + {file = "pydantic-1.10.5-cp311-cp311-win_amd64.whl", hash = "sha256:8481dca324e1c7b715ce091a698b181054d22072e848b6fc7895cd86f79b4449"}, + {file = "pydantic-1.10.5-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:87f831e81ea0589cd18257f84386bf30154c5f4bed373b7b75e5cb0b5d53ea87"}, + {file = "pydantic-1.10.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1612e98c6326f10888df951a26ec1a577d8df49ddcaea87773bfbe23ba5cc"}, + {file = "pydantic-1.10.5-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:58e41dd1e977531ac6073b11baac8c013f3cd8706a01d3dc74e86955be8b2c0c"}, + {file = "pydantic-1.10.5-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:6a4b0aab29061262065bbdede617ef99cc5914d1bf0ddc8bcd8e3d7928d85bd6"}, + {file = "pydantic-1.10.5-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:36e44a4de37b8aecffa81c081dbfe42c4d2bf9f6dff34d03dce157ec65eb0f15"}, + {file = "pydantic-1.10.5-cp37-cp37m-win_amd64.whl", hash = "sha256:261f357f0aecda005934e413dfd7aa4077004a174dafe414a8325e6098a8e419"}, + {file = "pydantic-1.10.5-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:b429f7c457aebb7fbe7cd69c418d1cd7c6fdc4d3c8697f45af78b8d5a7955760"}, + {file = "pydantic-1.10.5-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:663d2dd78596c5fa3eb996bc3f34b8c2a592648ad10008f98d1348be7ae212fb"}, + {file = "pydantic-1.10.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51782fd81f09edcf265823c3bf43ff36d00db246eca39ee765ef58dc8421a642"}, + {file = "pydantic-1.10.5-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c428c0f64a86661fb4873495c4fac430ec7a7cef2b8c1c28f3d1a7277f9ea5ab"}, + {file = "pydantic-1.10.5-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:76c930ad0746c70f0368c4596020b736ab65b473c1f9b3872310a835d852eb19"}, + {file = "pydantic-1.10.5-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:3257bd714de9db2102b742570a56bf7978e90441193acac109b1f500290f5718"}, + {file = "pydantic-1.10.5-cp38-cp38-win_amd64.whl", hash = "sha256:f5bee6c523d13944a1fdc6f0525bc86dbbd94372f17b83fa6331aabacc8fd08e"}, + {file = "pydantic-1.10.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:532e97c35719f137ee5405bd3eeddc5c06eb91a032bc755a44e34a712420daf3"}, + {file = "pydantic-1.10.5-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ca9075ab3de9e48b75fa8ccb897c34ccc1519177ad8841d99f7fd74cf43be5bf"}, + {file = "pydantic-1.10.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd46a0e6296346c477e59a954da57beaf9c538da37b9df482e50f836e4a7d4bb"}, + {file = "pydantic-1.10.5-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3353072625ea2a9a6c81ad01b91e5c07fa70deb06368c71307529abf70d23325"}, + {file = "pydantic-1.10.5-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:3f9d9b2be177c3cb6027cd67fbf323586417868c06c3c85d0d101703136e6b31"}, + {file = "pydantic-1.10.5-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b473d00ccd5c2061fd896ac127b7755baad233f8d996ea288af14ae09f8e0d1e"}, + {file = "pydantic-1.10.5-cp39-cp39-win_amd64.whl", hash = "sha256:5f3bc8f103b56a8c88021d481410874b1f13edf6e838da607dcb57ecff9b4594"}, + {file = "pydantic-1.10.5-py3-none-any.whl", hash = "sha256:7c5b94d598c90f2f46b3a983ffb46ab806a67099d118ae0da7ef21a2a4033b28"}, + {file = "pydantic-1.10.5.tar.gz", hash = "sha256:9e337ac83686645a46db0e825acceea8e02fca4062483f40e9ae178e8bd1103a"}, ] [package.dependencies] @@ -2724,14 +2724,14 @@ files = [ [[package]] name = "setuptools" -version = "67.2.0" +version = "67.3.2" description = "Easily download, build, install, upgrade, and uninstall Python packages" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "setuptools-67.2.0-py3-none-any.whl", hash = "sha256:16ccf598aab3b506593c17378473978908a2734d7336755a8769b480906bec1c"}, - {file = "setuptools-67.2.0.tar.gz", hash = "sha256:b440ee5f7e607bb8c9de15259dba2583dd41a38879a7abc1d43a71c59524da48"}, + {file = "setuptools-67.3.2-py3-none-any.whl", hash = "sha256:bb6d8e508de562768f2027902929f8523932fcd1fb784e6d573d2cafac995a48"}, + {file = "setuptools-67.3.2.tar.gz", hash = "sha256:95f00380ef2ffa41d9bba85d95b27689d923c93dfbafed4aecd7cf988a25e012"}, ] [package.extras] @@ -2973,14 +2973,14 @@ files = [ [[package]] name = "typing-extensions" -version = "4.4.0" +version = "4.5.0" description = "Backported and Experimental Type Hints for Python 3.7+" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "typing_extensions-4.4.0-py3-none-any.whl", hash = "sha256:16fa4864408f655d35ec496218b85f79b3437c829e93320c7c9215ccfd92489e"}, - {file = "typing_extensions-4.4.0.tar.gz", hash = "sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa"}, + {file = "typing_extensions-4.5.0-py3-none-any.whl", hash = "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4"}, + {file = "typing_extensions-4.5.0.tar.gz", hash = "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb"}, ] [[package]] diff --git a/skywalking/plugins/sw_tornado.py b/skywalking/plugins/sw_tornado.py index ccf5db94..fd3337ec 100644 --- a/skywalking/plugins/sw_tornado.py +++ b/skywalking/plugins/sw_tornado.py @@ -84,7 +84,7 @@ async def _sw_get_response(self, *args, **kwargs): peer = '' span.tag(TagHttpMethod(method)) - span.tag(TagHttpURL(f'{request.agent_protocol}://{request.host}{request.path}')) + span.tag(TagHttpURL(f'{request.protocol}://{request.host}{request.path}')) result = old_execute(self, *args, **kwargs) if isawaitable(result): result = await result @@ -118,7 +118,7 @@ def _sw_get_response(self, *args, **kwargs): peer = '' span.tag(TagHttpMethod(method)) - span.tag(TagHttpURL(f'{request.agent_protocol}://{request.host}{request.path}')) + span.tag(TagHttpURL(f'{request.protocol}://{request.host}{request.path}')) result = yield from old_execute(self, *args, **kwargs) span.tag(TagHttpStatusCode(self._status_code)) if self._status_code >= 400: From dc2d545c00badb573f0b8b080c9b08e642ff744a Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 20:04:49 +0000 Subject: [PATCH 14/26] Finalize --- demo/README.md | 8 +++ demo/__init__.py | 14 +++++ demo/docker-compose.yaml | 106 ++++++++++++++++++++++++++++++++++ demo/flask_consumer_fork.py | 47 +++++++++++++++ demo/flask_provider_single.py | 40 +++++++++++++ docs/en/setup/Installation.md | 3 + skywalking/agent/__init__.py | 3 +- skywalking/config.py | 2 +- 8 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 demo/README.md create mode 100644 demo/__init__.py create mode 100644 demo/docker-compose.yaml create mode 100644 demo/flask_consumer_fork.py create mode 100644 demo/flask_provider_single.py diff --git a/demo/README.md b/demo/README.md new file mode 100644 index 00000000..0e84fe55 --- /dev/null +++ b/demo/README.md @@ -0,0 +1,8 @@ +# Manual Test + +Edge cases on advanced features would benefit from a manual testing process. + +This directory holds some utils and scripts that are convenient for such use cases. + +## Docker-compose.yaml +This docker-compose.yaml spins up a fresh Apache SkyWalking instance along with UI (localhost:8080) and SW_CTL CLI for you to verify. diff --git a/demo/__init__.py b/demo/__init__.py new file mode 100644 index 00000000..ae1e83ee --- /dev/null +++ b/demo/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/demo/docker-compose.yaml b/demo/docker-compose.yaml new file mode 100644 index 00000000..ed1d4851 --- /dev/null +++ b/demo/docker-compose.yaml @@ -0,0 +1,106 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +services: + oap: + container_name: oap + image: apache/skywalking-oap-server:9.3.0 + # Python agent supports gRPC/ HTTP/ Kafka reporting + expose: + - 11800 # gRPC + - 12800 # HTTP + networks: + - manual + environment: + SW_KAFKA_FETCHER: default + SW_KAFKA_FETCHER_SERVERS: kafka:9092 + SW_KAFKA_FETCHER_PARTITIONS: 2 + SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1 + healthcheck: + test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/11800" ] + interval: 5s + timeout: 60s + retries: 120 + ports: + - "12800:12800" + - "11800:11800" + depends_on: + - kafka + + + ui: + image: apache/skywalking-ui:9.3.0 + container_name: ui + depends_on: + oap: + condition: service_healthy + networks: + - manual + ports: + - "8080:8080" + environment: + SW_OAP_ADDRESS: "http://oap:12800" + + zookeeper: + container_name: zk + image: confluentinc/cp-zookeeper:latest + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + networks: + - manual + + kafka: + container_name: kafka + image: confluentinc/cp-kafka + expose: + - 9092 + - 9094 + ports: + - 9092:9092 + - 9094:9094 + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - manual + +# +# kafka-ui: +# image: provectuslabs/kafka-ui +# container_name: kafka-ui +# ports: +# - "8088:8080" +# restart: always +# environment: +# - KAFKA_CLUSTERS_0_NAME=local +# - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 +# depends_on: +# - kafka +# networks: +# - manual + +networks: + manual: diff --git a/demo/flask_consumer_fork.py b/demo/flask_consumer_fork.py new file mode 100644 index 00000000..6981f9ac --- /dev/null +++ b/demo/flask_consumer_fork.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from flask import Flask +from skywalking import agent, config +import requests + +# Profiling only available in gRPC, meter only in kafka + grpc +config.init(agent_collector_backend_services='localhost:11800', agent_protocol='grpc', + agent_name='great-app-consumer-grpc', + kafka_bootstrap_servers='localhost:9094', # If you use kafka, set this + agent_instance_name='instance-01', + agent_experimental_fork_support=True, agent_logging_level='DEBUG', agent_log_reporter_active=True, + agent_meter_reporter_active=True, + agent_profile_active=True) + +agent.start() + +parent_pid = os.getpid() +pid = os.fork() + +app = Flask(__name__) + + +@app.route('/', methods=['POST', 'GET']) +def application(): + res = requests.get('http://localhost:9999') + return res.json() + + +if __name__ == '__main__': + PORT = 9097 if pid == 0 else 9098 # 0 is child process + app.run(host='0.0.0.0', port=PORT, debug=False) # RELOADER IS ALSO FORKED diff --git a/demo/flask_provider_single.py b/demo/flask_provider_single.py new file mode 100644 index 00000000..54576f57 --- /dev/null +++ b/demo/flask_provider_single.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from flask import Flask, jsonify +from skywalking import agent, config + +config.init(agent_collector_backend_services='localhost:11800', agent_protocol='grpc', + agent_name='great-app-provider-grpc', + kafka_bootstrap_servers='localhost:9094', # If you use kafka, set this + agent_instance_name='instance-01', + agent_experimental_fork_support=True, + agent_logging_level='DEBUG', + agent_log_reporter_active=True, + agent_meter_reporter_active=True, + agent_profile_active=True) + + +agent.start() + +app = Flask(__name__) + + +@app.route('/', methods=['POST', 'GET']) +def application(): + return jsonify({'status': 'ok'}) + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=9999, debug=True, use_reloader=False) diff --git a/docs/en/setup/Installation.md b/docs/en/setup/Installation.md index 9a9c042a..ff42bc62 100644 --- a/docs/en/setup/Installation.md +++ b/docs/en/setup/Installation.md @@ -36,6 +36,9 @@ from where you can use `pip` to install: # Install the latest version, using the default gRPC protocol to report data to OAP pip install "apache-skywalking" +# Install support for every protocol (gRPC, HTTP, Kafka) +pip install "apache-skywalking[all]" + # Install the latest version, using the http protocol to report data to OAP pip install "apache-skywalking[http]" diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index ba1ff0b0..5e65ac8e 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -173,7 +173,7 @@ def __fork_before() -> None: 'please report issues if you encounter any.') @staticmethod # for now - def __fork_after_in_parent(self) -> None: + def __fork_after_in_parent() -> None: """ Something to do after fork() in parent process """ @@ -184,6 +184,7 @@ def __fork_after_in_child(self) -> None: Simply restart the agent after we detect a fork() call """ self.start() + logger.info('SkyWalking Python agent spawned in child after fork() call.') def start(self) -> None: """ diff --git a/skywalking/config.py b/skywalking/config.py index a6ded2ac..ef05df30 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -213,7 +213,7 @@ def init(**kwargs) -> None: """ Used to initialize the configuration of the SkyWalking Python Agent. Refer to the official online documentation - https://skywalking.apache.org/docs/skywalking-python/next/en/setup/configurations/ + https://skywalking.apache.org/docs/skywalking-python/next/en/setup/configuration/ for more information on the configuration options. Args: From a97f12c0cb38a2bc3bc3726c35d6fd4c088194c1 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 20:05:50 +0000 Subject: [PATCH 15/26] Modify changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3d4788f..ccb9eb24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ - Add support for the tags of Virtual Cache for Redis (#263) - Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277) - Add log reporter support for loguru (#276) - - Add **experimental** support for explicit os.fork(), restarts agent in new process (#278) + - Add **experimental** support for explicit os.fork(), restarts agent in new process (#284) - Plugins: - Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage) From b381739ba04056715be7fa1ae5dd98f9a98fca66 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 20:07:19 +0000 Subject: [PATCH 16/26] Modify changelog --- .github/workflows/CI.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 608a9d18..2ad95638 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -197,7 +197,7 @@ jobs: timeout-minutes: 10 strategy: matrix: - python-image-variant: [ "3.7", "3.7-slim" ] + python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim", "3.11-slim" ] fail-fast: false env: BASE_PYTHON_IMAGE: ${{ matrix.python-image-variant }} @@ -228,7 +228,7 @@ jobs: timeout-minutes: 20 strategy: matrix: - python-image-variant: [ "3.7", "3.7-slim" ] + python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim", "3.11-slim" ] case: - name: gRPC path: tests/e2e/case/grpc/e2e.yaml From 7f23a7670014d6fc869b36c1ae8583a15ce820be Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 20:07:40 +0000 Subject: [PATCH 17/26] Modify changelog --- .github/workflows/CI.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 2ad95638..bde337e2 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -197,7 +197,7 @@ jobs: timeout-minutes: 10 strategy: matrix: - python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim", "3.11-slim" ] + python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim" ] fail-fast: false env: BASE_PYTHON_IMAGE: ${{ matrix.python-image-variant }} @@ -228,7 +228,7 @@ jobs: timeout-minutes: 20 strategy: matrix: - python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim", "3.11-slim" ] + python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim" ] case: - name: gRPC path: tests/e2e/case/grpc/e2e.yaml From c2afcadb4ed6106d6f7a2745278f55ccb34aea54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Wed, 15 Feb 2023 16:34:20 -0500 Subject: [PATCH 18/26] Revert --- .lift/config.toml | 2 +- tests/e2e/script/prepare/install-yq.sh | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.lift/config.toml b/.lift/config.toml index c663402b..8c2da8f0 100644 --- a/.lift/config.toml +++ b/.lift/config.toml @@ -16,4 +16,4 @@ # specific language governing permissions and limitations # under the License. # -ignoreRules = [ "Unused ignore", "Invalid decoration", "blacklist", "Missing argument", "hardcoded_bind_all_interfaces" ] +ignoreRules = [ "Unused ignore", "Invalid decoration", "blacklist", "Missing argument", "hardcoded_bind_all_interfaces", "B104", "B201" ] diff --git a/tests/e2e/script/prepare/install-yq.sh b/tests/e2e/script/prepare/install-yq.sh index cc103baf..2f69839f 100644 --- a/tests/e2e/script/prepare/install-yq.sh +++ b/tests/e2e/script/prepare/install-yq.sh @@ -23,5 +23,8 @@ BASE_DIR=$1 BIN_DIR=$2 if ! command -v yq &> /dev/null; then - snap install yq + mkdir -p $BASE_DIR/yq && cd $BASE_DIR/yq + curl -kLo yq.tar.gz https://github.com/mikefarah/yq/archive/v4.29.2.tar.gz + tar -zxf yq.tar.gz --strip=1 + go install && go build -ldflags -s && cp yq $BIN_DIR/ fi From 1e4cd1cbee8fce36553fd67eec21e6154adfb2d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Wed, 15 Feb 2023 16:47:40 -0500 Subject: [PATCH 19/26] Fix doc --- .github/workflows/CI.yaml | 8 ++++---- docs/en/setup/faq/How-to-use-with-uwsgi.md | 2 ++ tests/e2e/script/env | 6 +++--- tests/plugin/base.py | 6 ------ 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index bde337e2..c9bc89a3 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -184,7 +184,7 @@ jobs: - name: Run unit tests run: | make env - poetry run pytest -x -v ${{ matrix.test-path }} + poetry run pytest -v ${{ matrix.test-path }} docker-e2e: # build docker image for E2E tests, single Python version for now. @@ -197,7 +197,7 @@ jobs: timeout-minutes: 10 strategy: matrix: - python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim" ] + python-image-variant: [ "3.7-slim", "3.8-slim", "3.9-slim", "3.10-slim" ] fail-fast: false env: BASE_PYTHON_IMAGE: ${{ matrix.python-image-variant }} @@ -228,7 +228,7 @@ jobs: timeout-minutes: 20 strategy: matrix: - python-image-variant: [ "3.7", "3.8-slim", "3.9-slim", "3.10-slim" ] + python-image-variant: [ "3.7-slim", "3.8-slim", "3.9-slim", "3.10-slim" ] case: - name: gRPC path: tests/e2e/case/grpc/e2e.yaml @@ -254,7 +254,7 @@ jobs: - name: Load docker images run: find docker-images -name "*.tar" -exec docker load -i {} \; - name: Run E2E Tests - uses: superskyyy/skywalking-infra-e2e@main + uses: apache/skywalking-infra-e2e@2631e76926604c4e30ca170bed916804c86980b6 with: log-dir: /tmp/e2e-logs e2e-file: ${{ matrix.case.path }} diff --git a/docs/en/setup/faq/How-to-use-with-uwsgi.md b/docs/en/setup/faq/How-to-use-with-uwsgi.md index a56ae416..d6d83619 100644 --- a/docs/en/setup/faq/How-to-use-with-uwsgi.md +++ b/docs/en/setup/faq/How-to-use-with-uwsgi.md @@ -16,6 +16,8 @@ from uwsgidecorators import postfork @postfork def init_tracing(): + # Important: The import of skywalking should be inside the postfork function + from skywalking import agent, config config.init(agent_collector_backend_services='127.0.0.1:11800', agent_name='your awesome service') agent.start() diff --git a/tests/e2e/script/env b/tests/e2e/script/env index adb6ad0e..91aa97fe 100644 --- a/tests/e2e/script/env +++ b/tests/e2e/script/env @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# 2022-11-05 commit -SW_CTL_COMMIT=521843f963917aa806740a9ad09c65aa59aca179 -SW_OAP_COMMIT=93d021ab0bbffa6cfa73adacdcbbf9e25f8016be \ No newline at end of file +# 2022-02-15 commit +SW_CTL_COMMIT=0883266bfaa36612927b69e35781b64ea181758d +SW_OAP_COMMIT=574b83f095861d4199fdb78aa52923765cf921a1 \ No newline at end of file diff --git a/tests/plugin/base.py b/tests/plugin/base.py index 5d1c2826..b61ece0b 100644 --- a/tests/plugin/base.py +++ b/tests/plugin/base.py @@ -57,12 +57,6 @@ def validate(self, expected_file_name=None): )) print('diff list: ') - print('----------') - print('-actual') - print(actual_data.splitlines(keepends=True)) - print('----------') - print('-expected') - print(yaml.dump(yaml.load(expected_data, Loader=Loader)).splitlines(keepends=True)) sys.stdout.writelines(diff_list) assert response.status_code == 200 From 5a92d93d51825dcb14d8eab812469e6c4407f330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Superskyyy=20=28AWAY=2C=20busy=20graduating=20=7C=20Debug?= =?UTF-8?q?=20=E4=BA=BA=29?= Date: Wed, 15 Feb 2023 17:01:52 -0500 Subject: [PATCH 20/26] FIx redis --- skywalking/plugins/sw_redis.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/skywalking/plugins/sw_redis.py b/skywalking/plugins/sw_redis.py index 87b9aad6..beab3df0 100644 --- a/skywalking/plugins/sw_redis.py +++ b/skywalking/plugins/sw_redis.py @@ -22,7 +22,7 @@ link_vector = ['https://github.com/andymccurdy/redis-py/'] support_matrix = { 'redis': { - '>=3.7': ['3.5'] + '>=3.7': ['3.5.*', '4.5.1'] } } note = """Known incompatibility: Redis lib 4.0+ args length is no longer correct""" @@ -49,7 +49,14 @@ def install(): def _sw_send_command(this: Connection, *args, **kwargs): peer = f'{this.host}:{this.port}' - cmd, key = args[0], args[1] + + if len(args) == 1: + cmd = args[0] + key = '' + elif len(args) > 1: + cmd, key = args[0], args[1] + else: # just to be safe + cmd, key = '' if cmd in OPERATIONS_WRITE: op = 'write' From 2122b88b328a677a780248d840df7e1c747f3c56 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Wed, 15 Feb 2023 22:33:14 +0000 Subject: [PATCH 21/26] Lint --- docs/en/setup/Plugins.md | 2 +- poetry.lock | 2 +- pyproject.toml | 2 +- skywalking/plugins/sw_redis.py | 4 ++-- skywalking/plugins/sw_websockets.py | 1 - tests/plugin/http/sw_websockets/services/consumer.py | 4 ++-- tests/plugin/web/sw_fastapi/services/consumer.py | 4 ++-- 7 files changed, 9 insertions(+), 10 deletions(-) diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index 06c8e6ec..c854cdd0 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -39,7 +39,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) | [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0']; | `sw_pymysql` | | [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; | `sw_pyramid` | | [pika](https://pika.readthedocs.io) | Python >=3.7 - ['1.2']; | `sw_rabbitmq` | -| [redis](https://github.com/andymccurdy/redis-py/) | Python >=3.7 - ['3.5']; | `sw_redis` | +| [redis](https://github.com/andymccurdy/redis-py/) | Python >=3.7 - ['3.5.*', '4.5.1']; | `sw_redis` | | [requests](https://requests.readthedocs.io/en/master/) | Python >=3.7 - ['2.26', '2.25']; | `sw_requests` | | [sanic](https://sanic.readthedocs.io/en/latest) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['20.12']; | `sw_sanic` | | [tornado](https://www.tornadoweb.org) | Python >=3.7 - ['6.0', '6.1']; | `sw_tornado` | diff --git a/poetry.lock b/poetry.lock index 8790512f..615d0af8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3639,4 +3639,4 @@ kafka = ["kafka-python"] [metadata] lock-version = "2.0" python-versions = ">=3.7, <3.11" -content-hash = "1d42154067478257d0ac1a7c065cd03e81219992808471f6b04cd45d53cc6f3f" +content-hash = "b7a473e551e75a52dc16fcbf17e54b42f5515f4f78d909d0d1ee4762c92bdff4" diff --git a/pyproject.toml b/pyproject.toml index 208ca480..969e544c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -130,7 +130,7 @@ loguru = "^0.6.0" httpx = "^0.23.3" [tool.poetry.group.lint.dependencies] -pylint = '*' +pylint = '2.13.9' flake8 = "^5.0.4" # isort = "^5.10.1" unify = "^0.5" diff --git a/skywalking/plugins/sw_redis.py b/skywalking/plugins/sw_redis.py index beab3df0..72621b1e 100644 --- a/skywalking/plugins/sw_redis.py +++ b/skywalking/plugins/sw_redis.py @@ -55,8 +55,8 @@ def _sw_send_command(this: Connection, *args, **kwargs): key = '' elif len(args) > 1: cmd, key = args[0], args[1] - else: # just to be safe - cmd, key = '' + else: # just to be safe + cmd = key = '' if cmd in OPERATIONS_WRITE: op = 'write' diff --git a/skywalking/plugins/sw_websockets.py b/skywalking/plugins/sw_websockets.py index 084160b5..128fde3f 100644 --- a/skywalking/plugins/sw_websockets.py +++ b/skywalking/plugins/sw_websockets.py @@ -73,7 +73,6 @@ async def _sw_protocol_handshake_client(self, wsuri, finally: span.tag(TagHttpStatusMsg(status_msg)) - WebSocketClientProtocol.handshake = _sw_protocol_handshake_client # To trace per message transactions diff --git a/tests/plugin/http/sw_websockets/services/consumer.py b/tests/plugin/http/sw_websockets/services/consumer.py index 21e96405..204d7f7a 100644 --- a/tests/plugin/http/sw_websockets/services/consumer.py +++ b/tests/plugin/http/sw_websockets/services/consumer.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import websockets +from websockets.client import connect import asyncio @@ -26,7 +26,7 @@ @app.get('/ws') async def websocket_ping(): - async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket: + async with connect('ws://provider:9091/ws', extra_headers=None) as websocket: await websocket.send('Ping') response = await websocket.recv() diff --git a/tests/plugin/web/sw_fastapi/services/consumer.py b/tests/plugin/web/sw_fastapi/services/consumer.py index 627bf6cc..39d144a3 100644 --- a/tests/plugin/web/sw_fastapi/services/consumer.py +++ b/tests/plugin/web/sw_fastapi/services/consumer.py @@ -15,7 +15,7 @@ # limitations under the License. # import requests -import websockets +from websockets.client import connect import asyncio @@ -32,7 +32,7 @@ async def application(): return {'http': res.json(), 'websocket': websocket_pong} async def websocket_ping(): - async with websockets.connect('ws://provider:9091/ws', extra_headers=None) as websocket: + async with connect('ws://provider:9091/ws', extra_headers=None) as websocket: await websocket.send('Ping') response = await websocket.recv() From 4542e4e73ec2140697a46b63d4eb339ba449ef98 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 16 Feb 2023 04:57:09 +0000 Subject: [PATCH 22/26] intentionally cause hanging epoll1 --- .github/workflows/CI.yaml | 6 +++--- skywalking/agent/__init__.py | 14 +++++++------- tests/plugin/base.py | 9 ++++++--- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index c9bc89a3..9dcf9be6 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -25,9 +25,9 @@ on: schedule: - cron: '0 18 * * *' -concurrency: - group: CI-plugin-e2e-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true +#concurrency: +# group: CI-plugin-e2e-${{ github.event.pull_request.number || github.ref }} +# cancel-in-progress: true jobs: license-and-lint: diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 5e65ac8e..3f04b929 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -200,19 +200,19 @@ def start(self) -> None: # since 3.6 is EOL, we will not officially support it logger.warning('SkyWalking Python agent does not support Python 3.6 and below, ' 'please upgrade to Python 3.7 or above.') - if python_version[0] == 3 and python_version[1] > 10: - logger.warning('SkyWalking Python agent does not support Python 3.11 and above, ' - 'though it is likely to work properly, official support is not offered yet.') - # This is required for grpcio to work with fork() + # Below is required for grpcio to work with fork() # https://github.com/grpc/grpc/blob/master/doc/fork_support.md - # This is not available in Python 3.7 due to hanging issue + # This is not available in Python 3.7 due to frequent hanging issue + # It doesn't mean other Python versions will not hang, but chances seem low # https://github.com/grpc/grpc/issues/18075 + os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' + os.environ['GRPC_POLL_STRATEGY'] = 'epoll1' if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support: python_version: tuple = sys.version_info[:2] if python_version[0] == 3 and python_version[1] == 7: - raise RuntimeError('gRPC fork support is not safe on Python 3.7 and can cause hanging. ' + raise RuntimeError('gRPC fork support is not safe on Python 3.7 and can cause subprocess hanging. ' 'See: https://github.com/grpc/grpc/issues/18075.' - 'Please either upgrade to Python 3.8+ (recommended), ' + 'Please either upgrade to Python 3.8+ (though hanging could still happen but rare), ' 'or use HTTP/Kafka protocol, or disable experimental fork support.') os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' diff --git a/tests/plugin/base.py b/tests/plugin/base.py index b61ece0b..64634e38 100644 --- a/tests/plugin/base.py +++ b/tests/plugin/base.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import time import inspect import os import sys @@ -38,13 +38,16 @@ def validate(self, expected_file_name=None): if expected_file_name is None: expected_file_name = os.path.join(dirname(inspect.getfile(self.__class__)), 'expected.data.yml') - # time.sleep(10) - with open(expected_file_name) as expected_data_file: expected_data = os.linesep.join(expected_data_file.readlines()) response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data) + if response.status_code != 200: + # heuristically retry once + time.sleep(10) + response = requests.post(url='http://localhost:12800/dataValidate', data=expected_data) + if response.status_code != 200: res = requests.get('http://localhost:12800/receiveData') From 23fb2b92322ab6406054212463f550f125be83d5 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 16 Feb 2023 04:57:28 +0000 Subject: [PATCH 23/26] intentionally cause hanging poll --- skywalking/agent/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 3f04b929..4f96deb0 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -206,7 +206,7 @@ def start(self) -> None: # It doesn't mean other Python versions will not hang, but chances seem low # https://github.com/grpc/grpc/issues/18075 os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - os.environ['GRPC_POLL_STRATEGY'] = 'epoll1' + os.environ['GRPC_POLL_STRATEGY'] = 'poll' if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support: python_version: tuple = sys.version_info[:2] if python_version[0] == 3 and python_version[1] == 7: From 55af1c68720adbd0e8798970c3802fd4daf9d50b Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 16 Feb 2023 04:57:47 +0000 Subject: [PATCH 24/26] intentionally cause hanging default --- skywalking/agent/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 4f96deb0..2eab05dc 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -206,7 +206,7 @@ def start(self) -> None: # It doesn't mean other Python versions will not hang, but chances seem low # https://github.com/grpc/grpc/issues/18075 os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - os.environ['GRPC_POLL_STRATEGY'] = 'poll' + # os.environ['GRPC_POLL_STRATEGY'] = 'poll' if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support: python_version: tuple = sys.version_info[:2] if python_version[0] == 3 and python_version[1] == 7: From cfff511a6c43c51958eecc9543eabe75415271c7 Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 16 Feb 2023 05:40:54 +0000 Subject: [PATCH 25/26] Revert test and finalize --- .github/workflows/CI.yaml | 6 +++--- skywalking/agent/__init__.py | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index ca9d45c4..c4d4a8ec 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -25,9 +25,9 @@ on: schedule: - cron: '0 18 * * *' -#concurrency: -# group: CI-plugin-e2e-${{ github.event.pull_request.number || github.ref }} -# cancel-in-progress: true +concurrency: + group: CI-plugin-e2e-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true jobs: license-and-lint: diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 2eab05dc..e115328b 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -205,8 +205,6 @@ def start(self) -> None: # This is not available in Python 3.7 due to frequent hanging issue # It doesn't mean other Python versions will not hang, but chances seem low # https://github.com/grpc/grpc/issues/18075 - os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true' - # os.environ['GRPC_POLL_STRATEGY'] = 'poll' if config.agent_protocol == 'grpc' and config.agent_experimental_fork_support: python_version: tuple = sys.version_info[:2] if python_version[0] == 3 and python_version[1] == 7: From 75d6d4cd0629c7ba70f152b7d5ebd680f2338a4b Mon Sep 17 00:00:00 2001 From: Superskyyy Date: Thu, 16 Feb 2023 05:52:18 +0000 Subject: [PATCH 26/26] Revert test and finalize --- CHANGELOG.md | 9 ++++----- skywalking/plugins/sw_redis.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d19d993..34843623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,12 +3,11 @@ ### 1.0.0 - **Important Note and Breaking Changes:** - - Python 3.6 is no longer supported and may not function properly, Python 3.11 support is added and tested. - - A number of common configuration options (environment variables) are renamed to follow the convention of Java agent, + - **BREAKING**: Python 3.6 is no longer supported and may not function properly, Python 3.11 support is added and tested. + - **BREAKING**: A number of common configuration options and environment variables are renamed to follow the convention of Java agent, please check with the latest official documentation before upgrading. (#273, #282) - https://skywalking.apache.org/docs/skywalking-python/v1.0.0/en/setup/configuration/ - - All agent core capabilities are now covered by test cases and enabled by default (Trace, Log, PVM runtime metrics, Profiler) + - **BREAKING**: All agent core capabilities are now covered by test cases and enabled by default (Trace, Log, PVM runtime metrics, Profiler) - Feature: @@ -20,7 +19,7 @@ - Add support for the tags of Virtual Cache for Redis (#263) - Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277) - Add log reporter support for loguru (#276) - - Add **experimental** support for explicit os.fork(), restarts agent in new process (#284) + - Add **experimental** support for explicit os.fork(), restarts agent in new process (#286) - Plugins: - Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage) diff --git a/skywalking/plugins/sw_redis.py b/skywalking/plugins/sw_redis.py index 72621b1e..ad5ae36c 100644 --- a/skywalking/plugins/sw_redis.py +++ b/skywalking/plugins/sw_redis.py @@ -25,7 +25,7 @@ '>=3.7': ['3.5.*', '4.5.1'] } } -note = """Known incompatibility: Redis lib 4.0+ args length is no longer correct""" +note = """""" OPERATIONS_WRITE = {'GETSET', 'SET', 'SETBIT', 'SETEX ', 'SETNX ', 'SETRANGE', 'STRLEN ', 'MSET', 'MSETNX ', 'PSETEX', 'INCR ', 'INCRBY ', 'INCRBYFLOAT', 'DECR ', 'DECRBY ', 'APPEND ', 'HMSET', 'HSET', 'HSETNX ',