Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/verification/tst04_console_HV.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand All @@ -366,6 +368,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand Down
4 changes: 4 additions & 0 deletions examples/verification/tst05_thermal_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand All @@ -335,6 +337,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand Down
4 changes: 4 additions & 0 deletions examples/verification/tst06_2x_burn_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand All @@ -451,6 +453,8 @@ def connect_device(self) -> None:
ext_power_supply=self.use_external_power,
TX_test_mode=self.hw_simulate,
HV_test_mode=self.hw_simulate,
voltage_table_selection="evt0",
sequence_time_selection="stress_test"
)
tx_connected, hv_connected = self.interface.is_device_connected()

Expand Down
81 changes: 64 additions & 17 deletions src/openlifu/io/LIFUInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,31 @@
from openlifu.io.LIFUUart import LIFUUart
from openlifu.plan.solution import Solution

REF_MAX_SEQUENCE_TIMES = [2*60, 5*60, 20*60]
REF_MAX_SEQUENCE_TIMES = {
"default": [2*60, 5*60, 10*60], # users to use default values
"stress_test": [5*60, 9*60, 15*60] # QA to use stress test values
}

REF_MAX_DUTY_CYCLES = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5]
MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME = [
[65, 65, 65], # 0.05
[65, 65, 50], # 0.1
[50, 40, 35], # 0.2
[45, 35, 30], # 0.3
[35, 30, 25], # 0.4
[30, 25, 20] # 0.5
]

MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME = {
"evt2": [
[45, 45, 45], # 0.05
[40, 40, 40], # 0.1
[40, 40, 35], # 0.2
[40, 35, 30], # 0.3
[35, 30, 25], # 0.4
[30, 25, 20] # 0.5
],
"evt0": [
[65, 65, 65], # 0.05
[65, 65, 50], # 0.1
[50, 40, 35], # 0.2
[45, 35, 30], # 0.3
[35, 30, 25], # 0.4
[30, 25, 20] # 0.5
],
}

class LIFUInterfaceStatus(Enum):
STATUS_COMMS_ERROR = -1
Expand Down Expand Up @@ -58,7 +72,9 @@ def __init__(self,
HV_test_mode: bool = False,
run_async: bool = False,
ext_power_supply: bool = False,
module_invert: bool | List[bool] = False) -> None:
module_invert: bool | List[bool] = False,
voltage_table_selection: Optional[str] = None,
sequence_time_selection: Optional[str] = None) -> None:
"""
Initialize the LIFUInterface with given parameters and store them in the class.

Expand All @@ -83,6 +99,11 @@ def __init__(self,
self._hv_uart = None
self.status = LIFUInterfaceStatus.STATUS_SYS_OFF

self.voltage_table = None
self.sequence_time = None
self.voltage_table_selection = voltage_table_selection
self.sequence_time_selection = sequence_time_selection

# Create a TXDevice instance as part of the interface
logger.debug("Initializing TX Module of LIFUInterface with VID: %s, PID: %s, baudrate: %s, timeout: %s", vid, tx_pid, baudrate, timeout)
self._tx_uart = LIFUUart(vid=vid, pid=tx_pid, baudrate=baudrate, timeout=timeout, desc="TX", demo_mode=TX_test_mode, async_mode=run_async)
Expand All @@ -97,7 +118,6 @@ def __init__(self,
self._hv_uart = LIFUUart(vid=vid, pid=con_pid, baudrate=baudrate, timeout=timeout, desc="HV", demo_mode=HV_test_mode, async_mode=run_async)
self.hvcontroller = HVController(uart=self._hv_uart)


# Connect signals to internal handlers
if self._async_mode:
self._tx_uart.signal_connect.connect(self.signal_connect.emit)
Expand All @@ -107,6 +127,31 @@ def __init__(self,
self._hv_uart.signal_disconnect.connect(self.signal_disconnect.emit)
self._hv_uart.signal_data_received.connect(self.signal_data_received.emit)

# Temporary fix for hardware variations between EVT0 and EVT2
def _resolve_voltage_chart_evt_version(self, voltage_table: str) -> list[list[int]]:
if voltage_table is None:
try:
evt_version = "evt0" if self.hvcontroller.get_version().startswith("v1.1") else "evt2"
except Exception as e:
logger.error("Error getting console version: %s", e)
raise e
else:
evt_version = voltage_table.lower()
if evt_version not in MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME:
raise ValueError(f"Invalid voltage_table option '{voltage_table}'. Valid options are: {tuple(MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME.keys())}")

return MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[evt_version]

# Restrict sequence time options for users vs QA
def _resolve_max_sequence_time_set(self, sequence_time: str) -> list[int]:
if sequence_time is None:
return REF_MAX_SEQUENCE_TIMES["default"]
else:
sequence_time = sequence_time.lower()
if sequence_time not in REF_MAX_SEQUENCE_TIMES:
raise ValueError(f"Invalid sequence_time option '{sequence_time}'. Valid options are: {tuple(REF_MAX_SEQUENCE_TIMES.keys())}")
return REF_MAX_SEQUENCE_TIMES[sequence_time]

async def start_monitoring(self, interval: int = 1) -> None:
"""Start monitoring for USB device connections."""
try:
Expand Down Expand Up @@ -168,11 +213,11 @@ def get_max_voltage(self, solution: Solution | Dict) -> float:
duty_cycle_index = np.where(duty_cycles_limits >= sequence_duty_cycle)[0][0]

# Find the index of the duration in the reference list
duration_limits = np.array(REF_MAX_SEQUENCE_TIMES)
duration_limits = np.array(self.sequence_time)
duration_index = np.where(duration_limits >= sequence_duration)[0][0]

# Return the maximum voltage for the given duty cycle and duration
return MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[duty_cycle_index][duration_index]
return self.voltage_table[duty_cycle_index][duration_index]

def get_max_voltage_table(self) -> pd.DataFrame:
"""
Expand All @@ -184,10 +229,10 @@ def get_max_voltage_table(self) -> pd.DataFrame:
data = {
"Duty Cycle (%)": [f"<={100 * dc:0.1f}%" for dc in REF_MAX_DUTY_CYCLES],
}
for i, duration in enumerate(REF_MAX_SEQUENCE_TIMES):
for i, duration in enumerate(self.sequence_time):
col_name = f"<={duration // 60} min"
data[col_name] = [
MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[j][i] for j in range(len(REF_MAX_DUTY_CYCLES))
self.voltage_table[j][i] for j in range(len(REF_MAX_DUTY_CYCLES))
]
max_voltage = pd.DataFrame(data).set_index("Duty Cycle (%)")
max_voltage.Name = "Maximum Voltage (V)"
Expand All @@ -205,19 +250,21 @@ def check_solution(self, solution: Solution | Dict) -> None:
if isinstance(solution, Solution):
solution = solution.to_dict()

self.voltage_table = self._resolve_voltage_chart_evt_version(self.voltage_table_selection)
self.sequence_time = self._resolve_max_sequence_time_set(self.sequence_time_selection)
sequence_duty_cycle = self.get_sequence_duty_cycle(solution)
duty_cycles_limits = np.array(REF_MAX_DUTY_CYCLES)
if sequence_duty_cycle > duty_cycles_limits.max():
raise ValueError(f"Sequence duty cycle ({100*sequence_duty_cycle:0.1f} %) exceeds maximum allowed duty cycle ({100*duty_cycles_limits.max():0.1f} %).")
duty_cycle_index = np.where(duty_cycles_limits >= sequence_duty_cycle)[0][0]

sequence_duration = self.get_sequence_duration(solution)
duration_limits = np.array(REF_MAX_SEQUENCE_TIMES)
duration_limits = np.array(self.sequence_time)
if sequence_duration > duration_limits.max():
raise ValueError(f"Sequence duration ({sequence_duration:0.0f} s) exceeds maximum allowed duration ({duration_limits.max()} s).")
duration_index = np.where(duration_limits >= sequence_duration)[0][0]

max_voltage = MAX_VOLTAGE_BY_DUTY_CYCLE_AND_SEQUENCE_TIME[duty_cycle_index][duration_index]
max_voltage = self.voltage_table[duty_cycle_index][duration_index]
if solution['voltage'] > max_voltage:
raise ValueError(f"Voltage ({solution['voltage']:0.1f}V) exceeds maximum allowed voltage ({max_voltage:0.1f}V) for duty cycle ({100*sequence_duty_cycle:0.1f} <= {100*duty_cycles_limits[duty_cycle_index]}%) and sequence time ({sequence_duration:0.0f} <= {duration_limits[duration_index]}s).")

Expand Down