Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
# limitations under the License.
#

from skywalking.loggings import logger
import atexit
from queue import Queue
from threading import Thread, Event
from typing import TYPE_CHECKING

from skywalking import config, plugins
from skywalking import config, plugins, loggings
from skywalking.loggings import logger
from skywalking.agent.protocol import Protocol

if TYPE_CHECKING:
Expand Down Expand Up @@ -66,20 +67,27 @@ def __init():
plugins.install()


def __fini():
__protocol.report(__queue, False)
__queue.join()


def start():
global __started
if __started:
raise RuntimeError('the agent can only be started once')
from skywalking import loggings
loggings.init()
config.finalize()
__started = True
__init()
__heartbeat_thread.start()
__report_thread.start()
atexit.register(__fini)


def stop():
atexit.unregister(__fini)
__fini()
__finished.set()


Expand Down
2 changes: 1 addition & 1 deletion skywalking/agent/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ def connected(self):
def heartbeat(self):
raise NotImplementedError()

def report(self, queue: Queue):
def report(self, queue: Queue, block: bool = True):
raise NotImplementedError()
4 changes: 2 additions & 2 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def on_error(self):
self.channel.unsubscribe(self._cb)
self.channel.subscribe(self._cb, try_to_connect=True)

def report(self, queue: Queue):
def report(self, queue: Queue, block: bool = True):
def generator():
while True:
segment = queue.get() # type: Segment
segment = queue.get(block=block) # type: Segment

logger.debug('reporting segment %s', segment)

Expand Down
4 changes: 2 additions & 2 deletions skywalking/agent/protocol/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def heartbeat(self):
def connected(self):
return True

def report(self, queue: Queue):
def report(self, queue: Queue, block: bool = True):
def generator():
while True:
segment = queue.get() # type: Segment
segment = queue.get(block=block) # type: Segment

logger.debug('reporting segment %s', segment)

Expand Down
4 changes: 2 additions & 2 deletions skywalking/agent/protocol/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def connected(self):
def heartbeat(self):
self.service_management.send_heart_beat()

def report(self, queue: Queue):
def report(self, queue: Queue, block: bool = True):
def generator():
while True:
segment = queue.get() # type: Segment
segment = queue.get(block=block) # type: Segment

logger.debug('reporting segment %s', segment)

Expand Down