From 8d22e9e4ef72270f394ab7d6a50f3a15b8747de9 Mon Sep 17 00:00:00 2001 From: Aaron Niskin Date: Sun, 5 Oct 2025 16:52:14 -0700 Subject: [PATCH 1/3] wip --- CONTRIBUTING.md | 8 +- src/daggerml/core.py | 39 +++-- tests/conftest.py | 43 +++++ tests/test_core.py | 373 +++++++++++++++++++++---------------------- 4 files changed, 260 insertions(+), 203 deletions(-) create mode 100644 tests/conftest.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5e225ce..f40e9c4 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -38,7 +38,7 @@ requests and appreciate your help in improving this project. - Add or update unit tests for any new features or bug fixes. - Use [pytest](https://pytest.org/) for running tests. - The testing requirements are included in the `test` feature for the library. - - You can run tests using [hatch](https://hatch.pypa.io/): + - You can run tests using [hatch](https://hatch.pypa.io/): ``` hatch run pytest . ``` @@ -46,7 +46,7 @@ requests and appreciate your help in improving this project. ``` Python: Run Tests ``` - - Or install the `test` feature with pip and run tests: + - Or install the `test` feature with pip and run tests: ``` pip install -e [test] pytest . @@ -55,6 +55,10 @@ requests and appreciate your help in improving this project. ``` pytest -m "not slow" . ``` +- We mark tests that require `daggerml-cli` to be installed with `@pytest.mark.needs_dml`. You can exclude those tests with: + ``` + pytest -m "not needs_dml" . + ``` - Run all tests locally before submitting a pull request: - Ensure your code passes all tests and does not decrease code coverage. - If your changes introduce new dependencies, please update `pyproject.toml`, but we prefer to keep the dependencies to a minimum. diff --git a/src/daggerml/core.py b/src/daggerml/core.py index eeba04b..2c96cf2 100644 --- a/src/daggerml/core.py +++ b/src/daggerml/core.py @@ -277,14 +277,18 @@ def make_node(dag: "Dag", ref: Ref) -> "Node": """ info = dag.dml("node", "describe", ref.to) if info["data_type"] == "list": - return ListNode(dag, ref, _info=info) - if info["data_type"] == "dict": - return DictNode(dag, ref, _info=info) - if info["data_type"] == "set": - return ListNode(dag, ref, _info=info) - if info["data_type"] == "executable": - return ExecutableNode(dag, ref, _info=info) - return ScalarNode(dag, ref, _info=info) + node = ListNode(dag, ref, _info=info) + elif info["data_type"] == "dict": + node = DictNode(dag, ref, _info=info) + elif info["data_type"] == "set": + node = ListNode(dag, ref, _info=info) + elif info["data_type"] == "executable": + node = ExecutableNode(dag, ref, _info=info) + else: + node = ScalarNode(dag, ref, _info=info) + if info["doc"]: + object.__setattr__(node, "__doc__", info["doc"]) + return node @dataclass @@ -444,6 +448,7 @@ def call( doc: Optional[str] = None, sleep: Optional[callable] = None, timeout: int = -1, + **kw, ) -> "Node": """ Call a function node with arguments. @@ -462,6 +467,8 @@ def call( A nullary function that returns sleep time in milliseconds timeout : int, default=-1 Maximum time to wait in milliseconds. If <= 0, wait indefinitely. + **kw : dict + Keyword arguments override any prepop values in the Executable (fn). Returns ------- @@ -475,6 +482,16 @@ def call( Error If the function returns an error """ + if len(kw) > 0: + if isinstance(fn, Node): + fn = fn.value() + if set(kw) - set(fn.prepop): + extras = sorted(set(kw) - set(fn.prepop)) + msg = f"Function called with extraneous kwargs (not in `ex.prepop`): {extras}" + raise Error(msg, origin="dml", type="KeyError") + fn = Executable(uri=fn.uri, data=fn.data, adapter=fn.adapter, prepop={**fn.prepop, **kw}) + # FIXME: replace fails: `TypeError: Executable.__init__() missing 1 required positional argument: 'uri'` + # fn = replace(fn, prepop={**fn.prepop, **kw}) sleep = sleep or BackoffWithJitter() expr = [self.put(x) for x in [fn, *args]] end = current_time_millis() + timeout @@ -602,7 +619,7 @@ class ScalarNode(Node): class ExecutableNode(Node): - def __call__(self, *args, name=None, doc=None, sleep=None, timeout=-1) -> "Node": + def __call__(self, *args, name=None, doc=None, sleep=None, timeout=-1, **kw) -> "Node": """ Call this node as a function. @@ -618,6 +635,8 @@ def __call__(self, *args, name=None, doc=None, sleep=None, timeout=-1) -> "Node" A nullary function that returns sleep time in milliseconds timeout : int, default=-1 Maximum time to wait in milliseconds. -1 means wait forever. + **kw : dict + Keyword arguments override any prepop values in the Executable (fn). Returns ------- @@ -631,7 +650,7 @@ def __call__(self, *args, name=None, doc=None, sleep=None, timeout=-1) -> "Node" Error If the function returns an error """ - return self.dag.call(self, *args, name=name, doc=doc, sleep=sleep, timeout=timeout) + return self.dag.call(self, *args, name=name, doc=doc, sleep=sleep, timeout=timeout, **kw) class CollectionNode(Node): # noqa: F811 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..255f745 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,43 @@ +"""Common test fixtures for dml-util tests.""" + +import logging +import os +from unittest.mock import patch + +import pytest + +from daggerml import Dml + + +@pytest.fixture(autouse=True) +def clear_envvars(): + with patch.dict(os.environ): + # Clear AWS environment variables before any tests run + for k in os.environ: + if k.startswith("AWS_") or k.startswith("DML_"): + del os.environ[k] + os.environ["AWS_SHARED_CREDENTIALS_FILE"] = "/dev/null" + yield + + +@pytest.fixture(autouse=True) +def debug(clear_envvars): + """Fixture to set debug mode for tests.""" + with patch.dict(os.environ, {"DML_DEBUG": "1"}): + logging.basicConfig(level=logging.DEBUG) + yield + + +@pytest.fixture +def dml(tmpdir): + with Dml.temporary(cache_path=str(tmpdir)) as _dml: + with patch.dict(os.environ, DML_FN_CACHE_DIR=_dml.kwargs["config_dir"], **_dml.envvars): + yield _dml + + +@pytest.fixture +def fake_dml(): + # patches Dml and Dag so that neither does anything + with patch("daggerml.core.Dml", autospec=True) as mock_dml: + with patch("daggerml.core.Dag", autospec=True) as mock_dag: + yield mock_dml, mock_dag diff --git a/tests/test_core.py b/tests/test_core.py index 809f480..644ce84 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,6 +1,7 @@ import os +import re from tempfile import TemporaryDirectory -from unittest import TestCase, mock +from unittest import TestCase import pytest @@ -14,113 +15,120 @@ class TestSetAttrs: @pytest.mark.parametrize("x", [[0], (0,), [], ["asdf", None]]) # none contain 1 - def test_list_attrs(self, x): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - n0 = dag.put(x) - assert n0.contains(1).value() is False - assert 1 not in n0 - assert len(n0) == len(x) - for index, item_node in enumerate(n0): - item = x[index] - assert item_node.value() == item - assert n0.contains(item).value() is True - assert item in n0 - assert n0[index].value() == item - assert n0.append(1).value() == [*x, 1] - assert n0.conj(1).value() == [*x, 1] + def test_list_attrs(self, x, dml): + dag = dml.new("d0", "d0") + n0 = dag.put(x) + assert n0.contains(1).value() is False + assert 1 not in n0 + assert len(n0) == len(x) + for index, item_node in enumerate(n0): + item = x[index] + assert item_node.value() == item + assert n0.contains(item).value() is True + assert item in n0 + assert n0[index].value() == item + assert n0.append(1).value() == [*x, 1] + assert n0.conj(1).value() == [*x, 1] @pytest.mark.parametrize("x", [{}, {"a": 1}, {"x": 42, "y": {"k0": None}}]) # none contain 'z' - def test_dict_attrs(self, x): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - n0 = dag.put(x) - assert n0.contains("z").value() is False - assert "z" not in n0 - assert len(n0) == len(x) - assert n0.get("z", default=123).value() == 123 - for key in n0: - item = x[key] - assert n0[key].value() == item - assert n0.contains(key).value() is True - assert key in n0 - assert n0.get(key).value() == item - assert [(k, v.value()) for k, v in n0.items()] == list(x.items()) - assert n0.keys() == list(x.keys()) - assert [x.value() for x in n0.values()] == list(x.values()) - assert n0.assoc("y", 3).value() == {**x, "y": 3} - assert n0.update({"z": 1, "a": 2}).value() == {**x, "z": 1, "a": 2} - - def test_load_reboot(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - dag.put(42, name="n0") - dag.commit("foo") - with dml.new("d1", "d1") as dag: - node = dag.load("d0", name="n1") - assert node.dag == dag - assert node.value() == "foo" - assert node.load().n0.value() == 42 - assert dag.load("d0", key="n0").value() == 42 - - def test_node_call_w_literal_deps(self): + def test_dict_attrs(self, x, dml): + dag = dml.new("d0", "d0") + n0 = dag.put(x) + assert n0.contains("z").value() is False + assert "z" not in n0 + assert len(n0) == len(x) + assert n0.get("z", default=123).value() == 123 + for key in n0: + item = x[key] + assert n0[key].value() == item + assert n0.contains(key).value() is True + assert key in n0 + assert n0.get(key).value() == item + assert [(k, v.value()) for k, v in n0.items()] == list(x.items()) + assert n0.keys() == list(x.keys()) + assert [x.value() for x in n0.values()] == list(x.values()) + assert n0.assoc("y", 3).value() == {**x, "y": 3} + assert n0.update({"z": 1, "a": 2}).value() == {**x, "z": 1, "a": 2} + + def test_load_reboot(self, dml): + with dml.new("d0", "d0") as dag: + dag.put(42, name="n0") + dag.commit("foo") + with dml.new("d1", "d1") as dag: + node = dag.load("d0", name="n1") + assert node.dag == dag + assert node.value() == "foo" + assert node.load().n0.value() == 42 + assert dag.load("d0", key="n0").value() == 42 + + def test_node_call_w_literal_deps(self, dml): nums = [1, 2, 3] - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=dml.kwargs["config_dir"]): - with dml.new("d0", "d0") as dag: - fn = Executable( - "./tests/assets/fns/sum.py", - adapter="dml-python-fork-adapter", - prepop={"x": 10}, - ) - result = dag.call(fn, *nums) - assert result.value() == sum(nums) - assert "x" in result.load().keys() - assert result.load().x.value() == 10 - - def test_node_call_w_node_deps(self): + dag = dml.new("d0", "d0") + fn = Executable( + "./tests/assets/fns/sum.py", + adapter="dml-python-fork-adapter", + prepop={"x": 10}, + ) + result = dag.call(fn, *nums) + assert result.value() == sum(nums) + assert "x" in result.load().keys() + assert result.load().x.value() == 10 + + def test_node_call_w_node_deps(self, dml): nums = [1, 2, 3] - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=dml.kwargs["config_dir"]): - with dml.new("d0", "d0") as dag: - fn = Executable( - "./tests/assets/fns/sum.py", - adapter="dml-python-fork-adapter", - prepop={"x": dag.put(10)}, - ) - result = dag.call(fn, *nums) - assert result.value() == sum(nums) - assert "x" in result.load().keys() - assert result.load().x.value() == 10 - - def test_node_call(self): + dag = dml.new("d0", "d0") + fn = Executable( + "./tests/assets/fns/sum.py", + adapter="dml-python-fork-adapter", + prepop={"x": dag.put(10)}, + ) + result = dag.call(fn, *nums) + assert result.value() == sum(nums) + assert "x" in result.load().keys() + assert result.load().x.value() == 10 + + def test_node_call_w_kwarg(self, dml): nums = [1, 2, 3] - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=dml.kwargs["config_dir"]): - with dml.new("d0", "d0") as dag: - fn = dag.put(SUM) - result = fn(*nums) - assert result.value() == sum(nums) + dag = dml.new("d0", "d0") + fn = Executable( + "./tests/assets/fns/sum.py", + adapter="dml-python-fork-adapter", + prepop={"x": 10}, + ) + result = dag.call(fn, *nums, x=100) + assert result.value() == sum(nums) + assert "x" in result.load().keys() + assert result.load().x.value() == 100 - def test_load_recursing(self): + def test_bad_kwarg(self, dml): nums = [1, 2, 3] - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=dml.kwargs["config_dir"]): - with dml.new("d0", "d0") as dag: - dag.commit(dag.call(SUM, *nums, name="n1")) - d1 = dml.new("d1", "d1") - n1 = d1.put(dml.load("d0").n1, name="n1_1") - assert n1.dag == d1 - n2 = n1.load().n1.load().num_args - assert n2.value() == len(nums) - assert n1.value() == sum(nums) + dag = dml.new("d0", "d0") + fn = Executable( + "./tests/assets/fns/sum.py", + adapter="dml-python-fork-adapter", + prepop={"x": 10}, + ) + msg = re.escape(r"Function called with extraneous kwargs (not in `ex.prepop`): ['y']") + with pytest.raises(Error, match=msg): + dag.call(fn, *nums, y=100) + + def test_node_call(self, dml): + nums = [1, 2, 3] + dag = dml.new("d0", "d0") + fn = dag.put(SUM) + result = fn(*nums) + assert result.value() == sum(nums) + + def test_load_recursing(self, dml): + nums = [1, 2, 3] + with dml.new("d0", "d0") as dag: + dag.commit(dag.call(SUM, *nums, name="n1")) + d1 = dml.new("d1", "d1") + n1 = d1.put(dml.load("d0").n1, name="n1_1") + assert n1.dag == d1 + n2 = n1.load().n1.load().num_args + assert n2.value() == len(nums) + assert n1.value() == sum(nums) def test_caching(self): nums = [1, 2, 3] @@ -155,16 +163,79 @@ def test_no_caching(self): uid1 = n1.load().uuid.value() assert uid != uid1, "Cached dag should have the same UUID" - def test_nodemap(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as d0: - d0.a = 23 - node = d0.put(42, name="b") - other = d0.put(420) - assert d0.a.value() == 23 - assert list(d0) == ["a", "b"] - d0.commit([node, other]) + def test_nodemap(self, dml): + dag = dml.new("d0", "d0") + dag.a = 23 + node = dag.put(42, name="b") + other = dag.put(420) + assert dag.a.value() == 23 + assert list(dag) == ["a", "b"] + dag.commit([node, other]) + + def test_set_attrs(self, dml): + dag = dml.new("d0", "d0") + n0 = dag.put({0}) + assert n0.contains(1).value() is False + assert n0.contains(0).value() is True + assert 0 in n0 + n1 = n0.append(1) + assert n1.value() == {0, 1} + + def test_load_constructors(self, dml): + dag = dml.new("d0", "d0") + l0 = dag.put(42) + c0 = dag.put({"a": 1, "b": [l0, "23"]}) + assert c0.backtrack("b", 0) == l0 + assert c0.backtrack("b", 1).value() == "23" + assert c0.backtrack("b").backtrack(0) == l0 + assert c0["b"][0] != l0 + c1 = c0["b"] + assert c1.backtrack() == c0 + assert c1.backtrack().backtrack("b", 0) == l0 + + def test_fn_ok_cache(self, dml): + with dml.new("d0", "d0") as dag: + nodes = [dag.call(SUM, i, 1, 2) for i in range(2)] # unique function applications + dag.call(SUM, 0, 1, 2) # add a repeat outside so `nodes` is still unique + dag.commit(nodes[0]) + assert dag.result.value() == 3 + cache_list = dml("cache", "list", as_text=True) # response is jsonlines format + assert len([x for x in cache_list if x.rstrip() == "{"]) == 2 # this gets us unique maps + + def test_async_fn_ok(self, dml): + debug_file = os.path.join(dml.config_dir, "debug") + with dml.new("d0", "d0") as dag: + n1 = dag.call(ASYNC, 1, 2, 3) + dag.commit(n1) + assert n1.value() == 6 + with open(debug_file, "r") as f: + assert len([1 for _ in f]) == 2 + + def test_async_fn_error(self, dml): + with pytest.raises(Error, match=r".*unsupported operand type.*"): + with dml.new("d0", "d0") as dag: + dag.call(ASYNC, 1, 2, "asdf") + info = [x for x in dml("dag", "list") if x["name"] == "d0"] + assert len(info) == 1 + + def test_async_fn_timeout(self, dml): + with pytest.raises(TimeoutError): + with dml.new("d0", "d0") as dag: + dag.call(TIMEOUT, 1, 2, 3, timeout=1000) + + def test_load(self, dml): + with dml.new("d0", "d0") as dag: + dag.put(42, name="n0") + dag.commit("foo") + dl = dml.load("d0") + assert isinstance(dl, Dag) + assert dl.n0.value() == 42 + assert dl.result.value() == "foo" + + def test_doc(self, dml): + dag = dml.new("d0", "d0") + n = dag.put(42, name="n0", doc="The answer to life, the universe, and everything") + assert n.__doc__ == "The answer to life, the universe, and everything" class TestBasic(TestCase): @@ -287,83 +358,3 @@ def message_handler(dump): assert len(dml("dag", "list", "--all")) > 1 dml("dag", "delete", dag["name"], "Deleting dag") dml("repo", "gc", as_text=True) - - def test_set_attrs(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - n0 = dag.put({0}) - assert n0.contains(1).value() is False - assert n0.contains(0).value() is True - assert 0 in n0 - n1 = n0.append(1) - assert n1.value() == {0, 1} - - def test_load_constructors(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - dag = dml.new("d0", "d0") - l0 = dag.put(42) - c0 = dag.put({"a": 1, "b": [l0, "23"]}) - assert c0.backtrack("b", 0) == l0 - assert c0.backtrack("b", 1).value() == "23" - assert c0.backtrack("b").backtrack(0) == l0 - assert c0["b"][0] != l0 - c1 = c0["b"] - assert c1.backtrack() == c0 - assert c1.backtrack().backtrack("b", 0) == l0 - - def test_fn_ok_cache(self): - with TemporaryDirectory(prefix="dml-test-") as fn_cache_dir: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=fn_cache_dir): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - nodes = [dag.call(SUM, i, 1, 2) for i in range(2)] # unique function applications - dag.call(SUM, 0, 1, 2) # add a repeat outside so `nodes` is still unique - dag.commit(nodes[0]) - self.assertEqual(dag.result.value(), 3) - cache_list = dml("cache", "list", as_text=True) # response is jsonlines format - assert len([x for x in cache_list if x.rstrip() == "{"]) == 2 # this gets us unique maps - - def test_async_fn_ok(self): - with TemporaryDirectory(prefix="dml-test-") as fn_cache_dir: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=fn_cache_dir): - debug_file = os.path.join(fn_cache_dir, "debug") - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - n1 = dag.call(ASYNC, 1, 2, 3) - dag.commit(n1) - self.assertEqual(n1.value(), 6) - with open(debug_file, "r") as f: - self.assertEqual(len([1 for _ in f]), 2) - - def test_async_fn_error(self): - with TemporaryDirectory(prefix="dml-test-") as fn_cache_dir: - with mock.patch.dict(os.environ, DML_FN_CACHE_DIR=fn_cache_dir): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with self.assertRaisesRegex(Error, r".*unsupported operand type.*"): - with dml.new("d0", "d0") as dag: - dag.call(ASYNC, 1, 2, "asdf") - info = [x for x in dml("dag", "list") if x["name"] == "d0"] - self.assertEqual(len(info), 1) - - def test_async_fn_timeout(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with self.assertRaises(TimeoutError): - with dml.new("d0", "d0") as dag: - dag.call(TIMEOUT, 1, 2, 3, timeout=1000) - - def test_load(self): - with TemporaryDirectory(prefix="dml-cache-") as cache_path: - with Dml.temporary(cache_path=cache_path) as dml: - with dml.new("d0", "d0") as dag: - dag.put(42, name="n0") - dag.commit("foo") - dl = dml.load("d0") - assert isinstance(dl, Dag) - self.assertEqual(dl.n0.value(), 42) - self.assertEqual(dl.result.value(), "foo") From beafe534366f90d71e1b3d21aef8988a61d371e1 Mon Sep 17 00:00:00 2001 From: Aaron Niskin Date: Sun, 5 Oct 2025 17:00:56 -0700 Subject: [PATCH 2/3] Update src/daggerml/core.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/daggerml/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daggerml/core.py b/src/daggerml/core.py index 2c96cf2..123d030 100644 --- a/src/daggerml/core.py +++ b/src/daggerml/core.py @@ -487,7 +487,7 @@ def call( fn = fn.value() if set(kw) - set(fn.prepop): extras = sorted(set(kw) - set(fn.prepop)) - msg = f"Function called with extraneous kwargs (not in `ex.prepop`): {extras}" + msg = f"Function called with extraneous kwargs (not in `fn.prepop`): {extras}" raise Error(msg, origin="dml", type="KeyError") fn = Executable(uri=fn.uri, data=fn.data, adapter=fn.adapter, prepop={**fn.prepop, **kw}) # FIXME: replace fails: `TypeError: Executable.__init__() missing 1 required positional argument: 'uri'` From 2f50eefdc10835174f27a501ab4f550cb9e4c383 Mon Sep 17 00:00:00 2001 From: Aaron Niskin Date: Sun, 5 Oct 2025 17:04:04 -0700 Subject: [PATCH 3/3] fixed bug --- tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 644ce84..6ec6e6c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -108,7 +108,7 @@ def test_bad_kwarg(self, dml): adapter="dml-python-fork-adapter", prepop={"x": 10}, ) - msg = re.escape(r"Function called with extraneous kwargs (not in `ex.prepop`): ['y']") + msg = re.escape(r"Function called with extraneous kwargs (not in `fn.prepop`): ['y']") with pytest.raises(Error, match=msg): dag.call(fn, *nums, y=100)