From c6546744eac5437814416b504d63251e2e650efc Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 11 Jun 2026 12:00:40 +0100 Subject: [PATCH 1/5] `STARBackend`: add basic 96-head aspirate/dispense (`H0 PA`/`PB`) Add head96_basic_aspirate / head96_basic_dispense: direct H0 PA/PB surface-following aspirate and dispense as the lowest-tier 96-head liquid-handling commands. They validate inputs against Head96Information ranges, pin the rigid head to the full channel pattern, and guard on tip presence (enforce_requires_tip). Add a STARChatterboxBackend head96_request_tip_presence stub that derives presence from the tip tracker and raises when tip tracking is disabled. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../backends/hamilton/STAR_backend.py | 154 ++++++++++++++++++ .../backends/hamilton/STAR_chatterbox.py | 13 ++ .../backends/hamilton/STAR_tests.py | 73 +++++++++ 3 files changed, 240 insertions(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 88f813c571e..f654e80fc39 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -8428,6 +8428,160 @@ async def discard_tips_core96( # -------------- 3.10.3 Liquid handling using CoRe 96 Head -------------- + @_requires_head96 + async def head96_basic_aspirate( + self, + volume: float, + *, + flow_rate: Optional[float] = None, + minimum_height: float, + surface_following_distance: float = 0.0, + enforce_minimum_height: bool = True, + enforce_requires_tip: bool = True, + ): + """Aspirate on the 96-head with surface following (the firmware drives Z and the dispensing drive + in parallel). + + The direct, full-control counterpart to `aspirate96`: it takes the height / surface-following / + flow directly rather than a resource and liquid class, and computes no positions. Acts on the whole + head (rigid - no per-channel selection) and assumes the head already holds tips. Values are given + in human units and converted to firmware increments. This is a basic, thin command: it does not + settle after aspirating (settling is a separate basic command). + + Args: + volume: The volume to aspirate through each channel, uL. + flow_rate: The dispensing-drive speed, uL/s; None leaves the firmware default. + minimum_height: The lowest Z (mm) the head descends to, the end of the aspiration stroke. + surface_following_distance: The Z travel during aspiration, mm; 0 keeps the head in place so it + cannot drive into the container bottom. + enforce_minimum_height: If True, clamp any deeper target to minimum_height; if False, surface + following may carry the head past it. + enforce_requires_tip: If True, raise if the head holds no tips; if False, allow aspirating air. + + Raises: + RuntimeError: If 96-head is not installed, or enforce_requires_tip and the head holds no tips. + AssertionError: If firmware info missing or a parameter is out of range. + """ + assert self._head96_information is not None, "96-head information not loaded; run setup()" + info = self._head96_information + volume_max = info.dispensing_drive_range[1] + flow_max = info.dispensing_drive_speed_range[1] + z_min, z_max = info.z_range + surface_following_max = self._head96_z_drive_increment_to_mm(9999) + + assert 0 <= volume <= volume_max, f"volume must be between 0 and {volume_max} uL" + if flow_rate is not None: + assert 0 <= flow_rate <= flow_max, f"flow_rate must be between 0 and {flow_max} uL/s" + assert 0 <= surface_following_distance <= surface_following_max, ( + f"surface_following_distance must be between 0 and {surface_following_max} mm" + ) + assert z_min <= minimum_height <= z_max, ( + f"minimum_height must be between {z_min} and {z_max} mm" + ) + + if enforce_requires_tip: + has_tips = await self.head96_request_tip_presence() + if not has_tips: + raise RuntimeError( + "96-head has no tips (firmware reports none); pick up tips before aspirating" + ) + + to_z = self._head96_z_drive_mm_to_increment + to_vol = self._head96_dispensing_drive_uL_to_increment + params = { + "pm": "F" * 24, # all 96 channels; the rigid head has no per-channel selection + "dj": "1" if enforce_minimum_height else "0", + "da": f"{to_vol(volume):05}", + } + if flow_rate is not None: + params["dv"] = f"{to_vol(flow_rate):05}" + params["dc"] = "00000" # pre-wetting off; its interaction with surface following is unverified + params["zd"] = f"{to_z(surface_following_distance):04}" + params["zh"] = f"{to_z(minimum_height):05}" + params["to"] = ( + "000" # settling_time not exposed here (firmware allows it); it is its own basic command + ) + return await self.send_command(module="H0", command="PA", **params) # type: ignore[arg-type] + + @_requires_head96 + async def head96_basic_dispense( + self, + volume: float, + *, + flow_rate: Optional[float] = None, + minimum_height: float, + stop_back_volume: float = 0.0, + surface_following_distance: float = 0.0, + stop_flow_rate: Optional[float] = None, + enforce_requires_tip: bool = True, + ): + """Dispense on the 96-head with surface following (the firmware drives Z and the dispensing drive + in parallel). + + The direct, full-control counterpart to `dispense96`. Acts on the whole head (rigid) and assumes + the head already holds tips. This is a basic, thin command: it does not settle after dispensing + (settling is a separate basic command). + + Args: + volume: The volume to dispense through each channel, uL. + flow_rate: The dispensing-drive speed, uL/s; None leaves the firmware default. + minimum_height: The lowest Z (mm) the head descends to, the end of the dispense stroke. + stop_back_volume: The volume drawn back at the end to stop dripping, uL. + surface_following_distance: The Z travel during dispense, mm. + stop_flow_rate: The dispensing-drive stop speed, uL/s; None leaves the firmware default. + enforce_requires_tip: If True, raise if the head holds no tips; if False, allow dispensing air. + + Raises: + RuntimeError: If 96-head is not installed, or enforce_requires_tip and the head holds no tips. + AssertionError: If firmware info missing or a parameter is out of range. + """ + assert self._head96_information is not None, "96-head information not loaded; run setup()" + info = self._head96_information + volume_max = info.dispensing_drive_range[1] + flow_max = info.dispensing_drive_speed_range[1] + z_min, z_max = info.z_range + surface_following_max = self._head96_z_drive_increment_to_mm(9999) + stop_back_max = self._head96_dispensing_drive_increment_to_uL(9999) + + assert 0 <= volume <= volume_max, f"volume must be between 0 and {volume_max} uL" + if flow_rate is not None: + assert 0 <= flow_rate <= flow_max, f"flow_rate must be between 0 and {flow_max} uL/s" + assert 0 <= stop_back_volume <= stop_back_max, ( + f"stop_back_volume must be between 0 and {stop_back_max} uL" + ) + assert 0 <= surface_following_distance <= surface_following_max, ( + f"surface_following_distance must be between 0 and {surface_following_max} mm" + ) + assert z_min <= minimum_height <= z_max, ( + f"minimum_height must be between {z_min} and {z_max} mm" + ) + if stop_flow_rate is not None: + assert 0 <= stop_flow_rate <= flow_max, ( + f"stop_flow_rate must be between 0 and {flow_max} uL/s" + ) + + if enforce_requires_tip: + has_tips = await self.head96_request_tip_presence() + if not has_tips: + raise RuntimeError( + "96-head has no tips (firmware reports none); pick up tips before dispensing" + ) + + to_z = self._head96_z_drive_mm_to_increment + to_vol = self._head96_dispensing_drive_uL_to_increment + params = { + "pm": "F" * 24, # all 96 channels; the rigid head has no per-channel selection + "db": f"{to_vol(volume):05}", + } + if flow_rate is not None: + params["dv"] = f"{to_vol(flow_rate):05}" + params["dd"] = f"{to_vol(stop_back_volume):04}" + params["ze"] = f"{to_z(surface_following_distance):04}" + params["zh"] = f"{to_z(minimum_height):05}" + if stop_flow_rate is not None: + params["du"] = f"{to_vol(stop_flow_rate):05}" + return await self.send_command(module="H0", command="PB", **params) # type: ignore[arg-type] + # # # Granular commands # # # async def head96_dispensing_drive_move_to_home_volume( diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 731401dd0e4..c48e9df4c87 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -14,6 +14,7 @@ iSWAPInformation, ) from pylabrobot.resources.container import Container +from pylabrobot.resources.tip_tracker import does_tip_tracking from pylabrobot.resources.well import Well _DEFAULT_MACHINE_CONFIGURATION = MachineConfiguration( @@ -322,6 +323,18 @@ async def head96_request_firmware_version(self) -> datetime.date: """Return mock 96-head firmware version.""" return datetime.date(2023, 1, 1) + async def head96_request_tip_presence(self) -> int: + """Mock 96-head tip presence from the tip tracker: 1 if any channel holds a tip, else 0. + + Raises if tip tracking is disabled, since the tracker is then not updated and has no state to report. + """ + if not does_tip_tracking() or self.head96 is None: + raise RuntimeError( + "cannot report 96-head tip presence with tip tracking disabled in simulation; " + "enable it with set_tip_tracking(True) or call with enforce_requires_tip=False" + ) + return int(any(tracker.has_tip for tracker in self.head96.values())) + # # # # # # # # Extension: iSWAP # # # # # # # # async def request_iswap_initialization_status(self) -> bool: diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index ffcd377a030..242d003c699 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -38,6 +38,7 @@ CommandSyntaxError, HamiltonNoTipError, HardwareError, + Head96Information, PipChannelInformation, STARBackend, STARFirmwareError, @@ -154,6 +155,33 @@ def _any_write_and_read_command_call(cmd): ) +def _make_head96_information(star): + """A representative installed-96-head record (2021 legacy) for command tests.""" + fw = datetime.date(2021, 10, 22) + return Head96Information( + fw_version=fw, + x_offset=368.2, + supports_clot_monitoring_clld=False, + stop_disc_type="core_ii", + instrument_type="legacy", + head_type="96 head II", + y_range=star._head96_resolve_y_range(fw), + y_speed_range=star._head96_resolve_y_speed_range(fw), + z_range=star._head96_resolve_z_range("legacy"), + dispensing_drive_range=star._head96_resolve_dispensing_drive_range(fw), + dispensing_drive_speed_range=star._head96_resolve_dispensing_drive_speed_range(fw), + y_drive_speed_default=star._head96_resolve_y_drive_speed_default(fw), + y_drive_acceleration_default=star._head96_resolve_y_drive_acceleration_default(fw), + dispensing_drive_acceleration_default=star._head96_resolve_dispensing_drive_acceleration_default( + fw + ), + squeezer_drive_speed_default=star._head96_resolve_squeezer_drive_speed_default(fw), + squeezer_drive_acceleration_default=star._head96_resolve_squeezer_drive_acceleration_default( + fw + ), + ) + + class TestPipChannelInformationParsing(unittest.TestCase): """VW (pip channel hardware-configuration) response parsing. @@ -1104,6 +1132,51 @@ async def test_core_96_dispense(self): ] ) + async def test_head96_basic_aspirate(self): + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR._write_and_read_command.reset_mock() + await self.STAR.head96_basic_aspirate( + volume=100, + minimum_height=230, + surface_following_distance=2, + flow_rate=50, + enforce_requires_tip=False, # isolate the wire string from the tip-presence round-trip + ) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PAid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdj1da05170dv02585dc00000zd0400zh46000to000" + ) + ] + ) + + async def test_head96_basic_dispense(self): + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR._write_and_read_command.reset_mock() + await self.STAR.head96_basic_dispense( + volume=100, + minimum_height=230, + stop_back_volume=5, + surface_following_distance=2, + flow_rate=50, + stop_flow_rate=20, + enforce_requires_tip=False, # isolate the wire string from the tip-presence round-trip + ) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PBid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdb05170dv02585dd0259ze0400zh46000du01034" + ) + ] + ) + + async def test_head96_basic_aspirate_requires_tip(self): + """enforce_requires_tip raises when the head reports no tips.""" + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) + with self.assertRaises(RuntimeError): + await self.STAR.head96_basic_aspirate(volume=100, minimum_height=230) + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command. From 60c40e9006d8df9a348e5e5e794fe2f9e4895e5d Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 11 Jun 2026 12:37:07 +0100 Subject: [PATCH 2/5] `STARBackend`: resolve 96-head basic flow_rate to the head default and send it explicitly head96_basic_aspirate / head96_basic_dispense now resolve an omitted flow_rate to Head96Information.dispensing_drive_speed_default and always send dv, so the command is predictable regardless of the dispensing-drive speed register. Derive every bound from the Head96Information range tuples, correcting the flow_rate lower bound (the firmware dv floor is ~0.1 uL/s, not 0). Build the firmware command with direct send_command keyword arguments instead of an incremental dict, matching aspirate_core_96 and dropping the type: ignore; convert via descriptively-named increment locals. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../backends/hamilton/STAR_backend.py | 100 ++++++++++-------- .../backends/hamilton/STAR_tests.py | 15 +++ 2 files changed, 69 insertions(+), 46 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index f654e80fc39..07836fa43b3 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -8450,7 +8450,7 @@ async def head96_basic_aspirate( Args: volume: The volume to aspirate through each channel, uL. - flow_rate: The dispensing-drive speed, uL/s; None leaves the firmware default. + flow_rate: The dispensing-drive speed, uL/s; None uses the head's default speed. minimum_height: The lowest Z (mm) the head descends to, the end of the aspiration stroke. surface_following_distance: The Z travel during aspiration, mm; 0 keeps the head in place so it cannot drive into the container bottom. @@ -8464,14 +8464,17 @@ async def head96_basic_aspirate( """ assert self._head96_information is not None, "96-head information not loaded; run setup()" info = self._head96_information - volume_max = info.dispensing_drive_range[1] - flow_max = info.dispensing_drive_speed_range[1] + vol_min, vol_max = info.dispensing_drive_range + flow_min, flow_max = info.dispensing_drive_speed_range z_min, z_max = info.z_range surface_following_max = self._head96_z_drive_increment_to_mm(9999) + if flow_rate is None: + flow_rate = info.dispensing_drive_speed_default - assert 0 <= volume <= volume_max, f"volume must be between 0 and {volume_max} uL" - if flow_rate is not None: - assert 0 <= flow_rate <= flow_max, f"flow_rate must be between 0 and {flow_max} uL/s" + assert vol_min <= volume <= vol_max, f"volume must be between {vol_min} and {vol_max} uL" + assert flow_min <= flow_rate <= flow_max, ( + f"flow_rate must be between {flow_min} and {flow_max} uL/s" + ) assert 0 <= surface_following_distance <= surface_following_max, ( f"surface_following_distance must be between 0 and {surface_following_max} mm" ) @@ -8486,22 +8489,22 @@ async def head96_basic_aspirate( "96-head has no tips (firmware reports none); pick up tips before aspirating" ) - to_z = self._head96_z_drive_mm_to_increment - to_vol = self._head96_dispensing_drive_uL_to_increment - params = { - "pm": "F" * 24, # all 96 channels; the rigid head has no per-channel selection - "dj": "1" if enforce_minimum_height else "0", - "da": f"{to_vol(volume):05}", - } - if flow_rate is not None: - params["dv"] = f"{to_vol(flow_rate):05}" - params["dc"] = "00000" # pre-wetting off; its interaction with surface following is unverified - params["zd"] = f"{to_z(surface_following_distance):04}" - params["zh"] = f"{to_z(minimum_height):05}" - params["to"] = ( - "000" # settling_time not exposed here (firmware allows it); it is its own basic command + volume_increment = self._head96_dispensing_drive_uL_to_increment(volume) + flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(flow_rate) + surface_following_increment = self._head96_z_drive_mm_to_increment(surface_following_distance) + minimum_height_increment = self._head96_z_drive_mm_to_increment(minimum_height) + return await self.send_command( + module="H0", + command="PA", + pm="F" * 24, # all 96 channels; the rigid head has no per-channel selection + dj="1" if enforce_minimum_height else "0", + da=f"{volume_increment:05}", + dv=f"{flow_rate_increment:05}", + dc="00000", # pre-wetting off; its interaction with surface following is unverified + zd=f"{surface_following_increment:04}", + zh=f"{minimum_height_increment:05}", + to="000", # settling_time not exposed here (firmware allows it); it is its own basic command ) - return await self.send_command(module="H0", command="PA", **params) # type: ignore[arg-type] @_requires_head96 async def head96_basic_dispense( @@ -8524,11 +8527,11 @@ async def head96_basic_dispense( Args: volume: The volume to dispense through each channel, uL. - flow_rate: The dispensing-drive speed, uL/s; None leaves the firmware default. + flow_rate: The dispensing-drive speed, uL/s; None uses the head's default speed. minimum_height: The lowest Z (mm) the head descends to, the end of the dispense stroke. stop_back_volume: The volume drawn back at the end to stop dripping, uL. surface_following_distance: The Z travel during dispense, mm. - stop_flow_rate: The dispensing-drive stop speed, uL/s; None leaves the firmware default. + stop_flow_rate: The dispensing-drive stop speed, uL/s; None uses the firmware default (0). enforce_requires_tip: If True, raise if the head holds no tips; if False, allow dispensing air. Raises: @@ -8537,15 +8540,20 @@ async def head96_basic_dispense( """ assert self._head96_information is not None, "96-head information not loaded; run setup()" info = self._head96_information - volume_max = info.dispensing_drive_range[1] - flow_max = info.dispensing_drive_speed_range[1] + vol_min, vol_max = info.dispensing_drive_range + flow_min, flow_max = info.dispensing_drive_speed_range z_min, z_max = info.z_range surface_following_max = self._head96_z_drive_increment_to_mm(9999) stop_back_max = self._head96_dispensing_drive_increment_to_uL(9999) + if flow_rate is None: + flow_rate = info.dispensing_drive_speed_default + if stop_flow_rate is None: + stop_flow_rate = 0.0 # firmware stop-speed default - assert 0 <= volume <= volume_max, f"volume must be between 0 and {volume_max} uL" - if flow_rate is not None: - assert 0 <= flow_rate <= flow_max, f"flow_rate must be between 0 and {flow_max} uL/s" + assert vol_min <= volume <= vol_max, f"volume must be between {vol_min} and {vol_max} uL" + assert flow_min <= flow_rate <= flow_max, ( + f"flow_rate must be between {flow_min} and {flow_max} uL/s" + ) assert 0 <= stop_back_volume <= stop_back_max, ( f"stop_back_volume must be between 0 and {stop_back_max} uL" ) @@ -8555,10 +8563,7 @@ async def head96_basic_dispense( assert z_min <= minimum_height <= z_max, ( f"minimum_height must be between {z_min} and {z_max} mm" ) - if stop_flow_rate is not None: - assert 0 <= stop_flow_rate <= flow_max, ( - f"stop_flow_rate must be between 0 and {flow_max} uL/s" - ) + assert 0 <= stop_flow_rate <= flow_max, f"stop_flow_rate must be between 0 and {flow_max} uL/s" if enforce_requires_tip: has_tips = await self.head96_request_tip_presence() @@ -8567,20 +8572,23 @@ async def head96_basic_dispense( "96-head has no tips (firmware reports none); pick up tips before dispensing" ) - to_z = self._head96_z_drive_mm_to_increment - to_vol = self._head96_dispensing_drive_uL_to_increment - params = { - "pm": "F" * 24, # all 96 channels; the rigid head has no per-channel selection - "db": f"{to_vol(volume):05}", - } - if flow_rate is not None: - params["dv"] = f"{to_vol(flow_rate):05}" - params["dd"] = f"{to_vol(stop_back_volume):04}" - params["ze"] = f"{to_z(surface_following_distance):04}" - params["zh"] = f"{to_z(minimum_height):05}" - if stop_flow_rate is not None: - params["du"] = f"{to_vol(stop_flow_rate):05}" - return await self.send_command(module="H0", command="PB", **params) # type: ignore[arg-type] + volume_increment = self._head96_dispensing_drive_uL_to_increment(volume) + flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(flow_rate) + stop_back_increment = self._head96_dispensing_drive_uL_to_increment(stop_back_volume) + surface_following_increment = self._head96_z_drive_mm_to_increment(surface_following_distance) + minimum_height_increment = self._head96_z_drive_mm_to_increment(minimum_height) + stop_flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(stop_flow_rate) + return await self.send_command( + module="H0", + command="PB", + pm="F" * 24, # all 96 channels; the rigid head has no per-channel selection + db=f"{volume_increment:05}", + dv=f"{flow_rate_increment:05}", + dd=f"{stop_back_increment:04}", + ze=f"{surface_following_increment:04}", + zh=f"{minimum_height_increment:05}", + du=f"{stop_flow_rate_increment:05}", + ) # # # Granular commands # # # diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index 242d003c699..540ff819979 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1177,6 +1177,21 @@ async def test_head96_basic_aspirate_requires_tip(self): with self.assertRaises(RuntimeError): await self.STAR.head96_basic_aspirate(volume=100, minimum_height=230) + async def test_head96_basic_aspirate_default_flow_rate(self): + """Omitting flow_rate emits the head's default dispensing-drive speed (dv13500).""" + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR._write_and_read_command.reset_mock() + await self.STAR.head96_basic_aspirate( + volume=100, minimum_height=230, surface_following_distance=2, enforce_requires_tip=False + ) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PAid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdj1da05170dv13500dc00000zd0400zh46000to000" + ) + ] + ) + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command. From d12ce8a4de767c31c549533d3f292d651adde6e5 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Jun 2026 07:33:41 +0100 Subject: [PATCH 3/5] `STARBackend`: test basic 96-head dispense defaults and aspirate range guard head96_basic_dispense with flow_rate and stop_flow_rate omitted emits the head default speed and a zero stop speed (plus the stop-back / surface-following defaults), and an out-of-range volume raises an AssertionError before any command is sent. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../backends/hamilton/STAR_tests.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index 540ff819979..568a94ae248 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1192,6 +1192,30 @@ async def test_head96_basic_aspirate_default_flow_rate(self): ] ) + async def test_head96_basic_dispense_default_flow_rates(self): + """Omitting flow_rate and stop_flow_rate emits the head default speed (dv13500) and a zero stop + speed (du00000), with the stop-back and surface-following defaults (dd0000 / ze0000).""" + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR._write_and_read_command.reset_mock() + await self.STAR.head96_basic_dispense( + volume=100, minimum_height=230, enforce_requires_tip=False + ) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PBid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdb05170dv13500dd0000ze0000zh46000du00000" + ) + ] + ) + + async def test_head96_basic_aspirate_volume_out_of_range_raises(self): + """A volume beyond the dispensing-drive range raises before any command is sent.""" + self.STAR._head96_information = _make_head96_information(self.STAR) + with self.assertRaises(AssertionError): + await self.STAR.head96_basic_aspirate( + volume=100000, minimum_height=230, enforce_requires_tip=False + ) + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command. From 337f83e3947c8717a1c02d09140765356884204f Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Jun 2026 09:11:40 +0100 Subject: [PATCH 4/5] `STARBackend`: 96-head basic minimum_height is tip-bottom, defaulting to the deck floor head96_basic_aspirate / head96_basic_dispense now take minimum_height in tip-bottom space (where the tip end stops) as Optional[float] = None, defaulting to the deck floor (MINIMUM_CHANNEL_Z_POSITION); the value is converted to the firmware stop-disk zh via the tip overhang. With no tip the overhang is 0, so minimum_height is guarded against the firmware Z floor (z_range[0]) directly. Giving the parameter a default drops the keyword-only marker. Extract _head96_tip_overhang() (stop disk - tip bottom, measured move-free) and route head96_move_tool_z and both basic commands through it instead of the inlined computation. The volume docstring now notes it is the raw piston (dispensing-drive) volume, not liquid-class corrected. Tests: the wire-string tests now exercise the no-tip path (overhang 0); added a with-tip overhang-conversion test and a minimum_height-defaults-to-floor test. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../backends/hamilton/STAR_backend.py | 102 +++++++++++------- .../backends/hamilton/STAR_tests.py | 40 +++++++ 2 files changed, 103 insertions(+), 39 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 07836fa43b3..ef186086891 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -8279,6 +8279,11 @@ async def head96_move_stop_disk_z( return resp + async def _head96_tip_overhang(self) -> float: + """The tip overhang (stop disk minus tip bottom) in mm, measured move-free; lets a tip-bottom Z + be expressed against the firmware stop-disk reference. Near 0 with no tip on.""" + return await self.head96_request_stop_disk_z() - (await self.head96_request_position()).z + @_requires_head96 async def head96_move_tool_z(self, z: float, speed: Optional[float] = None): """Move the 96-head tip bottom to an absolute Z position in mm. @@ -8305,9 +8310,7 @@ async def head96_move_tool_z(self, z: float, speed: Optional[float] = None): "without a tip attached." ) - tip_overhang = ( - await self.head96_request_stop_disk_z() - (await self.head96_request_position()).z - ) + tip_overhang = await self._head96_tip_overhang() # The move is in stop-disk space over z_range, so the reachable tip-bottom window is z_range # shifted down by the overhang, floored at the deck surface. Validate in tip-bottom terms. @@ -8432,9 +8435,8 @@ async def discard_tips_core96( async def head96_basic_aspirate( self, volume: float, - *, flow_rate: Optional[float] = None, - minimum_height: float, + minimum_height: Optional[float] = None, surface_following_distance: float = 0.0, enforce_minimum_height: bool = True, enforce_requires_tip: bool = True, @@ -8444,14 +8446,16 @@ async def head96_basic_aspirate( The direct, full-control counterpart to `aspirate96`: it takes the height / surface-following / flow directly rather than a resource and liquid class, and computes no positions. Acts on the whole - head (rigid - no per-channel selection) and assumes the head already holds tips. Values are given - in human units and converted to firmware increments. This is a basic, thin command: it does not - settle after aspirating (settling is a separate basic command). + head (rigid - no per-channel selection). Values are given in human units and converted to firmware + increments. This is a basic, thin command: it does not settle after aspirating (settling is a + separate basic command). Args: - volume: The volume to aspirate through each channel, uL. + volume: The piston (dispensing-drive) volume to aspirate per channel, uL; raw, not liquid-class + corrected. flow_rate: The dispensing-drive speed, uL/s; None uses the head's default speed. - minimum_height: The lowest Z (mm) the head descends to, the end of the aspiration stroke. + minimum_height: The lowest the tip end (tip-bottom) descends to, mm - the end of the stroke. + None defaults to the deck floor. With no tip on, this is the stop-disk position directly. surface_following_distance: The Z travel during aspiration, mm; 0 keeps the head in place so it cannot drive into the container bottom. enforce_minimum_height: If True, clamp any deeper target to minimum_height; if False, surface @@ -8478,21 +8482,31 @@ async def head96_basic_aspirate( assert 0 <= surface_following_distance <= surface_following_max, ( f"surface_following_distance must be between 0 and {surface_following_max} mm" ) - assert z_min <= minimum_height <= z_max, ( - f"minimum_height must be between {z_min} and {z_max} mm" - ) - if enforce_requires_tip: - has_tips = await self.head96_request_tip_presence() - if not has_tips: - raise RuntimeError( - "96-head has no tips (firmware reports none); pick up tips before aspirating" - ) + has_tips = bool(await self.head96_request_tip_presence()) + if enforce_requires_tip and not has_tips: + raise RuntimeError( + "96-head has no tips (firmware reports none); pick up tips before aspirating" + ) + + # minimum_height is a tip-bottom height: the tip overhang (stop disk - tip bottom) converts it to + # the firmware stop-disk zh, and the deck floor caps how deep it may go. With no tip there is no + # overhang, so minimum_height is the stop-disk position directly and is guarded against z_min. + overhang = 0.0 + if has_tips: + overhang = await self._head96_tip_overhang() + height_min = max(z_min - overhang, STARBackend.MINIMUM_CHANNEL_Z_POSITION) + height_max = z_max - overhang + if minimum_height is None: + minimum_height = height_min + assert height_min <= minimum_height <= height_max, ( + f"minimum_height must be between {height_min} and {height_max} mm" + ) volume_increment = self._head96_dispensing_drive_uL_to_increment(volume) flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(flow_rate) surface_following_increment = self._head96_z_drive_mm_to_increment(surface_following_distance) - minimum_height_increment = self._head96_z_drive_mm_to_increment(minimum_height) + zh_increment = self._head96_z_drive_mm_to_increment(minimum_height + overhang) return await self.send_command( module="H0", command="PA", @@ -8502,7 +8516,7 @@ async def head96_basic_aspirate( dv=f"{flow_rate_increment:05}", dc="00000", # pre-wetting off; its interaction with surface following is unverified zd=f"{surface_following_increment:04}", - zh=f"{minimum_height_increment:05}", + zh=f"{zh_increment:05}", to="000", # settling_time not exposed here (firmware allows it); it is its own basic command ) @@ -8510,9 +8524,8 @@ async def head96_basic_aspirate( async def head96_basic_dispense( self, volume: float, - *, flow_rate: Optional[float] = None, - minimum_height: float, + minimum_height: Optional[float] = None, stop_back_volume: float = 0.0, surface_following_distance: float = 0.0, stop_flow_rate: Optional[float] = None, @@ -8521,14 +8534,15 @@ async def head96_basic_dispense( """Dispense on the 96-head with surface following (the firmware drives Z and the dispensing drive in parallel). - The direct, full-control counterpart to `dispense96`. Acts on the whole head (rigid) and assumes - the head already holds tips. This is a basic, thin command: it does not settle after dispensing - (settling is a separate basic command). + The direct, full-control counterpart to `dispense96`. Acts on the whole head (rigid). This is a + basic, thin command: it does not settle after dispensing (settling is a separate basic command). Args: - volume: The volume to dispense through each channel, uL. + volume: The piston (dispensing-drive) volume to dispense per channel, uL; raw, not liquid-class + corrected. flow_rate: The dispensing-drive speed, uL/s; None uses the head's default speed. - minimum_height: The lowest Z (mm) the head descends to, the end of the dispense stroke. + minimum_height: The lowest the tip end (tip-bottom) descends to, mm - the end of the stroke. + None defaults to the deck floor. With no tip on, this is the stop-disk position directly. stop_back_volume: The volume drawn back at the end to stop dripping, uL. surface_following_distance: The Z travel during dispense, mm. stop_flow_rate: The dispensing-drive stop speed, uL/s; None uses the firmware default (0). @@ -8560,23 +8574,33 @@ async def head96_basic_dispense( assert 0 <= surface_following_distance <= surface_following_max, ( f"surface_following_distance must be between 0 and {surface_following_max} mm" ) - assert z_min <= minimum_height <= z_max, ( - f"minimum_height must be between {z_min} and {z_max} mm" - ) assert 0 <= stop_flow_rate <= flow_max, f"stop_flow_rate must be between 0 and {flow_max} uL/s" - if enforce_requires_tip: - has_tips = await self.head96_request_tip_presence() - if not has_tips: - raise RuntimeError( - "96-head has no tips (firmware reports none); pick up tips before dispensing" - ) + has_tips = bool(await self.head96_request_tip_presence()) + if enforce_requires_tip and not has_tips: + raise RuntimeError( + "96-head has no tips (firmware reports none); pick up tips before dispensing" + ) + + # minimum_height is a tip-bottom height: the tip overhang (stop disk - tip bottom) converts it to + # the firmware stop-disk zh, and the deck floor caps how deep it may go. With no tip there is no + # overhang, so minimum_height is the stop-disk position directly and is guarded against z_min. + overhang = 0.0 + if has_tips: + overhang = await self._head96_tip_overhang() + height_min = max(z_min - overhang, STARBackend.MINIMUM_CHANNEL_Z_POSITION) + height_max = z_max - overhang + if minimum_height is None: + minimum_height = height_min + assert height_min <= minimum_height <= height_max, ( + f"minimum_height must be between {height_min} and {height_max} mm" + ) volume_increment = self._head96_dispensing_drive_uL_to_increment(volume) flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(flow_rate) stop_back_increment = self._head96_dispensing_drive_uL_to_increment(stop_back_volume) surface_following_increment = self._head96_z_drive_mm_to_increment(surface_following_distance) - minimum_height_increment = self._head96_z_drive_mm_to_increment(minimum_height) + zh_increment = self._head96_z_drive_mm_to_increment(minimum_height + overhang) stop_flow_rate_increment = self._head96_dispensing_drive_uL_to_increment(stop_flow_rate) return await self.send_command( module="H0", @@ -8586,7 +8610,7 @@ async def head96_basic_dispense( dv=f"{flow_rate_increment:05}", dd=f"{stop_back_increment:04}", ze=f"{surface_following_increment:04}", - zh=f"{minimum_height_increment:05}", + zh=f"{zh_increment:05}", du=f"{stop_flow_rate_increment:05}", ) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index 568a94ae248..08bdd60ad38 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1134,6 +1134,7 @@ async def test_core_96_dispense(self): async def test_head96_basic_aspirate(self): self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) self.STAR._write_and_read_command.reset_mock() await self.STAR.head96_basic_aspirate( volume=100, @@ -1152,6 +1153,7 @@ async def test_head96_basic_aspirate(self): async def test_head96_basic_dispense(self): self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) self.STAR._write_and_read_command.reset_mock() await self.STAR.head96_basic_dispense( volume=100, @@ -1180,6 +1182,7 @@ async def test_head96_basic_aspirate_requires_tip(self): async def test_head96_basic_aspirate_default_flow_rate(self): """Omitting flow_rate emits the head's default dispensing-drive speed (dv13500).""" self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) self.STAR._write_and_read_command.reset_mock() await self.STAR.head96_basic_aspirate( volume=100, minimum_height=230, surface_following_distance=2, enforce_requires_tip=False @@ -1196,6 +1199,7 @@ async def test_head96_basic_dispense_default_flow_rates(self): """Omitting flow_rate and stop_flow_rate emits the head default speed (dv13500) and a zero stop speed (du00000), with the stop-back and surface-following defaults (dd0000 / ze0000).""" self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) self.STAR._write_and_read_command.reset_mock() await self.STAR.head96_basic_dispense( volume=100, minimum_height=230, enforce_requires_tip=False @@ -1216,6 +1220,42 @@ async def test_head96_basic_aspirate_volume_out_of_range_raises(self): volume=100000, minimum_height=230, enforce_requires_tip=False ) + async def test_head96_basic_aspirate_tip_bottom_overhang(self): + """With a tip on, minimum_height is tip-bottom: zh = minimum_height + overhang.""" + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=1) + self.STAR.head96_request_stop_disk_z = unittest.mock.AsyncMock(return_value=332.0) + self.STAR.head96_request_position = unittest.mock.AsyncMock( + return_value=Coordinate(0, 0, 245.0) + ) + self.STAR._write_and_read_command.reset_mock() + # overhang = 332 - 245 = 87; zh = (200 + 87) / 0.005 = 57400 + await self.STAR.head96_basic_aspirate( + volume=100, minimum_height=200, surface_following_distance=2 + ) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PAid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdj1da05170dv13500dc00000zd0400zh57400to000" + ) + ] + ) + + async def test_head96_basic_aspirate_minimum_height_defaults_to_floor(self): + """Omitting minimum_height with no tip defaults to the firmware Z floor (z_range[0]).""" + self.STAR._head96_information = _make_head96_information(self.STAR) + self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0) + self.STAR._write_and_read_command.reset_mock() + # no tip -> overhang 0 -> minimum_height defaults to z_range[0] = 180.5 mm -> zh 36100 + await self.STAR.head96_basic_aspirate(volume=100, enforce_requires_tip=False) + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "H0PAid0001pmFFFFFFFFFFFFFFFFFFFFFFFFdj1da05170dv13500dc00000zd0000zh36100to000" + ) + ] + ) + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command. From 943e31ea5b7f7a059ed5203747762e5a3574a2ac Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Fri, 12 Jun 2026 23:15:04 +0100 Subject: [PATCH 5/5] `STARBackend`: add head96_request_tip_length mirroring request_tip_len_on_channel Replace the private overhang helper with a public head96_request_tip_length - the 96-head counterpart of the single-channel request_tip_len_on_channel: stop disk - (tip bottom - DEFAULT_TIP_FITTING_DEPTH), raising when no tips are present. head96_move_tool_z and both basic commands derive the stop-disk overhang as tip_length - DEFAULT_TIP_FITTING_DEPTH, matching the single-channel convention and reusing the shared fitting-depth constant. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../backends/hamilton/STAR_backend.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index ef186086891..c225fc9f862 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -8279,10 +8279,21 @@ async def head96_move_stop_disk_z( return resp - async def _head96_tip_overhang(self) -> float: - """The tip overhang (stop disk minus tip bottom) in mm, measured move-free; lets a tip-bottom Z - be expressed against the firmware stop-disk reference. Near 0 with no tip on.""" - return await self.head96_request_stop_disk_z() - (await self.head96_request_position()).z + async def head96_request_tip_length(self) -> float: + """Measures the length of the tips on the 96-head; the head's counterpart of + `request_tip_len_on_channel`. Raises if no tips are present. + + Returns: + The measured tip length in millimeters. + + Raises: + RuntimeError: If the 96-head holds no tips. + """ + if not await self.head96_request_tip_presence(): + raise RuntimeError("96-head has no tips (firmware reports none)") + stop_disk = await self.head96_request_stop_disk_z() + tip_bottom = (await self.head96_request_position()).z + return round(stop_disk - (tip_bottom - STARBackend.DEFAULT_TIP_FITTING_DEPTH), 1) @_requires_head96 async def head96_move_tool_z(self, z: float, speed: Optional[float] = None): @@ -8310,7 +8321,7 @@ async def head96_move_tool_z(self, z: float, speed: Optional[float] = None): "without a tip attached." ) - tip_overhang = await self._head96_tip_overhang() + tip_overhang = await self.head96_request_tip_length() - STARBackend.DEFAULT_TIP_FITTING_DEPTH # The move is in stop-disk space over z_range, so the reachable tip-bottom window is z_range # shifted down by the overhang, floored at the deck surface. Validate in tip-bottom terms. @@ -8494,7 +8505,7 @@ async def head96_basic_aspirate( # overhang, so minimum_height is the stop-disk position directly and is guarded against z_min. overhang = 0.0 if has_tips: - overhang = await self._head96_tip_overhang() + overhang = await self.head96_request_tip_length() - STARBackend.DEFAULT_TIP_FITTING_DEPTH height_min = max(z_min - overhang, STARBackend.MINIMUM_CHANNEL_Z_POSITION) height_max = z_max - overhang if minimum_height is None: @@ -8587,7 +8598,7 @@ async def head96_basic_dispense( # overhang, so minimum_height is the stop-disk position directly and is guarded against z_min. overhang = 0.0 if has_tips: - overhang = await self._head96_tip_overhang() + overhang = await self.head96_request_tip_length() - STARBackend.DEFAULT_TIP_FITTING_DEPTH height_min = max(z_min - overhang, STARBackend.MINIMUM_CHANNEL_Z_POSITION) height_max = z_max - overhang if minimum_height is None: