Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#

import logging
from skywalking.loggings import logger
import traceback
from queue import Queue
Expand Down Expand Up @@ -62,7 +63,7 @@ def connected(self):
return self.state == grpc.ChannelConnectivity.READY

def on_error(self):
traceback.print_exc()
traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
self.channel.unsubscribe(self._cb)
self.channel.subscribe(self._cb, try_to_connect=True)

Expand Down
45 changes: 35 additions & 10 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

import threading
from typing import List

from skywalking import agent, config
Expand Down Expand Up @@ -111,12 +110,13 @@ def start(self, span: Span):
self.spans.append(span)

def stop(self, span: Span) -> bool:
assert span is self.spans[-1]
idx = self.spans.index(span) # span SHOULD always be at end but in async-world it MAY not be, so don't crash

span.finish(self.segment) and self.spans.pop()
if span.finish(self.segment):
del self.spans[idx]

if len(self.spans) == 0:
_thread_local.context = None
_local().context = None
agent.archive(self.segment)

return len(self.spans) == 0
Expand Down Expand Up @@ -212,13 +212,38 @@ def continued(self, snapshot: 'Snapshot'):
self._correlation.update(snapshot.correlation)


_thread_local = threading.local()
_thread_local.context = None
try: # attempt to use async-local instead of thread-local context
import contextvars

__local = contextvars.ContextVar('local')

class AsyncLocal:
pass

def _local():
try:
return __local.get()

except LookupError:
local = AsyncLocal()
__local.set(local)

return local

except ImportError:
import threading

__local = threading.local()

def _local():
return __local


def get_context() -> SpanContext:
if not hasattr(_thread_local, 'context'):
_thread_local.context = None
_thread_local.context = _thread_local.context or (SpanContext() if agent.connected() else NoopContext())
local = _local()
context = getattr(local, 'context', False)

if not context:
context = local.context = (SpanContext() if agent.connected() else NoopContext())

return _thread_local.context
return context