Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b3349bb
Allow threaded operation, including a handler that places the message…
dmopalmer Dec 1, 2018
6bcf153
Allow threaded operation, including a handler that places the message…
dmopalmer Dec 1, 2018
ed7e8c4
Merge branch 'threaded' of github.com:dmopalmer/pygcn into threaded
dmopalmer Dec 1, 2018
2725775
PEP-8 cleanup.
dmopalmer Dec 1, 2018
fa06b7f
Allow threaded operation, including a handler that places the message…
dmopalmer Dec 1, 2018
836f229
Allow threaded operation, including a handler that places the message…
dmopalmer Dec 1, 2018
19e8a5f
PEP-8 cleanup.
dmopalmer Dec 1, 2018
c4e3615
Merge branch 'threaded' of github.com:dmopalmer/pygcn into threaded
dmopalmer Feb 14, 2019
8b46d45
Merge branch 'master' of github.com:lpsinger/pygcn into threaded
dmopalmer Mar 7, 2019
0b6635b
Merge branch 'master' of github.com:lpsinger/pygcn into threaded
dmopalmer Dec 11, 2019
2fe080b
Removed accidental duplication. Fixed pep-8 (required by Travis).
dmopalmer Jan 8, 2020
9de70e9
Merge branch 'master' of github.com:lpsinger/pygcn into threaded
dmopalmer Oct 20, 2020
3828390
Merge branch 'main' of github.com:lpsinger/pygcn into threaded101
dmopalmer Jul 6, 2021
7b309e7
Added extra and keyword args to the wrappers.
dmopalmer Nov 18, 2021
6b98dfe
Merge branch 'handler_args' of github.com:dmopalmer/pygcn into thread…
dmopalmer Nov 18, 2021
b1f8369
Merge branch 'main' of github.com:lpsinger/pygcn into threaded102
dmopalmer Jan 30, 2023
4b88669
Merge branch 'nasa-gcn:main' into threaded102
dmopalmer Jan 30, 2023
86c4e5c
Removed stopevent capability to simplify threaded operation.
dmopalmer Apr 4, 2023
5db02d4
Readme change
dmopalmer Apr 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Allow threaded operation, including a handler that places the message…
…s on the queue and an event to stop the thread.
  • Loading branch information
dmopalmer committed Feb 14, 2019
commit 836f229e2df56c0133eae4f1f4a7ed0e4de94043
62 changes: 62 additions & 0 deletions gcn/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""
Utilities for command-line interface.
"""
from __future__ import print_function
import argparse
import collections
import logging
Expand Down Expand Up @@ -147,6 +148,67 @@ def inthandler(signum, frame):
print('\nFinishing')
thread.join()

def threaded_listen_main(args=None):
"""Example VOEvent listener that demonstrates threaded operation"""

# Command line interface
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('addr', default='68.169.57.253:8099',
action=HostPortAction,
help='Server host and port (default: %(default)s)')
parser.add_argument('--version', action='version',
version='pygcn ' + __version__)
parser.add_argument('--maxtime', default=None,
help='Time to process until returning (s)')
args = parser.parse_args(args)

if args.maxtime is not None:
args.maxtime = datetime.timedelta(seconds=float(args.maxtime))

# Set up logger
logging.basicConfig(level=logging.INFO)

# Listen for GCN notices (until interrupted, killed, or maxtime reached)
# in a second thread, while counting up seconds in the main thread.
messagequeue = queue.Queue()
stopevent = threading.Event()

def inthandler(signum, frame):
stopevent.set()

signal.signal(signal.SIGINT, handler=inthandler) # Keyboard etc interrupt
try:
listenargs = dict(host=args.addr.host, port=args.addr.port,
handler=handlers.queuehandlerfor(messagequeue),
stopevent=stopevent)
thread = threading.Thread(target=listen, kwargs=listenargs)
starttime = datetime.datetime.utcnow()
lasttime = starttime
thread.start()

while thread.is_alive():
try:
payload, root = messagequeue.get(timeout=1)
print('\r{} {}'
.format(datetime.datetime.utcnow().strftime("%H:%M:%S"),
root.attrib['ivorn']))
lasttime = datetime.datetime.utcnow()
except queue.Empty:
dt = (datetime.datetime.utcnow() - lasttime).total_seconds()
print('\r{:.0f}'.format(dt), end='\r')
if args.maxtime is not None:
if (datetime.datetime.utcnow() - starttime) > args.maxtime:
stopevent.set()
break
except Exception as e:
stopevent.set()
print(e, file=sys.stderr)
thread.join()
raise
print('\nFinishing')
thread.join()


def serve_main(args=None):
"""Rudimentary GCN server, for testing purposes. Serves just one connection
at a time, and repeats the same payloads in order, repeating, for each
Expand Down
4 changes: 3 additions & 1 deletion gcn/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,16 @@ def archive(payload, root):
f.write(payload)
logging.getLogger('gcn.handlers.archive').info("archived %s", ivorn)


def _queuehandler(payload, root, queue=None):
""" Place (payload, root) on queue for threaded operation.
This can be used in the following manner:
gcn.listen(handler = functools.partial(partialize_queue, queue=a_queue))
"""
if queue is None:
raise TypeError("The queue must be set (use queuehandlerfor())")
queue.put( (payload,root) )
queue.put((payload, root))


def queuehandlerfor(queue):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other handlers have function names that are verbs or verb phrases. Please rename this one to be consistent. Perhaps 'enqueueorput_queue`?

"""Create a handler that places (payload, root) on the given queue
Expand Down
25 changes: 25 additions & 0 deletions gcn/tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from .. import handlers
from .. import notice_types

try:
import queue
except:
import Queue as queue

payloads = [pkg_resources.resource_string(__name__, 'data/gbm_flt_pos.xml'),
pkg_resources.resource_string(__name__, 'data/kill_socket.xml')]
Expand Down Expand Up @@ -52,3 +56,24 @@ def test_archive(tmpdir):
assert (tmpdir / filename).exists()
finally:
os.chdir(old_dir)


def test_queuehandler():
queue_ = queue.Queue()
queuehandler = handlers.queuehandlerfor(queue_)
assert queue_.empty()

for payload in payloads:
queuehandler(payload, fromstring(payload))
qpayload, qtree = queue_.get()
assert qpayload == payload

assert queue_.empty()

for payload in payloads:
queuehandler(payload, fromstring(payload))
for payload in payloads:
qpayload, qtree = queue_.get()
assert qpayload == payload

assert queue_.empty()