diff --git a/devservices/commands/foreground.py b/devservices/commands/foreground.py index 737a807..be2c0ca 100644 --- a/devservices/commands/foreground.py +++ b/devservices/commands/foreground.py @@ -117,7 +117,6 @@ def foreground(args: Namespace) -> None: # Run the process in foreground pty.spawn(argv) - except SupervisorProcessError as e: capture_exception(e) console.failure(f"Error stopping {program_name} in supervisor: {str(e)}") diff --git a/devservices/commands/status.py b/devservices/commands/status.py index 59db155..7f907f8 100644 --- a/devservices/commands/status.py +++ b/devservices/commands/status.py @@ -8,6 +8,7 @@ from argparse import ArgumentParser from argparse import Namespace from collections import namedtuple +from datetime import timedelta from typing import TypedDict from sentry_sdk import capture_exception @@ -19,6 +20,7 @@ from devservices.constants import DEVSERVICES_DEPENDENCIES_CACHE_DIR from devservices.constants import DEVSERVICES_DEPENDENCIES_CACHE_DIR_KEY from devservices.constants import DEVSERVICES_DIR_NAME +from devservices.constants import PROGRAMS_CONF_FILE_NAME from devservices.exceptions import ConfigError from devservices.exceptions import ConfigNotFoundError from devservices.exceptions import DependencyError @@ -37,6 +39,8 @@ from devservices.utils.state import ServiceRuntime from devservices.utils.state import State from devservices.utils.state import StateTables +from devservices.utils.supervisor import ProcessInfo +from devservices.utils.supervisor import SupervisorManager BASE_INDENTATION = " " @@ -99,8 +103,19 @@ def status(args: Namespace) -> None: console.warning(f"Status unavailable. {service.name} is not running standalone") return # Since exit(0) is captured as an internal_error by sentry + programs_config_path = os.path.join( + service.repo_path, f"{DEVSERVICES_DIR_NAME}/{PROGRAMS_CONF_FILE_NAME}" + ) + process_statuses = {} + if os.path.exists(programs_config_path): + supervisor_manager = SupervisorManager( + programs_config_path, + service.name, + ) + process_statuses = supervisor_manager.get_all_process_info() + try: - status_tree = get_status_for_service(service) + status_tree = get_status_for_service(service, process_statuses) except DependencyError as de: capture_exception(de) console.failure( @@ -114,7 +129,9 @@ def status(args: Namespace) -> None: console.info(status_tree) -def get_status_for_service(service: Service) -> str: +def get_status_for_service( + service: Service, process_statuses: dict[str, ProcessInfo] +) -> str: state = State() modes = service.config.modes @@ -141,7 +158,10 @@ def get_status_for_service(service: Service) -> str: docker_compose_service_to_status = parse_docker_compose_status(status_json_results) status_tree = generate_service_status_tree( - service.name, dependency_graph, docker_compose_service_to_status + service.name, + process_statuses, + dependency_graph, + docker_compose_service_to_status, ) return status_tree @@ -188,6 +208,7 @@ def get_status_json_results( def generate_service_status_tree( service_name: str, + process_statuses: dict[str, ProcessInfo], dependency_graph: DependencyGraph, docker_compose_service_to_status: dict[str, ServiceStatusOutput], indentation: str = "", @@ -227,6 +248,7 @@ def generate_service_status_tree( output.append( process_service_with_containerized_runtime( dependency, + process_statuses, docker_compose_service_to_status, indentation + BASE_INDENTATION, dependency_graph, @@ -261,6 +283,7 @@ def process_service_with_local_runtime( def process_service_with_containerized_runtime( dependency: DependencyNode, + process_statuses: dict[str, ProcessInfo], docker_compose_service_to_status: dict[str, ServiceStatusOutput], indentation: str, dependency_graph: DependencyGraph, @@ -268,13 +291,14 @@ def process_service_with_containerized_runtime( if len(dependency_graph.graph[dependency]) > 0: return generate_service_status_tree( dependency.name, + process_statuses, dependency_graph, docker_compose_service_to_status, indentation, ) else: return generate_service_status_details( - dependency, docker_compose_service_to_status, indentation + dependency, process_statuses, docker_compose_service_to_status, indentation ) @@ -301,16 +325,23 @@ def parse_docker_compose_status( def generate_service_status_details( dependency: DependencyNode, + process_statuses: dict[str, ProcessInfo], docker_compose_service_to_status: dict[str, ServiceStatusOutput], indentation: str, ) -> str: output = [f"{indentation}{Color.BOLD}{dependency.name}{Color.RESET}:"] + # Handle supervisor dependencies + if dependency.dependency_type == DependencyType.SUPERVISOR: + return generate_supervisor_status_details( + dependency, process_statuses, indentation + ) + if dependency.name not in docker_compose_service_to_status: return "\n".join( [ *output, - f"{indentation}{BASE_INDENTATION}Type: container", + (f"{indentation}{BASE_INDENTATION}Type: container"), f"{indentation}{BASE_INDENTATION}Status: N/A", ] ) @@ -349,7 +380,7 @@ def handle_started_service(dependency: DependencyNode, indentation: str) -> str: f"{indentation}{BASE_INDENTATION}Runtime: local", ] ) - service_output = get_status_for_service(service_with_local_runtime) + service_output = get_status_for_service(service_with_local_runtime, {}) return "\n".join( [f"{indentation}{line}" for line in service_output.splitlines()], ) @@ -365,3 +396,56 @@ def format_health(health: str) -> str: else Color.YELLOW ) return f"{color}{health}{Color.RESET}" + + +def generate_supervisor_status_details( + dependency: DependencyNode, + process_statuses: dict[str, ProcessInfo], + indentation: str, +) -> str: + """Generate status details for supervisor dependencies.""" + output = [f"{indentation}{Color.BOLD}{dependency.name}{Color.RESET}:"] + + process_info = process_statuses.get(dependency.name) + + if process_info is None: + return "\n".join( + [ + *output, + f"{indentation}{BASE_INDENTATION}Type: process", + f"{indentation}{BASE_INDENTATION}Status: N/A (process not found)", + ] + ) + + uptime_str = format_uptime(process_info["uptime"]) + + details = [ + "Type: process", + f"Status: {process_info['state_name'].lower()}", + f"PID: {process_info['pid'] if process_info['pid'] > 0 else 'N/A'}", + f"Uptime: {uptime_str}", + ] + + output.extend(f"{indentation}{BASE_INDENTATION}{detail}" for detail in details) + + return "\n".join(output) + + +def format_uptime(uptime_seconds: int) -> str: + """Format uptime seconds into a human-readable string.""" + SECONDS_PER_MINUTE = 60 + SECONDS_PER_HOUR = 60 * SECONDS_PER_MINUTE + + td = timedelta(seconds=uptime_seconds) + days = td.days + hours, remainder = divmod(td.seconds, SECONDS_PER_HOUR) + minutes, seconds = divmod(remainder, SECONDS_PER_MINUTE) + + if days > 0: + return f"{days}d {hours}h {minutes}m {seconds}s" + elif hours > 0: + return f"{hours}h {minutes}m {seconds}s" + elif minutes > 0: + return f"{minutes}m {seconds}s" + else: + return f"{seconds}s" diff --git a/devservices/utils/supervisor.py b/devservices/utils/supervisor.py index a831c61..de2c1d8 100644 --- a/devservices/utils/supervisor.py +++ b/devservices/utils/supervisor.py @@ -8,6 +8,7 @@ import time import xmlrpc.client from enum import IntEnum +from typing import TypedDict from sentry_sdk import capture_exception from supervisor.options import ServerOptions @@ -76,6 +77,20 @@ def make_connection( return UnixSocketHTTPConnection(self.socket_path) +class ProcessInfo(TypedDict): + """Status information for a supervisor process.""" + + name: str + state: int + state_name: str + description: str + pid: int + uptime: int + start_time: int + stop_time: int + group: str + + class SupervisorManager: def __init__(self, config_file_path: str, service_name: str) -> None: self.service_name = service_name @@ -276,3 +291,60 @@ def tail_program_logs(self, program_name: str) -> None: raise SupervisorError(f"Failed to tail logs for {program_name}: {str(e)}") except KeyboardInterrupt: pass + + def get_all_process_info(self) -> dict[str, ProcessInfo]: + """Get status information for all supervisor programs.""" + # Check if supervisor client is up first, return empty list if down + try: + client = self._get_rpc_client() + client.supervisor.getState() + except ( + xmlrpc.client.Fault, + SupervisorConnectionError, + socket.error, + ConnectionRefusedError, + ): + return {} + + try: + all_process_info = client.supervisor.getAllProcessInfo() + + # Validate that the response is a list before iterating for typechecking + if not isinstance(all_process_info, list): + return {} + + except xmlrpc.client.Fault as e: + raise SupervisorError(f"Failed to get programs status: {e.faultString}") + + process_statuses: dict[str, ProcessInfo] = {} + for process_info in all_process_info: + if not isinstance(process_info, dict): + continue + + # Extract basic fields with defaults + name = process_info.get("name", "") + state = process_info.get("state", SupervisorProcessState.UNKNOWN) + state_name = SupervisorProcessState(state).name + description = process_info.get("description", "") + pid = process_info.get("pid", 0) + group = process_info.get("group", "") + + # Calculate uptime for running processes + start_time = process_info.get("start", 0) + now = process_info.get("now", 0) + uptime = max(0, now - start_time) if start_time > 0 and now > 0 else 0 + + process_status: ProcessInfo = { + "name": name, + "state": state, + "state_name": state_name, + "description": description, + "pid": pid, + "uptime": uptime, + "start_time": start_time, + "stop_time": process_info.get("stop", 0), + "group": group, + } + process_statuses[name] = process_status + + return process_statuses diff --git a/tests/commands/test_status.py b/tests/commands/test_status.py index 52a577b..3478204 100644 --- a/tests/commands/test_status.py +++ b/tests/commands/test_status.py @@ -8,8 +8,10 @@ import pytest +from devservices.commands.status import format_uptime from devservices.commands.status import generate_service_status_details from devservices.commands.status import generate_service_status_tree +from devservices.commands.status import generate_supervisor_status_details from devservices.commands.status import get_status_json_results from devservices.commands.status import handle_started_service from devservices.commands.status import parse_docker_compose_status @@ -30,8 +32,11 @@ from devservices.utils.services import Service from devservices.utils.state import State from devservices.utils.state import StateTables +from devservices.utils.supervisor import ProcessInfo +from devservices.utils.supervisor import SupervisorProcessState from testing.utils import create_config_file from testing.utils import create_mock_git_repo +from testing.utils import create_programs_conf_file from testing.utils import run_git_command @@ -179,8 +184,9 @@ def test_generate_service_status_details() -> None: ], } } + process_statuses: dict[str, ProcessInfo] = {} result = generate_service_status_details( - dependency, docker_compose_service_to_status, "" + dependency, process_statuses, docker_compose_service_to_status, "" ) assert result == ( f"{Color.BOLD}test-service{Color.RESET}:\n" @@ -200,8 +206,9 @@ def test_generate_service_status_details_missing_status() -> None: dependency_type=DependencyType.SERVICE, ) docker_compose_service_to_status: dict[str, ServiceStatusOutput] = {} + process_statuses: dict[str, ProcessInfo] = {} result = generate_service_status_details( - dependency, docker_compose_service_to_status, "" + dependency, process_statuses, docker_compose_service_to_status, "" ) assert result == ( f"{Color.BOLD}test-service{Color.RESET}:\n" @@ -249,6 +256,7 @@ def test_generate_service_status_tree_no_child_service( } result = generate_service_status_tree( "parent-service", + {}, dependency_graph, docker_compose_service_to_status, "", @@ -333,6 +341,7 @@ def test_generate_service_status_tree_with_child_service( } result = generate_service_status_tree( "parent-service", + {}, dependency_graph, docker_compose_service_to_status, "", @@ -448,6 +457,7 @@ def test_generate_service_status_tree_with_nested_child_services( } result = generate_service_status_tree( "grandparent-service", + {}, dependency_graph, docker_compose_service_to_status, "", @@ -852,3 +862,315 @@ def test_status_service_not_running( captured = capsys.readouterr() assert "Status unavailable. test-service is not running standalone" in captured.out + + +def test_generate_supervisor_status_details_running_program() -> None: + dependency = DependencyNode( + name="test-program", + dependency_type=DependencyType.SUPERVISOR, + ) + process_statuses: dict[str, ProcessInfo] = { + "test-program": { + "name": "test-program", + "state": SupervisorProcessState.RUNNING, # RUNNING + "state_name": SupervisorProcessState.RUNNING.name, + "description": "Test program description", + "pid": 12345, + "uptime": 3661, # 1 hour, 1 minute, 1 second + "start_time": 1234567890, + "stop_time": 0, + "group": "test-group", + } + } + + result = generate_supervisor_status_details(dependency, process_statuses, "") + + assert result == ( + f"{Color.BOLD}test-program{Color.RESET}:\n" + " Type: process\n" + " Status: running\n" + " PID: 12345\n" + " Uptime: 1h 1m 1s" + ) + + +def test_generate_supervisor_status_details_stopped_program() -> None: + """Test supervisor status details for a stopped program.""" + dependency = DependencyNode( + name="stopped-program", + dependency_type=DependencyType.SUPERVISOR, + ) + process_statuses: dict[str, ProcessInfo] = { + "stopped-program": { + "name": "stopped-program", + "state": SupervisorProcessState.STOPPED, # STOPPED + "state_name": SupervisorProcessState.STOPPED.name, + "description": "", + "pid": 0, + "uptime": 0, + "start_time": 0, + "stop_time": 1234567890, + "group": "", + } + } + + result = generate_supervisor_status_details(dependency, process_statuses, " ") + + assert result == ( + f" {Color.BOLD}stopped-program{Color.RESET}:\n" + " Type: process\n" + " Status: stopped\n" + " PID: N/A\n" + " Uptime: 0s" + ) + + +def test_generate_supervisor_status_details_program_not_found() -> None: + """Test supervisor status details when program is not found.""" + dependency = DependencyNode( + name="missing-program", + dependency_type=DependencyType.SUPERVISOR, + ) + process_statuses: dict[str, ProcessInfo] = { + "other-program": { + "name": "other-program", + "state": SupervisorProcessState.RUNNING, + "state_name": SupervisorProcessState.RUNNING.name, + "description": "", + "pid": 12345, + "uptime": 100, + "start_time": 1234567890, + "stop_time": 0, + "group": "test", + } + } + + result = generate_supervisor_status_details(dependency, process_statuses, "") + + assert result == ( + f"{Color.BOLD}missing-program{Color.RESET}:\n" + " Type: process\n" + " Status: N/A (process not found)" + ) + + +def test_generate_supervisor_status_details_empty_programs_list() -> None: + """Test supervisor status details with empty programs list.""" + dependency = DependencyNode( + name="test-program", + dependency_type=DependencyType.SUPERVISOR, + ) + process_statuses: dict[str, ProcessInfo] = {} + + result = generate_supervisor_status_details(dependency, process_statuses, "") + + assert result == ( + f"{Color.BOLD}test-program{Color.RESET}:\n" + " Type: process\n" + " Status: N/A (process not found)" + ) + + +def test_generate_service_status_details_supervisor_dependency() -> None: + """Test that supervisor dependencies are handled correctly in generate_service_status_details.""" + dependency = DependencyNode( + name="test-supervisor-program", + dependency_type=DependencyType.SUPERVISOR, + ) + process_statuses: dict[str, ProcessInfo] = { + "test-supervisor-program": { + "name": "test-supervisor-program", + "state": SupervisorProcessState.RUNNING, + "state_name": SupervisorProcessState.RUNNING.name, + "description": "Test supervisor program", + "pid": 54321, + "uptime": 120, + "start_time": 1234567890, + "stop_time": 0, + "group": "supervisor-group", + } + } + docker_compose_service_to_status: dict[str, ServiceStatusOutput] = {} + + result = generate_service_status_details( + dependency, process_statuses, docker_compose_service_to_status, "" + ) + + assert result == ( + f"{Color.BOLD}test-supervisor-program{Color.RESET}:\n" + " Type: process\n" + " Status: running\n" + " PID: 54321\n" + " Uptime: 2m 0s" + ) + + +def test_format_uptime_zero_seconds() -> None: + """Test format_uptime with zero seconds.""" + assert format_uptime(0) == "0s" + + +def test_format_uptime_seconds_only() -> None: + """Test format_uptime with seconds only.""" + assert format_uptime(30) == "30s" + + +def test_format_uptime_minutes_and_seconds() -> None: + """Test format_uptime with minutes and seconds.""" + assert format_uptime(90) == "1m 30s" # 1 minute 30 seconds + + +def test_format_uptime_hours_minutes_seconds() -> None: + """Test format_uptime with hours, minutes, and seconds.""" + assert format_uptime(3661) == "1h 1m 1s" # 1 hour 1 minute 1 second + + +def test_format_uptime_days_hours_minutes_seconds() -> None: + """Test format_uptime with days, hours, minutes, and seconds.""" + assert format_uptime(90061) == "1d 1h 1m 1s" # 1 day 1 hour 1 minute 1 second + + +def test_format_uptime_exact_hour() -> None: + """Test format_uptime with exact hour.""" + assert format_uptime(3600) == "1h 0m 0s" + + +def test_format_uptime_exact_day() -> None: + """Test format_uptime with exact day.""" + assert format_uptime(86400) == "1d 0h 0m 0s" + + +def test_format_uptime_large_values() -> None: + """Test format_uptime with large values.""" + # 10 days, 5 hours, 30 minutes, 45 seconds + uptime = 10 * 86400 + 5 * 3600 + 30 * 60 + 45 + assert format_uptime(uptime) == "10d 5h 30m 45s" + + +@mock.patch("devservices.commands.status.SupervisorManager") +@mock.patch("devservices.commands.status.get_status_json_results") +def test_status_with_supervisor_programs( + mock_get_status_json_results: mock.Mock, + mock_supervisor_manager: mock.Mock, + capsys: pytest.CaptureFixture[str], + tmp_path: Path, +) -> None: + with ( + mock.patch("devservices.utils.state.STATE_DB_FILE", str(tmp_path / "state")), + mock.patch( + "devservices.utils.state.DEVSERVICES_LOCAL_DIR", str(tmp_path / "local") + ), + mock.patch( + "devservices.commands.status.DEVSERVICES_DEPENDENCIES_CACHE_DIR", + str(tmp_path / "dependency-dir"), + ), + mock.patch( + "devservices.utils.services.get_coderoot", + return_value=str(tmp_path / "code"), + ), + ): + state = State() + state.update_service_entry( + "test-service", "default", StateTables.STARTED_SERVICES + ) + + test_service_repo_path = tmp_path / "code" / "test-service" + create_mock_git_repo("blank_repo", test_service_repo_path) + + config = { + "x-sentry-service-config": { + "version": 0.1, + "service_name": "test-service", + "dependencies": { + "clickhouse": {"description": "Clickhouse"}, + "worker": {"description": "Background worker"}, + }, + "modes": {"default": ["clickhouse", "worker"]}, + }, + "services": { + "clickhouse": { + "image": "altinity/clickhouse-server:23.8.11.29.altinitystable" + }, + }, + } + create_config_file(test_service_repo_path, config) + + supervisor_program_config = """ +[program:worker] +command=python worker.py +autostart=false +autorestart=true +""" + + create_programs_conf_file(test_service_repo_path, supervisor_program_config) + + # Commit the config files so find_matching_service can discover them + run_git_command(["add", "."], cwd=test_service_repo_path) + run_git_command(["commit", "-m", "Add config"], cwd=test_service_repo_path) + + # Mock supervisor programs status + mock_process_statuses: dict[str, ProcessInfo] = { + "worker": { + "name": "worker", + "state": SupervisorProcessState.RUNNING, + "state_name": "RUNNING", + "description": "Background worker process", + "pid": 12345, + "uptime": 3600, # 1 hour + "start_time": 1234567890, + "stop_time": 0, + "group": "workers", + }, + } + + # Mock docker compose status for clickhouse (matching the config) + mock_docker_status = [ + subprocess.CompletedProcess( + args=[], + returncode=0, + stdout='{"Service": "clickhouse", "State": "running", "Health": "healthy", "Name": "test-service-clickhouse-1", "RunningFor": "2 hours ago", "Publishers": [{"URL": "127.0.0.1", "PublishedPort": 9000, "TargetPort": 9000, "Protocol": "tcp"}]}\n', + stderr="", + ) + ] + + # Set up mocks + mock_get_status_json_results.return_value = mock_docker_status + mock_supervisor_manager.return_value.get_all_process_info.return_value = ( + mock_process_statuses + ) + + # Change to service directory so find_matching_service can find the config + original_cwd = os.getcwd() + os.chdir(test_service_repo_path) + + try: + # Call the status function + args = Namespace(service_name="test-service") + status(args) + finally: + os.chdir(original_cwd) + + # Verify the output + captured = capsys.readouterr() + output = captured.out + + # Assert on the entire expected output block + expected_output = ( + f"{Color.BOLD}test-service{Color.RESET}:\n" + " Type: service\n" + " Runtime: local\n" + f" {Color.BOLD}clickhouse{Color.RESET}:\n" + " Type: container\n" + " Status: running\n" + f" Health: {Color.GREEN}healthy{Color.RESET}\n" + " Container: test-service-clickhouse-1\n" + " Uptime: 2 hours ago\n" + " Ports:\n" + " 127.0.0.1:9000 -> 9000/tcp\n" + f" {Color.BOLD}worker{Color.RESET}:\n" + " Type: process\n" + " Status: running\n" + " PID: 12345\n" + " Uptime: 1h 0m 0s\n" + ) + assert output == expected_output diff --git a/tests/utils/test_supervisor.py b/tests/utils/test_supervisor.py index 8ac8502..e2de9f2 100644 --- a/tests/utils/test_supervisor.py +++ b/tests/utils/test_supervisor.py @@ -365,7 +365,7 @@ def test_get_program_command_program_not_found( @mock.patch("devservices.utils.supervisor.subprocess.run") @mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy") -def tail_program_logs_success( +def test_tail_program_logs_success( mock_rpc_client: mock.MagicMock, mock_subprocess_run: mock.MagicMock, supervisor_manager: SupervisorManager, @@ -379,7 +379,8 @@ def tail_program_logs_success( "supervisorctl", "-c", supervisor_manager.config_file_path, - "fg", + "tail", + "-f", "test_program", ], check=True, @@ -513,3 +514,105 @@ def test_wait_for_supervisor_ready_timeout( assert mock_sleep.call_count == 5 assert mock_rpc_client.return_value.supervisor.getState.call_count == 5 + + +@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy") +def test_get_all_process_info_success( + mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager +) -> None: + """Test successful retrieval of all programs status.""" + mock_process_info = [ + { + "name": "program1", + "state": SupervisorProcessState.RUNNING, + "description": "Running program", + "pid": 1234, + "group": "default", + "start": 1000, + "now": 1100, + "stop": 0, + }, + { + "name": "program2", + "state": SupervisorProcessState.STOPPED, + "description": "Stopped program", + "pid": 0, + "group": "workers", + "start": 0, + "now": 1100, + "stop": 1050, + }, + ] + mock_rpc_client.return_value.supervisor.getAllProcessInfo.return_value = ( + mock_process_info + ) + + result = supervisor_manager.get_all_process_info() + + assert len(result) == 2 + + expected_results = { + "program1": { + "name": "program1", + "state": SupervisorProcessState.RUNNING, + "state_name": "RUNNING", + "description": "Running program", + "pid": 1234, + "group": "default", + "uptime": 100, # 1100 - 1000 + "start_time": 1000, + "stop_time": 0, + }, + "program2": { + "name": "program2", + "state": SupervisorProcessState.STOPPED, + "state_name": "STOPPED", + "description": "Stopped program", + "pid": 0, + "group": "workers", + "uptime": 0, # No uptime for stopped process + "start_time": 0, + "stop_time": 1050, + }, + } + + for expected, actual in zip(expected_results, result): + assert actual == expected + + +@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy") +def test_get_all_process_info_empty_list( + mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager +) -> None: + """Test handling of empty programs list.""" + mock_rpc_client.return_value.supervisor.getAllProcessInfo.return_value = [] + + result = supervisor_manager.get_all_process_info() + + assert result == {} + + +@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy") +def test_get_all_process_info_xmlrpc_fault( + mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager +) -> None: + mock_rpc_client.return_value.supervisor.getAllProcessInfo.side_effect = ( + xmlrpc.client.Fault(1, "Test error") + ) + + with pytest.raises( + SupervisorError, match="Failed to get programs status: Test error" + ): + supervisor_manager.get_all_process_info() + + +@mock.patch("devservices.utils.supervisor.xmlrpc.client.ServerProxy") +def test_get_all_process_info_connection_error( + mock_rpc_client: mock.MagicMock, supervisor_manager: SupervisorManager +) -> None: + mock_rpc_client.side_effect = SupervisorConnectionError("Connection failed") + + result = supervisor_manager.get_all_process_info() + + # Should return empty dict when supervisor is not running + assert result == {}