Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/demo.conf
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,5 @@
#pushMaxRetries = 10

# Whether to retry polling indefinitely until success. Default is False.
# Enabling this holds the per-IOC lock until CF recovers for that IOC; other IOCs are unaffected.
# Enabling this holds the global commit lock until CF recovers; all other IOC commits are blocked.
#pushAlwaysRetry = False
1 change: 1 addition & 0 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
optional-dependencies.metrics = [ "prometheus-client" ]
optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ]
urls.Repository = "https://github.com/ChannelFinder/recsync"
scripts.recceiver-clean = "recceiver.clean_tool:main"

[tool.setuptools]
packages = [ "recceiver", "recceiver.cf", "recceiver.protocol", "twisted.plugins" ]
Expand Down
8 changes: 6 additions & 2 deletions server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,15 @@ def privilegedStartService(self):
self._statusLoop.start(self.statusInterval, now=False)

def _logStatus(self):
metrics.connections_active.set(self.tcpFactory.NActive)
nactive = self.tcpFactory.NActive
if nactive < 0:
log.warning("NActive is %d — connection accounting is corrupted", nactive)
nactive = max(0, nactive)
metrics.connections_active.set(nactive)
metrics.connections_waiting.set(len(self.tcpFactory.Wait))
log.info(
"status: connections active=%d/%d queued=%d",
self.tcpFactory.NActive,
nactive,
self.tcpFactory.maxActive,
len(self.tcpFactory.Wait),
)
Expand Down
335 changes: 166 additions & 169 deletions server/recceiver/cf/processor.py
Comment thread
anderslindho marked this conversation as resolved.

Large diffs are not rendered by default.

100 changes: 100 additions & 0 deletions server/recceiver/clean_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""recceiver-clean — sweep Active CF channels to Inactive for a given recceiver ID.

Run after a RecCeiver restart (with cleanOnStart=False) to mark channels Inactive
whose IOCs have not reconnected.

Usage:
recceiver-clean -f /path/to/recceiver.conf [--recceiver-id ID] [--dry-run]
"""

import argparse
import configparser
import logging
import sys

from requests import RequestException

from recceiver.cf.adapter import PyCFClientAdapter
from recceiver.cf.config import CFConfig
from recceiver.cf.model import CFProperty, CFPropertyName, PVStatus
from recceiver.processors import ConfigAdapter

log = logging.getLogger(__name__)


def run_clean(cf_config: CFConfig, client=None, dry_run: bool = False) -> int:
"""Mark all Active channels for cf_config.recceiver_id Inactive.

Returns the total count of channels swept. Pass a pre-built client for testing.
"""
if client is None:
from channelfinder import ChannelFinderClient

client = PyCFClientAdapter(
Comment thread
anderslindho marked this conversation as resolved.
ChannelFinderClient(
BaseURL=cf_config.base_url,
username=cf_config.cf_username,
password=cf_config.cf_password,
verify_ssl=cf_config.verify_ssl,
),
size_limit=int(cf_config.cf_query_limit),
)
Comment thread
anderslindho marked this conversation as resolved.

total = 0
# find_active_for_recceiver is paginated (bounded by size_limit). In live mode each
# update_property call marks the current page Inactive, so the next query returns a
# fresh batch; the loop drains all pages. In dry-run mode nothing is deactivated so
# the same batch would come back on every iteration — break after the first page.
while True:
Comment thread
anderslindho marked this conversation as resolved.
channels = client.find_active_for_recceiver(cf_config.recceiver_id)
if not channels:
break
log.info(
"Found %d active channels for recceiver_id=%r%s",
len(channels),
cf_config.recceiver_id,
" (dry-run)" if dry_run else "",
)
if not dry_run:
client.update_property(
CFProperty(CFPropertyName.PV_STATUS.value, cf_config.username, PVStatus.INACTIVE.value),
[ch.name for ch in channels],
)
total += len(channels)
if dry_run:
break
Comment thread
anderslindho marked this conversation as resolved.
return total


def main(argv=None):
parser = argparse.ArgumentParser(description="Mark active CF channels Inactive for a given recceiver ID.")
parser.add_argument("-f", "--config", required=True, help="Path to recceiver config file")
parser.add_argument("--recceiver-id", default=None, help="Override recceiver ID from config")
parser.add_argument("--dry-run", action="store_true", help="Print count without modifying CF")
args = parser.parse_args(argv)

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

conf = configparser.ConfigParser()
conf.read(args.config)
cf_conf = CFConfig.loads(ConfigAdapter(conf, "cf"))

if args.recceiver_id:
cf_conf.recceiver_id = args.recceiver_id

if cf_conf.base_url is None:
print("ERROR: baseUrl must be configured in [cf] section", file=sys.stderr)
sys.exit(1)

try:
count = run_clean(cf_conf, dry_run=args.dry_run)
except RequestException as err:
print(f"ERROR: CF request failed: {err}", file=sys.stderr)
sys.exit(2)

action = "Would mark" if args.dry_run else "Marked"
print(f"{action} {count} channels Inactive for recceiver_id={cf_conf.recceiver_id!r}")


if __name__ == "__main__":
main()
16 changes: 10 additions & 6 deletions server/recceiver/recast.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, active=True):

self.sess, self.active = None, active
self.uploadSize, self.uploadStart = 0, 0
self._ping_timer = None # connectionLost guards on this; connectionMade may not set it

self.rxfn = collections.defaultdict(self.dfact)

Expand Down Expand Up @@ -58,7 +59,7 @@ def connectionLost(self, reason=protocol.connectionDone):
self.factory.isDone(self, self.active)
if self._ping_timer and self._ping_timer.active():
self._ping_timer.cancel()
del self._ping_timer
self._ping_timer = None
if self.sess:
self.sess.close()
del self.sess
Expand Down Expand Up @@ -178,6 +179,7 @@ def recvDone(self, body):
log.error("Ignoring done update")
return self.getInitialState()
self.factory.isDone(self, self.active)
self.active = False # slot freed; connectionLost must not free it again
self.sess.done()
if self.phase == 1:
self.writePing()
Expand Down Expand Up @@ -361,16 +363,18 @@ class CastFactory(protocol.ServerFactory):
maxActive = 3

def __init__(self):
# Throttle concurrent uploading connections to control CF commit load.
# "Active" means currently uploading records; connections become
# "inactive" via isDone() once the upload completes.
# Flow control by limiting the number of concurrent
# "active" connections. Active means dumping lots of records.
# Connections become "inactive" by calling isDone().
self.NActive = 0
self.Wait = []

def isDone(self, proto, active):
if not active:
# connection closed before activation
self.Wait.remove(proto)
# connection closed before activation; guard: proto may no longer be in Wait
# if recvDone already freed the slot and cleared self.active
if proto in self.Wait:
self.Wait.remove(proto)
elif len(self.Wait) > 0:
# Others are waiting
waiting = self.Wait.pop(0)
Expand Down
Loading
Loading