diff --git a/examples/verification/tst04_console_HV.py b/examples/verification/tst04_console_HV.py index cf2fd7a7..08a2a37a 100644 --- a/examples/verification/tst04_console_HV.py +++ b/examples/verification/tst04_console_HV.py @@ -340,6 +340,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() @@ -366,6 +368,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() diff --git a/examples/verification/tst05_thermal_stress.py b/examples/verification/tst05_thermal_stress.py index 50e1f84e..b6c74372 100644 --- a/examples/verification/tst05_thermal_stress.py +++ b/examples/verification/tst05_thermal_stress.py @@ -309,6 +309,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() @@ -335,6 +337,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() diff --git a/examples/verification/tst06_2x_burn_in.py b/examples/verification/tst06_2x_burn_in.py index 355af8ce..555a4042 100644 --- a/examples/verification/tst06_2x_burn_in.py +++ b/examples/verification/tst06_2x_burn_in.py @@ -425,6 +425,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() @@ -451,6 +453,8 @@ def connect_device(self) -> None: ext_power_supply=self.use_external_power, TX_test_mode=self.hw_simulate, HV_test_mode=self.hw_simulate, + voltage_table_selection="evt0", + sequence_time_selection="stress_test" ) tx_connected, hv_connected = self.interface.is_device_connected() diff --git a/src/openlifu/io/LIFUInterface.py b/src/openlifu/io/LIFUInterface.py index 30a6f7a9..db60732f 100644 --- a/src/openlifu/io/LIFUInterface.py +++ b/src/openlifu/io/LIFUInterface.py @@ -15,17 +15,31 @@ from openlifu.io.LIFUUart import LIFUUart from openlifu.plan.solution import Solution -REF_MAX_SEQUENCE_TIMES = [2*60, 5*60, 20*60] +REF_MAX_SEQUENCE_TIMES = { + "default": [2*60, 5*60, 10*60], # users to use default values + "stress_test": [5*60, 9*60, 15*60] # QA to use stress test values +} + REF_MAX_DUTY_CYCLES = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5] -MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME = [ - [65, 65, 65], # 0.05 - [65, 65, 50], # 0.1 - [50, 40, 35], # 0.2 - [45, 35, 30], # 0.3 - [35, 30, 25], # 0.4 - [30, 25, 20] # 0.5 - ] +MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME = { + "evt2": [ + [45, 45, 45], # 0.05 + [40, 40, 40], # 0.1 + [40, 40, 35], # 0.2 + [40, 35, 30], # 0.3 + [35, 30, 25], # 0.4 + [30, 25, 20] # 0.5 + ], + "evt0": [ + [65, 65, 65], # 0.05 + [65, 65, 50], # 0.1 + [50, 40, 35], # 0.2 + [45, 35, 30], # 0.3 + [35, 30, 25], # 0.4 + [30, 25, 20] # 0.5 + ], +} class LIFUInterfaceStatus(Enum): STATUS_COMMS_ERROR = -1 @@ -58,7 +72,9 @@ def __init__(self, HV_test_mode: bool = False, run_async: bool = False, ext_power_supply: bool = False, - module_invert: bool | List[bool] = False) -> None: + module_invert: bool | List[bool] = False, + voltage_table_selection: Optional[str] = None, + sequence_time_selection: Optional[str] = None) -> None: """ Initialize the LIFUInterface with given parameters and store them in the class. @@ -83,6 +99,11 @@ def __init__(self, self._hv_uart = None self.status = LIFUInterfaceStatus.STATUS_SYS_OFF + self.voltage_table = None + self.sequence_time = None + self.voltage_table_selection = voltage_table_selection + self.sequence_time_selection = sequence_time_selection + # Create a TXDevice instance as part of the interface logger.debug("Initializing TX Module of LIFUInterface with VID: %s, PID: %s, baudrate: %s, timeout: %s", vid, tx_pid, baudrate, timeout) self._tx_uart = LIFUUart(vid=vid, pid=tx_pid, baudrate=baudrate, timeout=timeout, desc="TX", demo_mode=TX_test_mode, async_mode=run_async) @@ -97,7 +118,6 @@ def __init__(self, self._hv_uart = LIFUUart(vid=vid, pid=con_pid, baudrate=baudrate, timeout=timeout, desc="HV", demo_mode=HV_test_mode, async_mode=run_async) self.hvcontroller = HVController(uart=self._hv_uart) - # Connect signals to internal handlers if self._async_mode: self._tx_uart.signal_connect.connect(self.signal_connect.emit) @@ -107,6 +127,31 @@ def __init__(self, self._hv_uart.signal_disconnect.connect(self.signal_disconnect.emit) self._hv_uart.signal_data_received.connect(self.signal_data_received.emit) + # Temporary fix for hardware variations between EVT0 and EVT2 + def _resolve_voltage_chart_evt_version(self, voltage_table: str) -> list[list[int]]: + if voltage_table is None: + try: + evt_version = "evt0" if self.hvcontroller.get_version().startswith("v1.1") else "evt2" + except Exception as e: + logger.error("Error getting console version: %s", e) + raise e + else: + evt_version = voltage_table.lower() + if evt_version not in MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME: + raise ValueError(f"Invalid voltage_table option '{voltage_table}'. Valid options are: {tuple(MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME.keys())}") + + return MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[evt_version] + + # Restrict sequence time options for users vs QA + def _resolve_max_sequence_time_set(self, sequence_time: str) -> list[int]: + if sequence_time is None: + return REF_MAX_SEQUENCE_TIMES["default"] + else: + sequence_time = sequence_time.lower() + if sequence_time not in REF_MAX_SEQUENCE_TIMES: + raise ValueError(f"Invalid sequence_time option '{sequence_time}'. Valid options are: {tuple(REF_MAX_SEQUENCE_TIMES.keys())}") + return REF_MAX_SEQUENCE_TIMES[sequence_time] + async def start_monitoring(self, interval: int = 1) -> None: """Start monitoring for USB device connections.""" try: @@ -168,11 +213,11 @@ def get_max_voltage(self, solution: Solution | Dict) -> float: duty_cycle_index = np.where(duty_cycles_limits >= sequence_duty_cycle)[0][0] # Find the index of the duration in the reference list - duration_limits = np.array(REF_MAX_SEQUENCE_TIMES) + duration_limits = np.array(self.sequence_time) duration_index = np.where(duration_limits >= sequence_duration)[0][0] # Return the maximum voltage for the given duty cycle and duration - return MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[duty_cycle_index][duration_index] + return self.voltage_table[duty_cycle_index][duration_index] def get_max_voltage_table(self) -> pd.DataFrame: """ @@ -184,10 +229,10 @@ def get_max_voltage_table(self) -> pd.DataFrame: data = { "Duty Cycle (%)": [f"<={100 * dc:0.1f}%" for dc in REF_MAX_DUTY_CYCLES], } - for i, duration in enumerate(REF_MAX_SEQUENCE_TIMES): + for i, duration in enumerate(self.sequence_time): col_name = f"<={duration // 60} min" data[col_name] = [ - MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[j][i] for j in range(len(REF_MAX_DUTY_CYCLES)) + self.voltage_table[j][i] for j in range(len(REF_MAX_DUTY_CYCLES)) ] max_voltage = pd.DataFrame(data).set_index("Duty Cycle (%)") max_voltage.Name = "Maximum Voltage (V)" @@ -205,6 +250,8 @@ def check_solution(self, solution: Solution | Dict) -> None: if isinstance(solution, Solution): solution = solution.to_dict() + self.voltage_table = self._resolve_voltage_chart_evt_version(self.voltage_table_selection) + self.sequence_time = self._resolve_max_sequence_time_set(self.sequence_time_selection) sequence_duty_cycle = self.get_sequence_duty_cycle(solution) duty_cycles_limits = np.array(REF_MAX_DUTY_CYCLES) if sequence_duty_cycle > duty_cycles_limits.max(): @@ -212,12 +259,12 @@ def check_solution(self, solution: Solution | Dict) -> None: duty_cycle_index = np.where(duty_cycles_limits >= sequence_duty_cycle)[0][0] sequence_duration = self.get_sequence_duration(solution) - duration_limits = np.array(REF_MAX_SEQUENCE_TIMES) + duration_limits = np.array(self.sequence_time) if sequence_duration > duration_limits.max(): raise ValueError(f"Sequence duration ({sequence_duration:0.0f} s) exceeds maximum allowed duration ({duration_limits.max()} s).") duration_index = np.where(duration_limits >= sequence_duration)[0][0] - max_voltage = MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[duty_cycle_index][duration_index] + max_voltage = self.voltage_table[duty_cycle_index][duration_index] if solution['voltage'] > max_voltage: raise ValueError(f"Voltage ({solution['voltage']:0.1f}V) exceeds maximum allowed voltage ({max_voltage:0.1f}V) for duty cycle ({100*sequence_duty_cycle:0.1f} <= {100*duty_cycles_limits[duty_cycle_index]}%) and sequence time ({sequence_duration:0.0f} <= {duration_limits[duration_index]}s).")