Skip to content

Commit 9c501a9

Browse files
committed
Add support for Comet-style server side filtering
See https://comet.transientskp.org/en/stable/filtering.html.
1 parent e453216 commit 9c501a9

File tree

3 files changed

+47
-9
lines changed

3 files changed

+47
-9
lines changed

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,24 @@ def handler(payload, root):
6060
gcn.listen(handler=handler)
6161
```
6262

63-
## Filtering
63+
## Server-Side Filtering
64+
65+
VOEvent brokers that are powered by [Comet](https://comet.transientskp.org/) support [server-side filtering of alerts](https://comet.transientskp.org/en/stable/filtering.html). You configure the server-side filtering when you connect by supplying an [XPath expression](https://www.w3schools.com/xml/xpath_syntax.asp) in the optional `filter` argument for `gcn.listen`:
66+
67+
```python
68+
gcn.listen(handler=handler, filter='insert-filter-here')
69+
```
70+
71+
Here is a cheat sheet for some common filter expressions.
72+
73+
| Filter expression | What it does |
74+
| - | - |
75+
| `//Param[@name="Packet_Type" and @value="115"]` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) |
76+
| `//Param[@name="Packet_Type" and @value="115"] and //Error2Radius<=6` | Pass only alerts of notice type 115 (`FERMI_GBM_FIN_POS`) with error radius less than or equal to 6° |
77+
| `//Param[@name="Packet_Type" and (@value="112" or @value="115")]` | Pass only alerts of notice type 112 (`FERMI_GBM_GND_POS`) or 115 (`FERMI_GBM_FIN_POS`) |
78+
| `starts-with(@ivorn, "ivo://gwnet/") and @role!="test"` | Pass only LIGO-Virgo-KAGRA gravitational-wave alerts that are not test alerts |
79+
80+
## Client-Side Filtering
6481

6582
You can also filter events by notice type using
6683
`gcn.include_notice_types` or `gcn.exclude_notice_types`.

gcn/cmdline.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ def listen_main(args=None):
7676
'provided, then loop over hosts until a connection '
7777
'with one of them is established. '
7878
'(default: %(default)s)')
79+
parser.add_argument(
80+
'--filter', help='Optional XPath expression for server-side event '
81+
'filtering. Only supported by Comet brokers. See '
82+
'https://comet.transientskp.org/en/stable/filtering.html for '
83+
'supported syntax')
7984
parser.add_argument('--version', action='version',
8085
version='pygcn ' + __version__)
8186
args = parser.parse_args(args)
@@ -85,7 +90,7 @@ def listen_main(args=None):
8590

8691
# Listen for GCN notices (until interrupted or killed)
8792
host, port = [list(_) for _ in zip(*args.addr)]
88-
listen(host=host, port=port, handler=handlers.archive)
93+
listen(host=host, port=port, handler=handlers.archive, filter=args.filter)
8994

9095

9196
def serve_main(args=None):

gcn/voeventclient.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import struct
2626
import time
2727
import itertools
28+
from xml.sax.saxutils import quoteattr
2829

2930
from lxml.etree import fromstring, XMLSyntaxError
3031

@@ -42,7 +43,7 @@
4243

4344
def _get_now_iso8601():
4445
"""Get current date-time in ISO 8601 format."""
45-
return datetime.datetime.now().isoformat()
46+
return datetime.datetime.now().isoformat() + "Z"
4647

4748

4849
def _open_socket(hosts_ports, iamalive_timeout, max_reconnect_timeout, log):
@@ -121,7 +122,7 @@ def _send_packet(sock, payload):
121122
sock.sendall(_size_struct.pack(len(payload)) + payload)
122123

123124

124-
def _form_response(role, origin, response, timestamp):
125+
def _form_response(role, origin, response, timestamp, meta=''):
125126
"""Form a VOEvent Transport Protocol packet suitable for sending an `ack`
126127
or `iamalive` response."""
127128
return (
@@ -133,11 +134,12 @@ def _form_response(role, origin, response, timestamp):
133134
'Transport/v1.1 '
134135
'http://telescope-networks.org/schema/Transport-v1.1.xsd"><Origin>' +
135136
origin + '</Origin><Response>' + response +
136-
'</Response><TimeStamp>' + timestamp +
137-
'</TimeStamp></trn:Transport>').encode('UTF-8')
137+
'</Response>' + '<Meta>' + meta + '</Meta>'
138+
+ '<TimeStamp>' + timestamp + '</TimeStamp></trn:Transport>'
139+
).encode('UTF-8')
138140

139141

140-
def _ingest_packet(sock, ivorn, handler, log):
142+
def _ingest_packet(sock, ivorn, handler, log, filter):
141143
"""Ingest one VOEvent Transport Protocol packet and act on it, first
142144
sending the appropriate response and then calling the handler if the
143145
payload is a VOEvent."""
@@ -163,6 +165,13 @@ def _ingest_packet(sock, ivorn, handler, log):
163165
root.find("Origin").text, ivorn,
164166
_get_now_iso8601()))
165167
log.debug("sent iamalive response")
168+
elif root.attrib["role"] == "authenticate" and filter is not None:
169+
log.debug("received authenticate message")
170+
_send_packet(sock, _form_response("authenticate",
171+
root.find("Origin").text, ivorn,
172+
_get_now_iso8601(),
173+
f'<Param name="xpath-filter" value={filter}/>'))
174+
log.debug("sent authenticate response")
166175
else:
167176
log.error(
168177
'received transport message with unrecognized role: %s',
@@ -212,7 +221,7 @@ def _validate_host_port(host, port):
212221

213222
def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
214223
ivorn="ivo://python_voeventclient/anonymous", iamalive_timeout=150,
215-
max_reconnect_timeout=1024, handler=None, log=None):
224+
max_reconnect_timeout=1024, handler=None, log=None, filter=None):
216225
"""Connect to a VOEvent Transport Protocol server on the given `host` and
217226
`port`, then listen for VOEvents until interrupted (i.e., by a keyboard
218227
interrupt, `SIGINTR`, or `SIGTERM`).
@@ -233,10 +242,17 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
233242
used for reporting the client's status. If `log` is not provided, a default
234243
logger will be used.
235244
245+
If `filter` is provided, then it is passed to the server as an
246+
`XPath filtering expression
247+
<https://comet.transientskp.org/en/stable/filtering.html>`_.
248+
236249
Note that this function does not return."""
237250
if log is None:
238251
log = logging.getLogger('gcn.listen')
239252

253+
if filter is not None:
254+
filter = quoteattr(filter)
255+
240256
hosts_ports = itertools.cycle(zip(*_validate_host_port(host, port)))
241257

242258
while True:
@@ -246,7 +262,7 @@ def listen(host=("45.58.43.186", "68.169.57.253"), port=8099,
246262

247263
try:
248264
while True:
249-
_ingest_packet(sock, ivorn, handler, log)
265+
_ingest_packet(sock, ivorn, handler, log, filter)
250266
except socket.timeout:
251267
log.warn("timed out")
252268
except socket.error:

0 commit comments

Comments
 (0)