diff --git a/server/demo.conf b/server/demo.conf index a20757e..37a74fa 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -133,5 +133,5 @@ #pushMaxRetries = 10 # Whether to retry polling indefinitely until success. Default is False. -# Enabling this holds the per-IOC lock until CF recovers for that IOC; other IOCs are unaffected. +# Enabling this holds the global commit lock until CF recovers; all other IOC commits are blocked. #pushAlwaysRetry = False diff --git a/server/pyproject.toml b/server/pyproject.toml index 5e2458a..f13f816 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ optional-dependencies.metrics = [ "prometheus-client" ] optional-dependencies.test = [ "pytest>=8.3,<8.4", "pytest-cov>=6,<7", "testcontainers>=4.8.2,<4.9" ] urls.Repository = "https://github.com/ChannelFinder/recsync" +scripts.recceiver-clean = "recceiver.clean_tool:main" [tool.setuptools] packages = [ "recceiver", "recceiver.cf", "recceiver.protocol", "twisted.plugins" ] diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 712cb4d..d47bfff 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -136,11 +136,15 @@ def privilegedStartService(self): self._statusLoop.start(self.statusInterval, now=False) def _logStatus(self): - metrics.connections_active.set(self.tcpFactory.NActive) + nactive = self.tcpFactory.NActive + if nactive < 0: + log.warning("NActive is %d — connection accounting is corrupted", nactive) + nactive = max(0, nactive) + metrics.connections_active.set(nactive) metrics.connections_waiting.set(len(self.tcpFactory.Wait)) log.info( "status: connections active=%d/%d queued=%d", - self.tcpFactory.NActive, + nactive, self.tcpFactory.maxActive, len(self.tcpFactory.Wait), ) diff --git a/server/recceiver/cf/processor.py b/server/recceiver/cf/processor.py index c4ef5d0..4ae97c7 100755 --- a/server/recceiver/cf/processor.py +++ b/server/recceiver/cf/processor.py @@ -1,6 +1,5 @@ import datetime import logging -import threading import time from collections import defaultdict from typing import Callable, Dict, List, Optional, Set @@ -30,10 +29,6 @@ log = logging.getLogger(__name__) -class CFUpdateAbortedError(Exception): - """Raised when a CF push is abandoned after exhausting all retries.""" - - @implementer(interfaces.IProcessor) class CFProcessor(service.Service): """IProcessor plugin that synchronises IOC record data to Channelfinder. @@ -49,11 +44,8 @@ def __init__(self, name: Optional[str], conf: ConfigAdapter): self.iocs: Dict[str, IOCInfo] = {} self.client: Optional[ChannelFinderAdapter] = None self.current_time: Callable[[Optional[str]], str] = get_current_time - self.lock: DeferredLock = DeferredLock() # lifecycle lock: serialises start/stop - self._ioc_locks: Dict[str, DeferredLock] = {} + self.lock: DeferredLock = DeferredLock() self._ioc_channels: Dict[str, Set[str]] = defaultdict(set) # iocid → set of channel names - self._state_lock: threading.Lock = threading.Lock() - self._cancelled: Dict[str, bool] = {} self._statusLoop = None def startService(self): @@ -166,74 +158,67 @@ def stopService(self): def _stop_service_with_lock(self): """Stop the CFProcessor service with lock held. - If clean_on_stop is enabled, drain all in-flight per-IOC commits - first, then mark all channels inactive in a background thread. - Commits use _ioc_locks rather than self.lock and can still be running - when this is called; the drain waits for each lock before the sweep. + If clean_on_stop is enabled, mark all channels as inactive. + The sweep runs in a background thread to avoid blocking the reactor. + The lock is held throughout, preventing new commits from interleaving. """ log.info("CF_STOP with lock") if self.cf_config.clean_on_stop: - drains = [lock.run(lambda: None) for lock in self._ioc_locks.values()] - d = defer.DeferredList(drains, consumeErrors=True) - d.addCallback(lambda _: deferToThread(self.clean_service)) - return d + return deferToThread(self.clean_service) def _start_background_clean(self): log.info("CF Clean: background startup sweep beginning") deferToThread(self.clean_service).addErrback(lambda err: log.error("CF Clean background sweep failed: %s", err)) - def _get_ioc_lock(self, iocid: str) -> DeferredLock: - """Return the per-IOC DeferredLock, creating it on first use. + def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder.""" + return self.lock.run(self._commit_with_lock, transaction_record) + + def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: + """Bridge the blocking commit thread to a cancellable Deferred. - Must only be called from the reactor thread. + Spawns _commit_with_thread in a thread, then wires a separate cancellable + Deferred (d) around it so that cancelling d (e.g. on service stop) sets + self.cancelled=True, which _assert_not_cancelled picks up mid-push. """ - if iocid not in self._ioc_locks: - self._ioc_locks[iocid] = DeferredLock() - return self._ioc_locks[iocid] + self.cancelled = False - def _prune_ioc_state(self, result, iocid: str): - """Remove per-IOC bookkeeping once an IOC has fully departed. + t = deferToThread(self._commit_with_thread, transaction) - Called after the per-IOC lock is released. Only prunes when the lock - is free (no new commit queued) and the IOC is no longer in self.iocs. - Always returns result so it is safe to use with addBoth. - """ - lock = self._ioc_locks.get(iocid) - if lock is not None and not lock.locked and iocid not in self.iocs: - self._ioc_locks.pop(iocid, None) - self._cancelled.pop(iocid, None) - self._ioc_channels.pop(iocid, None) - return result + def cancel_commit(d: defer.Deferred): + self.cancelled = True + d.callback(None) - def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: - """Commit a transaction to Channelfinder. + d: defer.Deferred = defer.Deferred(cancel_commit) - Uses a per-IOC DeferredLock so commits from different IOCs run in - parallel while transactions from the same IOC stay serialised. - """ - iocid = f"{transaction_record.source_address.host}:{transaction_record.source_address.port}" - lock = self._get_ioc_lock(iocid) - d = lock.run(self._commit_with_lock, transaction_record, iocid) - d.addBoth(self._prune_ioc_state, iocid) - return d + def wait_for_thread(_ignored): + if self.cancelled: + return t - @defer.inlineCallbacks - def _commit_with_lock(self, transaction: interfaces.ITransaction, iocid: str) -> defer.Deferred: - self._cancelled[iocid] = False - try: - result = yield deferToThread(self._prepare_commit, transaction, iocid) - if result is None: - return # disconnect-before-upload: nothing to push - ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap = result - yield self._push_to_cf_async(iocid, record_info_by_name, records_to_delete, ioc_info, iocs_snap, ciids_snap) - except defer.CancelledError as err: - log.debug("CF_COMMIT cancelled for %s: %s", iocid, err) - raise - except CFUpdateAbortedError as err: - log.exception("CF_COMMIT ABORTED after exhausting retries: %s", err) - except Exception as err: - log.exception("CF_COMMIT FAILURE: %s", err) - raise + d.addCallback(wait_for_thread) + + def chain_error(err): + """Handle errors from the commit thread. + + Note this is not foolproof as the thread may still be running. + """ + if not err.check(defer.CancelledError): + log.error("CF_COMMIT FAILURE: %s", err) + if self.cancelled: + if not err.check(defer.CancelledError): + raise defer.CancelledError() + return err + else: + d.callback(None) + + def chain_result(result): + if self.cancelled: + raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") + else: + d.callback(None) + + t.addCallbacks(chain_result, chain_error) + return d def transaction_to_record_infos( self, ioc_info: IOCInfo, transaction: interfaces.ITransaction @@ -346,14 +331,12 @@ def _remove_aliases(self, aliases: List[str], iocid: str) -> None: for alias in aliases: self.remove_channel(alias, iocid) - def _prepare_commit(self, transaction: interfaces.ITransaction, iocid: str): - """Build IOC/record state and update shared dicts under _state_lock. + def _commit_with_thread(self, transaction: interfaces.ITransaction): + """Execute a commit transaction in a background thread. - Returns a tuple of (ioc_info, record_info_by_name, records_to_delete, - iocs_snapshot, channel_ioc_ids_snapshot) ready for the CF push, or - None if the IOC disconnected before completing its initial upload. - - Runs in a thread pool thread; must not touch the reactor. + Extracts IOC identity from transaction metadata, updates in-memory + channel/IOC state, then calls _push_to_cf. Raises CancelledError on + push failure so the Twisted caller sees a failed Deferred. """ host = transaction.source_address.host port = transaction.source_address.port @@ -401,81 +384,36 @@ def _prepare_commit(self, transaction: interfaces.ITransaction, iocid: str): ) record_infos = self.transaction_to_record_infos(ioc_info, transaction) + records_to_delete = list(transaction.records_to_delete) log.debug("Delete records: %s", records_to_delete) - record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) if not transaction.connected and ioc_info.id not in self.iocs: log.warning( "IOC at %s:%d disconnected before completing initial upload (0 channels registered)", host, port, ) - return None - - with self._state_lock: - self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) - iocs_snap = dict(self.iocs) - # Snapshot only channels this IOC owns plus any being deleted; that - # covers everything _handle_channel_is_old can look up without - # cloning the full channel map on every commit. - channels_to_snapshot = set(records_to_delete) | self._ioc_channels.get(iocid, set()) - ciids_snap = {k: list(self.channel_ioc_ids[k]) for k in channels_to_snapshot if k in self.channel_ioc_ids} - - return ioc_info, record_info_by_name, records_to_delete, iocs_snap, ciids_snap - - @defer.inlineCallbacks - def _push_to_cf_async( - self, - iocid: str, - record_info_by_name: Dict[str, "RecordInfo"], - records_to_delete: List[str], - ioc_info: "IOCInfo", - iocs: Dict[str, "IOCInfo"], - channel_ioc_ids: Dict[str, List[str]], - ) -> defer.Deferred: - """Retry CF update until success, service stop, or retry limit. - - Each CF call runs in a thread pool thread via deferToThread. Retry - waits use task.deferLater so no thread is held between attempts. + return + self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) + poll_success = self._push_to_cf(record_info_by_name, records_to_delete, ioc_info) + if not poll_success: + if transaction.connected: + self._evict_ioc(ioc_info.id) + raise defer.CancelledError(f"Failed to commit transaction after polling retries: {transaction}") + + def _evict_ioc(self, iocid: str) -> None: + """Remove an IOC from in-memory state so its next commit is treated as initial. + + Called after a CF push exhausts retries: CF was never written, so + in-memory state would diverge from CF until the IOC reconnects. """ - from twisted.internet import reactor - - count = 0 - sleep = 1.0 - log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) - while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: - if not self.running or self._cancelled.get(iocid, False): - log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) - return - count += 1 - t0 = time.monotonic() - try: - yield deferToThread( - self._update_channelfinder, record_info_by_name, records_to_delete, ioc_info, iocs, channel_ioc_ids - ) - elapsed = time.monotonic() - t0 - metrics.cf_commit_duration_seconds.observe(elapsed) - metrics.cf_commits_total.labels(result="success").inc() - log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) - return - except defer.CancelledError: - self._cancelled[iocid] = True - raise - except RequestException: - elapsed = time.monotonic() - t0 - log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) - retry_seconds = min(60.0, sleep) - log.info("CF push retry in %s seconds", retry_seconds) - try: - yield task.deferLater(reactor, retry_seconds, lambda: None) - except defer.CancelledError: - self._cancelled[iocid] = True - raise - sleep *= 1.5 - metrics.cf_commits_total.labels(result="cancelled").inc() - log.error("CF push gave up after %d attempts: %s", count, ioc_info) - raise CFUpdateAbortedError(f"Failed to commit transaction after {count} attempts: {ioc_info}") + # list() snapshots the set; remove_channel mutates it during iteration + for ch in list(self._ioc_channels.get(iocid, set())): # NOSONAR + self.remove_channel(ch, iocid) + self.iocs.pop(iocid, None) + self._ioc_channels.pop(iocid, None) def remove_channel(self, record_name: str, iocid: str) -> None: """Unlink a channel from an IOC in channel_ioc_ids and decrement channelcount. @@ -533,8 +471,46 @@ def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(names)) self.client.update_property(CFProperty(CFPropertyName.PV_STATUS.value, owner, PVStatus.INACTIVE.value), names) - def _assert_not_cancelled(self, iocid: str, context: str) -> None: - if self._cancelled.get(iocid, False) or not self.running: + def _push_to_cf( + self, + record_info_by_name: Dict[str, RecordInfo], + records_to_delete: List[str], + ioc_info: IOCInfo, + ) -> bool: + """Call _update_channelfinder with exponential back-off. + + Returns True on success, False if the processor was stopped or the retry + budget (push_max_retries) was exhausted. Blocks the calling thread. + """ + log.info("CF push start: %s (%d channels)", ioc_info, len(record_info_by_name)) + count = 0 + sleep = 1.0 + while self.cf_config.push_always_retry or count < self.cf_config.push_max_retries: + if not self.running: + log.info("CF processor stopped; abandoning push for %s after %d attempt(s)", ioc_info, count) + return False + count += 1 + t0 = time.monotonic() + try: + self._update_channelfinder(record_info_by_name, records_to_delete, ioc_info) + elapsed = time.monotonic() - t0 + metrics.cf_commit_duration_seconds.observe(elapsed) + metrics.cf_commits_total.labels(result="success").inc() + log.info("CF push done in %.2fs: %s (%d channels)", elapsed, ioc_info, len(record_info_by_name)) + return True + except RequestException: + elapsed = time.monotonic() - t0 + log.exception("CF push failed after %.2fs (attempt %d): %s", elapsed, count, ioc_info) + retry_seconds = min(60, sleep) + log.info("CF push retry in %s seconds", retry_seconds) + time.sleep(retry_seconds) + sleep *= 1.5 + metrics.cf_commits_total.labels(result="cancelled").inc() + log.error("CF push gave up after %d attempts: %s", count, ioc_info) + return False + + def _assert_not_cancelled(self, context: str) -> None: + if self.cancelled: raise defer.CancelledError(f"Processor cancelled: {context}") def _update_channelfinder( @@ -542,13 +518,13 @@ def _update_channelfinder( record_info_by_name: Dict[str, RecordInfo], records_to_delete: List[str], ioc_info: IOCInfo, - iocs: Dict[str, IOCInfo], - channel_ioc_ids: Dict[str, List[str]], ) -> None: - """Push one IOC's changes to ChannelFinder. + """Compute and push the minimal CF diff for one IOC commit. - Uses iocs and channel_ioc_ids snapshots taken at commit time so reads - are consistent even when other IOC commits run concurrently in threads. + Queries CF for channels already registered under this IOC, classifies + each as old-only (re-assign or orphan), old-and-new (update in place), + or new-only (create or move from another IOC), then writes the result + via _cf_set_chunked. Raises CancelledError if self.cancelled is set. """ log.info("CF Update IOC: %s", ioc_info) log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) @@ -556,18 +532,18 @@ def _update_channelfinder( new_channels = set(record_info_by_name.keys()) iocid = ioc_info.id - if iocid not in iocs and record_info_by_name: - # Disconnect-before-upload is already logged in _prepare_commit. + if iocid not in self.iocs and record_info_by_name: + # Disconnect-before-upload is already logged in _commit_with_thread. log.warning( "IOC %s committed update without prior initial transaction (%d IOCs known)", ioc_info, - len(iocs), + len(self.iocs), ) if ioc_info.hostname is None or ioc_info.ioc_name is None: raise IOCMissingInfoError(ioc_info) - self._assert_not_cancelled(iocid, f"before fetching old channels for {ioc_info}") + self._assert_not_cancelled(f"before fetching old channels for {ioc_info}") channels: List[CFChannel] = [] log.debug("Find existing channels by IOCID: %s", ioc_info) @@ -583,12 +559,11 @@ def _update_channelfinder( channels, record_info_by_name, iocid, - iocs, - channel_ioc_ids, ) + # now pvNames contains a list of pv's new on this host/ioc existing_channels = self._get_existing_channels(new_channels) - self._assert_not_cancelled(iocid, f"after fetching existing channels for {ioc_info}") + self._assert_not_cancelled(f"after fetching existing channels for {ioc_info}") self._process_new_channels( new_channels, record_info_by_name, ioc_info, recceiverid, existing_channels, channels, iocid @@ -600,6 +575,7 @@ def _update_channelfinder( else: if old_channels and len(old_channels) != 0: self._cf_set_chunked(channels) + self._assert_not_cancelled(f"after setting channels for {ioc_info}") def _process_new_channels( self, @@ -611,6 +587,11 @@ def _process_new_channels( channels: List[CFChannel], iocid: str, ) -> None: + """Build CF channel objects for all channels new in this commit. + + Channels already in CF under a different IOC are updated in place; + genuinely new channels are created fresh. Appends results to channels. + """ for channel_name in new_channels: new_properties = create_ioc_properties( ioc_info.owner, @@ -651,8 +632,6 @@ def _handle_channels( channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], iocid: str, - iocs: Dict[str, IOCInfo], - channel_ioc_ids: Dict[str, List[str]], ) -> None: """Handle channels already present in Channelfinder for this IOC. @@ -663,17 +642,26 @@ def _handle_channels( for cf_channel in old_channels: if not new_channels or cf_channel.name in records_to_delete: log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) - if cf_channel.name in channel_ioc_ids: - self._handle_channel_is_old( - cf_channel, ioc_info, recceiverid, channels, record_info_by_name, iocs, channel_ioc_ids - ) + if cf_channel.name in self.channel_ioc_ids: + last_ioc_id = self.channel_ioc_ids[cf_channel.name][-1] + last_ioc_info = self.iocs.get(last_ioc_id) + if last_ioc_info is None: + log.warning( + "Last IOC %s for channel %s not in local state; orphaning channel", + last_ioc_id, + cf_channel.name, + ) + self._orphan_channel(cf_channel, ioc_info, channels, record_info_by_name) + else: + self._handle_channel_is_old( + cf_channel, ioc_info, recceiverid, channels, record_info_by_name, last_ioc_id, last_ioc_info + ) else: self._orphan_channel(cf_channel, ioc_info, channels, record_info_by_name) - else: - if cf_channel.name in new_channels: - self._handle_channel_old_and_new( - cf_channel, iocid, ioc_info, channels, new_channels, record_info_by_name, old_channels - ) + elif cf_channel.name in new_channels: + self._handle_channel_old_and_new( + cf_channel, iocid, ioc_info, channels, new_channels, record_info_by_name, old_channels + ) def _handle_channel_is_old( self, @@ -682,14 +670,18 @@ def _handle_channel_is_old( recceiverid: str, channels: List[CFChannel], record_info_by_name: Dict[str, RecordInfo], - iocs: Dict[str, IOCInfo], - channel_ioc_ids: Dict[str, List[str]], + last_ioc_id: str, + last_ioc_info: IOCInfo, ) -> None: - """Channel exists in CF but not in this commit — re-assign to its last known IOC.""" - last_ioc_id = channel_ioc_ids[cf_channel.name][-1] - cf_channel.owner = iocs[last_ioc_id].owner + """Re-assign a CF channel to its last known IOC. + + Called when a channel exists in CF but not in the current commit and its last + known IOC is still in local state. Caller is responsible for the None check on + last_ioc_info and must orphan the channel instead when it is absent. + """ + cf_channel.owner = last_ioc_info.owner cf_channel.properties = _merge_property_lists( - create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), + create_default_properties(ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel), cf_channel, self.managed_properties, ) @@ -700,11 +692,16 @@ def _handle_channel_is_old( for alias_name in record_info_by_name[cf_channel.name].aliases: # Legacy alias handling retained to avoid changing runtime behavior. alias_channel = CFChannel(alias_name, "", []) - if alias_name in channel_ioc_ids: - last_alias_ioc_id = channel_ioc_ids[alias_name][-1] - alias_channel.owner = iocs[last_alias_ioc_id].owner + if alias_name in self.channel_ioc_ids: + last_alias_ioc_id = self.channel_ioc_ids[alias_name][-1] + last_alias_ioc_info = self.iocs.get(last_alias_ioc_id) + if last_alias_ioc_info is None: + continue + alias_channel.owner = last_alias_ioc_info.owner alias_channel.properties = _merge_property_lists( - create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), + create_default_properties( + ioc_info, recceiverid, self.channel_ioc_ids, self.iocs, cf_channel + ), alias_channel, self.managed_properties, ) diff --git a/server/recceiver/clean_tool.py b/server/recceiver/clean_tool.py new file mode 100644 index 0000000..4e63c7d --- /dev/null +++ b/server/recceiver/clean_tool.py @@ -0,0 +1,100 @@ +"""recceiver-clean — sweep Active CF channels to Inactive for a given recceiver ID. + +Run after a RecCeiver restart (with cleanOnStart=False) to mark channels Inactive +whose IOCs have not reconnected. + +Usage: + recceiver-clean -f /path/to/recceiver.conf [--recceiver-id ID] [--dry-run] +""" + +import argparse +import configparser +import logging +import sys + +from requests import RequestException + +from recceiver.cf.adapter import PyCFClientAdapter +from recceiver.cf.config import CFConfig +from recceiver.cf.model import CFProperty, CFPropertyName, PVStatus +from recceiver.processors import ConfigAdapter + +log = logging.getLogger(__name__) + + +def run_clean(cf_config: CFConfig, client=None, dry_run: bool = False) -> int: + """Mark all Active channels for cf_config.recceiver_id Inactive. + + Returns the total count of channels swept. Pass a pre-built client for testing. + """ + if client is None: + from channelfinder import ChannelFinderClient + + client = PyCFClientAdapter( + ChannelFinderClient( + BaseURL=cf_config.base_url, + username=cf_config.cf_username, + password=cf_config.cf_password, + verify_ssl=cf_config.verify_ssl, + ), + size_limit=int(cf_config.cf_query_limit), + ) + + total = 0 + # find_active_for_recceiver is paginated (bounded by size_limit). In live mode each + # update_property call marks the current page Inactive, so the next query returns a + # fresh batch; the loop drains all pages. In dry-run mode nothing is deactivated so + # the same batch would come back on every iteration — break after the first page. + while True: + channels = client.find_active_for_recceiver(cf_config.recceiver_id) + if not channels: + break + log.info( + "Found %d active channels for recceiver_id=%r%s", + len(channels), + cf_config.recceiver_id, + " (dry-run)" if dry_run else "", + ) + if not dry_run: + client.update_property( + CFProperty(CFPropertyName.PV_STATUS.value, cf_config.username, PVStatus.INACTIVE.value), + [ch.name for ch in channels], + ) + total += len(channels) + if dry_run: + break + return total + + +def main(argv=None): + parser = argparse.ArgumentParser(description="Mark active CF channels Inactive for a given recceiver ID.") + parser.add_argument("-f", "--config", required=True, help="Path to recceiver config file") + parser.add_argument("--recceiver-id", default=None, help="Override recceiver ID from config") + parser.add_argument("--dry-run", action="store_true", help="Print count without modifying CF") + args = parser.parse_args(argv) + + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") + + conf = configparser.ConfigParser() + conf.read(args.config) + cf_conf = CFConfig.loads(ConfigAdapter(conf, "cf")) + + if args.recceiver_id: + cf_conf.recceiver_id = args.recceiver_id + + if cf_conf.base_url is None: + print("ERROR: baseUrl must be configured in [cf] section", file=sys.stderr) + sys.exit(1) + + try: + count = run_clean(cf_conf, dry_run=args.dry_run) + except RequestException as err: + print(f"ERROR: CF request failed: {err}", file=sys.stderr) + sys.exit(2) + + action = "Would mark" if args.dry_run else "Marked" + print(f"{action} {count} channels Inactive for recceiver_id={cf_conf.recceiver_id!r}") + + +if __name__ == "__main__": + main() diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 8fd1912..be4af97 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -29,6 +29,7 @@ def __init__(self, active=True): self.sess, self.active = None, active self.uploadSize, self.uploadStart = 0, 0 + self._ping_timer = None # connectionLost guards on this; connectionMade may not set it self.rxfn = collections.defaultdict(self.dfact) @@ -58,7 +59,7 @@ def connectionLost(self, reason=protocol.connectionDone): self.factory.isDone(self, self.active) if self._ping_timer and self._ping_timer.active(): self._ping_timer.cancel() - del self._ping_timer + self._ping_timer = None if self.sess: self.sess.close() del self.sess @@ -178,6 +179,7 @@ def recvDone(self, body): log.error("Ignoring done update") return self.getInitialState() self.factory.isDone(self, self.active) + self.active = False # slot freed; connectionLost must not free it again self.sess.done() if self.phase == 1: self.writePing() @@ -361,16 +363,18 @@ class CastFactory(protocol.ServerFactory): maxActive = 3 def __init__(self): - # Throttle concurrent uploading connections to control CF commit load. - # "Active" means currently uploading records; connections become - # "inactive" via isDone() once the upload completes. + # Flow control by limiting the number of concurrent + # "active" connections. Active means dumping lots of records. + # Connections become "inactive" by calling isDone(). self.NActive = 0 self.Wait = [] def isDone(self, proto, active): if not active: - # connection closed before activation - self.Wait.remove(proto) + # connection closed before activation; guard: proto may no longer be in Wait + # if recvDone already freed the slot and cleared self.active + if proto in self.Wait: + self.Wait.remove(proto) elif len(self.Wait) > 0: # Others are waiting waiting = self.Wait.pop(0) diff --git a/server/tests/unit/cf/test_processor.py b/server/tests/unit/cf/test_processor.py index 11ea132..2d29f36 100644 --- a/server/tests/unit/cf/test_processor.py +++ b/server/tests/unit/cf/test_processor.py @@ -1,16 +1,26 @@ -from unittest.mock import MagicMock +import time +import pytest from requests import RequestException from twisted.internet import defer from twisted.internet.address import IPv4Address -from twisted.internet.defer import DeferredLock from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus, RecordInfo -from recceiver.cf.processor import CFProcessor, CFUpdateAbortedError +from recceiver.cf.processor import CFProcessor +from recceiver.recast import Transaction from tests.unit.cf.conftest import DEFAULT_RECCEIVER_ID, make_channel, make_ioc from tests.unit.cf.mock_adapter import MockCFAdapter from tests.unit.conftest import make_adapter +_HOST_A = "1.2.3.4" # NOSONAR + + +def make_transaction(host: str, port: int) -> Transaction: + ep = IPv4Address("TCP", host, port) + t = Transaction(ep, id=0) + t.initial = True + return t + def make_processor() -> CFProcessor: return CFProcessor("test", make_adapter()) @@ -23,16 +33,6 @@ def make_processor_with_mock(): return proc, adapter -_HOST_A = "1.2.3.4" # NOSONAR -_HOST_B = "5.6.7.8" # NOSONAR - - -def make_transaction(host: str = _HOST_A, port: int = 5064) -> MagicMock: - t = MagicMock() - t.source_address = IPv4Address("TCP", host, port) - return t - - class TestRemoveChannel: def test_missing_iocid_does_not_raise(self): proc = make_processor() @@ -91,7 +91,7 @@ def test_is_no_op_when_no_active_channels(self): class TestUpdateChannelFinder: def _make_proc(self): proc, adapter = make_processor_with_mock() - proc.running = True + proc.cancelled = False proc.managed_properties = set() proc.record_property_names_list = set() proc.env_vars = {} @@ -102,7 +102,7 @@ def test_registers_new_channel_as_active(self): ioc = make_ioc() proc.iocs[ioc.id] = ioc - proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc, proc.iocs, proc.channel_ioc_ids) + proc._update_channelfinder({"PV:1": RecordInfo(pv_name="PV:1")}, [], ioc) assert "PV:1" in adapter._channels status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) @@ -128,181 +128,169 @@ def test_orphans_channel_absent_from_local_state(self): ] ) - proc._update_channelfinder({}, [], ioc, proc.iocs, proc.channel_ioc_ids) + proc._update_channelfinder({}, [], ioc) status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) assert status.value == PVStatus.INACTIVE.value + def test_handle_channel_is_old_missing_last_ioc_does_not_raise(self): + """If the last known IOC for a channel has departed, orphan the channel rather than raise.""" + from recceiver.cf.model import IOCInfo -class TestPushToCF: - """Tests for _push_to_cf_async: retry abandonment when stopped or cancelled.""" + proc, adapter = self._make_proc() - def _run_async_push(self, monkeypatch, processor, ioc, side_effect_fn): - iocid = ioc.id - processor._cancelled[iocid] = False + ioc_a = IOCInfo( + host="1.2.3.4", # NOSONAR + hostname="ioc-a.example.com", + ioc_name="IOC-A", + ioc_ip="1.2.3.4", # NOSONAR + owner="admin", + time="2026-01-01T00:00:00", + port=5064, + channelcount=0, + ) + ioc_b = IOCInfo( + host="5.6.7.8", # NOSONAR + hostname="ioc-b.example.com", + ioc_name="IOC-B", + ioc_ip="5.6.7.8", # NOSONAR + owner="admin", + time="2026-01-01T00:00:00", + port=5064, + channelcount=0, + ) + ioc_a_id = ioc_a.id # "1.2.3.4:5064" + ioc_b_id = ioc_b.id # "5.6.7.8:5064" - def sync_thread(fn, *args): - try: - fn(*args) - return defer.succeed(None) - except Exception as exc: - return defer.fail(exc) + # PV:1 was last seen under IOC-A, which has since departed + proc.channel_ioc_ids["PV:1"].append(ioc_a_id) + # ioc_a deliberately absent from proc.iocs + proc.iocs[ioc_b_id] = ioc_b - monkeypatch.setattr("recceiver.cf.processor.deferToThread", sync_thread) - monkeypatch.setattr("recceiver.cf.processor.task.deferLater", lambda *a, **kw: defer.succeed(None)) - monkeypatch.setattr(processor, "_update_channelfinder", side_effect_fn) + # CF has PV:1 registered under IOC-B (by iocid property) + adapter.set_channels( + [ + CFChannel( + "PV:1", + "admin", + [ + CFProperty(CFPropertyName.PV_STATUS.value, "admin", PVStatus.ACTIVE.value), + CFProperty(CFPropertyName.IOC_ID.value, "admin", ioc_b_id), + ], + ) + ] + ) - results, errors = [], [] - processor._push_to_cf_async(iocid, {}, [], ioc, {}, {}).addCallbacks(results.append, errors.append) - return results, errors + # IOC-B commits with no channels — PV:1 appears in old_channels, triggers _handle_channel_is_old + proc._update_channelfinder({}, [], ioc_b) - def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): - processor = make_processor() - processor.running = True - processor.cf_config.push_always_retry = True - ioc = make_ioc() - call_count = [0] + # Must not raise KeyError; PV:1 should be orphaned (marked Inactive) + status = next(p for p in adapter._channels["PV:1"].properties if p.name == CFPropertyName.PV_STATUS.value) + assert status.value == PVStatus.INACTIVE.value - def failing_update(*args): - call_count[0] += 1 - processor.running = False - raise RequestException("CF unreachable") - results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) - assert len(results) == 1 and len(errors) == 0 - assert call_count[0] == 1 +class TestPushToCF: + def test_abandons_push_when_processor_stops_during_retry(self, monkeypatch): + monkeypatch.setattr(time, "sleep", lambda _: None) - def test_abandons_push_when_ioc_cancelled_during_retry(self, monkeypatch): processor = make_processor() processor.running = True processor.cf_config.push_always_retry = True - ioc = make_ioc() - iocid = ioc.id - call_count = [0] - def failing_update(*args): - call_count[0] += 1 - processor._cancelled[iocid] = True + call_count = 0 + + def failing_update(record_info_by_name, records_to_delete, ioc_info): + nonlocal call_count + call_count += 1 + processor.running = False raise RequestException("CF unreachable") - results, errors = self._run_async_push(monkeypatch, processor, ioc, failing_update) - assert len(results) == 1 and len(errors) == 0 - assert call_count[0] == 1 + monkeypatch.setattr(processor, "_update_channelfinder", failing_update) + result = processor._push_to_cf({}, [], make_ioc()) + assert result is False + assert call_count == 1 -class TestPerIocLocking: - def test_different_iocs_get_different_locks(self): - proc = make_processor() - lock_a = proc._get_ioc_lock(f"{_HOST_A}:5064") - lock_b = proc._get_ioc_lock(f"{_HOST_B}:5064") - assert lock_a is not lock_b + def test_gives_up_after_max_retries(self, monkeypatch): + monkeypatch.setattr(time, "sleep", lambda _: None) - def test_same_ioc_gets_same_lock(self): - proc = make_processor() - lock1 = proc._get_ioc_lock(f"{_HOST_A}:5064") - lock2 = proc._get_ioc_lock(f"{_HOST_A}:5064") - assert lock1 is lock2 + processor = make_processor() + processor.running = True + processor.cf_config.push_always_retry = False + processor.cf_config.push_max_retries = 2 - def test_commit_routes_to_correct_iocid(self, monkeypatch): - proc = make_processor() - routed_iocids = [] + call_count = 0 - def fake_lock_run(fn, transaction, iocid): - routed_iocids.append(iocid) - return defer.succeed(None) + def failing_update(record_info_by_name, records_to_delete, ioc_info): + nonlocal call_count + call_count += 1 + raise RequestException("CF unreachable") - lock = DeferredLock() - monkeypatch.setattr(lock, "run", fake_lock_run) - monkeypatch.setattr(proc, "_get_ioc_lock", lambda _iocid: lock) + monkeypatch.setattr(processor, "_update_channelfinder", failing_update) + result = processor._push_to_cf({}, [], make_ioc()) - proc.commit(make_transaction(_HOST_A, 5064)) - assert routed_iocids == [f"{_HOST_A}:5064"] + assert result is False + assert call_count == 2 - def test_prune_removes_state_after_ioc_disconnects(self): - proc = make_processor() - iocid = f"{_HOST_A}:5064" - proc._ioc_locks[iocid] = DeferredLock() - proc._cancelled[iocid] = False - proc._ioc_channels[iocid].add("CHAN:1") + def test_cancelled_flag_raises_cancelled_error(self): + processor = make_processor() + processor.running = True + processor.cancelled = True + ioc = make_ioc() + processor.iocs[ioc.id] = ioc - # iocid is NOT in proc.iocs → IOC has disconnected - proc._prune_ioc_state(None, iocid) + with pytest.raises(defer.CancelledError): + processor._push_to_cf({}, [], ioc) - assert iocid not in proc._ioc_locks - assert iocid not in proc._cancelled - assert iocid not in proc._ioc_channels - def test_prune_preserves_state_while_ioc_is_active(self): +class TestEvictIoc: + def test_evict_ioc_removes_all_tracking_state(self): proc = make_processor() - iocid = f"{_HOST_A}:5064" - proc._ioc_locks[iocid] = DeferredLock() - proc._cancelled[iocid] = False - proc.iocs[iocid] = make_ioc() + ioc = make_ioc(channelcount=2) + iocid = ioc.id + proc.iocs[iocid] = ioc + proc._ioc_channels[iocid].add("PV:1") + proc._ioc_channels[iocid].add("PV:2") + proc.channel_ioc_ids["PV:1"].append(iocid) + proc.channel_ioc_ids["PV:2"].append(iocid) - proc._prune_ioc_state(None, iocid) + proc._evict_ioc(iocid) - assert iocid in proc._ioc_locks - assert iocid in proc._cancelled + assert iocid not in proc.iocs + assert iocid not in proc._ioc_channels + assert "PV:1" not in proc.channel_ioc_ids + assert "PV:2" not in proc.channel_ioc_ids - def test_prune_passes_result_through(self): + def test_evict_ioc_is_noop_for_unknown_ioc(self): proc = make_processor() - iocid = f"{_HOST_A}:5064" - assert proc._prune_ioc_state("sentinel", iocid) == "sentinel" - - -class TestCommitErrorHandling: - """Test _commit_with_lock error routing without running real threads. + proc._evict_ioc("1.2.3.4:5064") # NOSONAR — must not raise - _prepare_commit is simulated via a monkeypatched deferToThread; the CF - push phase is controlled by patching _push_to_cf_async directly so each - test exercises exactly one failure mode at a time. - """ - - def _run(self, monkeypatch, *, prepare_result, push_result=None): + def test_commit_thread_evicts_ioc_on_push_failure(self, monkeypatch): + """On retry exhaustion for a connected transaction, IOC is evicted from memory.""" proc = make_processor() proc.running = True - iocid = f"{_HOST_A}:5064" - monkeypatch.setattr("recceiver.cf.processor.deferToThread", lambda *_a, **_kw: prepare_result) - if push_result is not None: - monkeypatch.setattr(proc, "_push_to_cf_async", lambda *_a, **_kw: push_result) + ioc = make_ioc(channelcount=1) + iocid = ioc.id - results, errors = [], [] - proc._commit_with_lock(make_transaction(), iocid).addCallbacks(results.append, errors.append) - return results, errors + # Pre-populate state as update_ioc_infos would have done + proc.iocs[iocid] = ioc + proc._ioc_channels[iocid].add("PV:1") + proc.channel_ioc_ids["PV:1"].append(iocid) - def test_aborted_commit_resolves_chain(self, monkeypatch): - """CFUpdateAbortedError from exhausted retries lets the chain continue.""" - ioc = make_ioc() - results, errors = self._run( - monkeypatch, - prepare_result=defer.succeed((ioc, {}, [], {}, {})), - push_result=defer.fail(CFUpdateAbortedError("retries exhausted")), - ) - assert len(results) == 1 and len(errors) == 0 + # Skip update_ioc_infos (state already set up) and fail the CF push + monkeypatch.setattr(proc, "update_ioc_infos", lambda *a: None) + monkeypatch.setattr(proc, "_push_to_cf", lambda *a: False) + monkeypatch.setattr(proc, "transaction_to_record_infos", lambda *a: {}) + monkeypatch.setattr(CFProcessor, "record_info_by_name", staticmethod(lambda *a: {})) - def test_unexpected_error_errbacks_chain(self, monkeypatch): - ioc = make_ioc() - results, errors = self._run( - monkeypatch, - prepare_result=defer.succeed((ioc, {}, [], {}, {})), - push_result=defer.fail(RuntimeError("unexpected")), - ) - assert len(results) == 0 and len(errors) == 1 - assert errors[0].check(RuntimeError) + t = make_transaction(_HOST_A, 5064) + t.connected = True + t.records_to_delete = [] + t.client_infos = {"IOCNAME": "TEST-IOC", "HOSTNAME": "test.host"} - def test_service_stopped_cancel_errbacks_chain(self, monkeypatch): - results, errors = self._run( - monkeypatch, - prepare_result=defer.fail(defer.CancelledError("service stopped")), - ) - assert len(results) == 0 and len(errors) == 1 - assert errors[0].check(defer.CancelledError) + with pytest.raises(defer.CancelledError): + proc._commit_with_thread(t) - def test_successful_commit_resolves_chain(self, monkeypatch): - ioc = make_ioc() - results, errors = self._run( - monkeypatch, - prepare_result=defer.succeed((ioc, {}, [], {}, {})), - push_result=defer.succeed(None), - ) - assert len(results) == 1 and len(errors) == 0 + assert iocid not in proc.iocs + assert iocid not in proc._ioc_channels + assert "PV:1" not in proc.channel_ioc_ids diff --git a/server/tests/unit/test_application.py b/server/tests/unit/test_application.py index 69a8c69..4668f73 100644 --- a/server/tests/unit/test_application.py +++ b/server/tests/unit/test_application.py @@ -1,3 +1,6 @@ +import logging +from unittest.mock import MagicMock, patch + from recceiver.application import RecService from recceiver.recast import CollectionSession @@ -14,3 +17,43 @@ def test_commit_size_limit_reads_from_config(self): def test_commit_size_limit_zero_disables_splitting(self): svc = RecService({"commitSizeLimit": "0"}) assert svc.commitSizeLimit == 0 + + +class TestLogStatus: + def _make_service_with_nactive(self, nactive): + svc = RecService({}) + svc.tcpFactory = MagicMock() + svc.tcpFactory.NActive = nactive + svc.tcpFactory.maxActive = 20 + svc.tcpFactory.Wait = [] + return svc + + def test_negative_nactive_logs_warning(self, caplog): + svc = self._make_service_with_nactive(-5) + with caplog.at_level(logging.WARNING, logger="recceiver.application"): + svc._logStatus() + assert any("NActive" in r.message and "-5" in r.message for r in caplog.records) + + def test_negative_nactive_clamps_to_zero_in_log(self, caplog): + svc = self._make_service_with_nactive(-5) + with caplog.at_level(logging.INFO, logger="recceiver.application"): + svc._logStatus() + status_lines = [r.message for r in caplog.records if "connections active" in r.message] + assert len(status_lines) == 1 + assert "connections active=0/20" in status_lines[0] + + def test_negative_nactive_clamps_to_zero_in_metrics(self): + import recceiver.metrics as m + + svc = self._make_service_with_nactive(-3) + mock_gauge = MagicMock() + with patch.object(m, "connections_active", mock_gauge): + svc._logStatus() + mock_gauge.set.assert_called_once_with(0) + + def test_normal_nactive_passes_through(self, caplog): + svc = self._make_service_with_nactive(5) + with caplog.at_level(logging.INFO, logger="recceiver.application"): + svc._logStatus() + status_lines = [r.message for r in caplog.records if "connections active" in r.message] + assert "connections active=5/20" in status_lines[0] diff --git a/server/tests/unit/test_clean_tool.py b/server/tests/unit/test_clean_tool.py new file mode 100644 index 0000000..b3e28d3 --- /dev/null +++ b/server/tests/unit/test_clean_tool.py @@ -0,0 +1,60 @@ +from unittest.mock import MagicMock + +from recceiver.cf.config import CFConfig +from recceiver.cf.model import CFChannel, CFProperty, CFPropertyName, PVStatus +from recceiver.clean_tool import run_clean + + +def _make_channel(name): + return CFChannel(name, "admin", [CFProperty(CFPropertyName.PV_STATUS.value, "admin", PVStatus.ACTIVE.value)]) + + +def _make_cfg(): + return CFConfig(base_url="http://cf", username="admin", recceiver_id="rec1") + + +class TestRunClean: + def _make_client(self, *batches): + """Return a mock client yielding `batches` in sequence, then empty.""" + client = MagicMock() + client.find_active_for_recceiver.side_effect = list(batches) + [[]] + return client + + def test_marks_all_active_channels_inactive(self): + channels = [_make_channel("PV:1"), _make_channel("PV:2")] + client = self._make_client(channels) + + count = run_clean(_make_cfg(), client=client) + + assert count == 2 + client.update_property.assert_called_once() + prop, names = client.update_property.call_args[0] + assert prop.value == PVStatus.INACTIVE.value + assert set(names) == {"PV:1", "PV:2"} + + def test_returns_zero_when_no_active_channels(self): + client = self._make_client() + + count = run_clean(_make_cfg(), client=client) + + assert count == 0 + client.update_property.assert_not_called() + + def test_dry_run_does_not_call_update(self): + channels = [_make_channel("PV:1")] + client = self._make_client(channels) + + count = run_clean(_make_cfg(), client=client, dry_run=True) + + assert count == 1 + client.update_property.assert_not_called() + + def test_sweeps_multiple_pages(self): + batch1 = [_make_channel("PV:1"), _make_channel("PV:2")] + batch2 = [_make_channel("PV:3")] + client = self._make_client(batch1, batch2) + + count = run_clean(_make_cfg(), client=client) + + assert count == 3 + assert client.update_property.call_count == 2 diff --git a/server/tests/unit/test_recast.py b/server/tests/unit/test_recast.py index 49ab45b..3389b93 100644 --- a/server/tests/unit/test_recast.py +++ b/server/tests/unit/test_recast.py @@ -3,7 +3,7 @@ from twisted.internet import defer from twisted.internet.address import IPv4Address -from recceiver.recast import CastFactory, CollectionSession +from recceiver.recast import CastFactory, CastReceiver, CollectionSession def _make_session() -> CollectionSession: @@ -78,6 +78,46 @@ def test_fifo_order_preserved_across_multiple_waiters(self): assert len(factory.Wait) == 1 assert factory.Wait[0] is p3 + def test_connection_lost_on_throttled_connection_does_not_raise(self): + """A waiting (inactive) connection that closes before promotion must not raise.""" + factory = self._make_factory(max_active=1) + proto = CastReceiver(active=False) + proto.factory = factory + factory.Wait.append(proto) # register as a waiting connection, as buildProtocol would + proto.sess = None # no session opened for inactive connections + + # Must not raise AttributeError: 'CastReceiver' object has no attribute '_ping_timer' + proto.connectionLost() + + assert factory.Wait == [] + + def test_nactive_not_decremented_twice_after_successful_upload(self): + """NActive must only drop by 1 across the full recvDone→connectionLost cycle.""" + factory = self._make_factory(max_active=2) + p1 = factory.buildProtocol(None) + assert factory.NActive == 1 + + # recvDone fires: frees the slot + factory.isDone(p1, active=True) + p1.active = False # recvDone clears the flag after isDone + assert factory.NActive == 0 + + # connectionLost fires: must be a no-op (active is now False, not in Wait) + factory.isDone(p1, active=False) + assert factory.NActive == 0 # must NOT go to -1 + + def test_is_done_inactive_proto_not_in_wait_is_no_op(self): + """isDone with active=False for a proto not in Wait must not raise or modify state.""" + factory = self._make_factory(max_active=1) + p1 = factory.buildProtocol(None) + + factory.isDone(p1, active=True) # NActive → 0 + assert factory.NActive == 0 + + # proto is not in Wait — must not raise ValueError from list.remove() + factory.isDone(p1, active=False) + assert factory.NActive == 0 + class TestCollectionSessionAbort: def test_disconnect_commit_runs_after_data_commit_cancels(self):