diff --git a/circup/bundle.py b/circup/bundle.py index 08ee5d2..8e39791 100644 --- a/circup/bundle.py +++ b/circup/bundle.py @@ -14,7 +14,6 @@ DATA_DIR, PLATFORMS, REQUESTS_TIMEOUT, - tags_data_load, get_latest_release_from_url, ) @@ -46,6 +45,8 @@ def __init__(self, repo): # tag self._current = None self._latest = None + self.pinned_tag = None + self._available = [] def lib_dir(self, platform): """ @@ -99,21 +100,27 @@ def requirements_for(self, library_name, toml_file=False): @property def current_tag(self): """ - Lazy load current cached tag from the BUNDLE_DATA json file. + The current tag for the project. If the tag hasn't been explicitly set + this will be the pinned tag, if one is set. If there is no pinned tag, + this will be the latest available tag that is locally available. - :return: The current cached tag value for the project. + :return: The current tag value for the project. """ if self._current is None: - self._current = tags_data_load(logger).get(self.key, "0") + self._current = self.pinned_tag or ( + # This represents the latest version locally available + self._available[-1] + if len(self._available) > 0 + else None + ) return self._current @current_tag.setter def current_tag(self, tag): """ - Set the current cached tag (after updating). + Set the current tag (after updating). :param str tag: The new value for the current tag. - :return: The current cached tag value for the project. """ self._current = tag @@ -130,6 +137,48 @@ def latest_tag(self): ) return self._latest + @property + def available_tags(self): + """ + The locally available tags to use for the project. + + :return: All tags available for the project. + """ + return tuple(self._available) + + @available_tags.setter + def available_tags(self, tags): + """ + Set the available tags. + + :param str|list tags: The new value for the locally available tags. + """ + if isinstance(tags, str): + tags = [tags] + self._available = sorted(tags) + + def add_tag(self, tag: str) -> None: + """ + Add a tag to the list of available tags. + + This will add the tag if it isn't already present in the list of + available tags. The tag will be added so that the list is sorted in an + increasing order. This ensures that that last tag is always the latest. + + :param str tag: The tag to add to the list of available tags. + """ + if tag in self._available: + # The tag is already stored for some reason, lets not add it again + return + + for rev_i, available_tag in enumerate(reversed(self._available)): + if int(tag) > int(available_tag): + i = len(self._available) - rev_i + self._available.insert(i, tag) + break + else: + self._available.insert(0, tag) + def validate(self): """ Test the existence of the expected URLs (not their content) @@ -166,5 +215,7 @@ def __repr__(self): "url_format": self.url_format, "current": self._current, "latest": self._latest, + "pinned": self.pinned_tag, + "available": self._available, } ) diff --git a/circup/command_utils.py b/circup/command_utils.py index c12d546..d972cb1 100644 --- a/circup/command_utils.py +++ b/circup/command_utils.py @@ -12,10 +12,10 @@ from subprocess import check_output import sys -import shutil import zipfile import json import re +from pathlib import Path import toml import requests import click @@ -29,7 +29,6 @@ BUNDLE_CONFIG_LOCAL, BUNDLE_DATA, NOT_MCU_LIBRARIES, - tags_data_load, ) from circup.logging import logger from circup.module import Module @@ -106,7 +105,7 @@ def completion_for_install(ctx, param, incomplete): with the ``circup install`` command. """ # pylint: disable=unused-argument - available_modules = get_bundle_versions(get_bundles_list(), avoid_download=True) + available_modules = get_bundle_versions(get_bundles_list(None), avoid_download=True) module_names = {m.replace(".py", "") for m in available_modules} if incomplete: module_names = [name for name in module_names if name.startswith(incomplete)] @@ -121,7 +120,9 @@ def completion_for_example(ctx, param, incomplete): """ # pylint: disable=unused-argument, consider-iterating-dictionary - available_examples = get_bundle_examples(get_bundles_list(), avoid_download=True) + available_examples = get_bundle_examples( + get_bundles_list(None), avoid_download=True + ) matching_examples = [] for term in incomplete: _examples = [ @@ -134,17 +135,18 @@ def completion_for_example(ctx, param, incomplete): return sorted(matching_examples) -def ensure_latest_bundle(bundle): +def ensure_bundle_tag(bundle, tag): """ - Ensure that there's a copy of the latest library bundle available so circup - can check the metadata contained therein. + Ensure that there's a copy of the library bundle with the version referenced + by the tag. :param Bundle bundle: the target Bundle object. + :param tag: the target bundle's tag (version). + + :return: If the bundle is available. """ - logger.info("Checking library updates for %s.", bundle.key) - tag = bundle.latest_tag do_update = False - if tag == bundle.current_tag: + if tag in bundle.available_tags: for platform in PLATFORMS: # missing directories (new platform added on an existing install # or side effect of pytest or network errors) @@ -156,19 +158,78 @@ def ensure_latest_bundle(bundle): logger.info("New version available (%s).", tag) try: get_bundle(bundle, tag) - tags_data_save_tag(bundle.key, tag) + tags_data_save_tags(bundle.key, bundle.available_tags) except requests.exceptions.HTTPError as ex: - # See #20 for reason for this click.secho( - ( - "There was a problem downloading that platform bundle. " - "Skipping and using existing download if available." - ), - fg="red", + f"There was a problem downloading the {bundle.key} bundle.", fg="red" ) logger.exception(ex) + return False else: - logger.info("Current bundle up to date %s.", tag) + logger.info("Current bundle version available (%s).", tag) + return True + + +def ensure_latest_bundle(bundle): + """ + Ensure that there's a copy of the latest library bundle available so circup + can check the metadata contained therein. + + :param Bundle bundle: the target Bundle object. + """ + logger.info("Checking library updates for %s.", bundle.key) + tag = bundle.latest_tag + is_available = ensure_bundle_tag(bundle, tag) + if is_available: + click.echo(f"Using latest bundle for {bundle.key} ({tag}).") + else: + if bundle.current_tag is None: + # See issue #20 for reason for this + click.secho("Please try again in a moment.", fg="red") + sys.exit(1) + else: + # See PR #184 for reason for this + click.secho( + f"Skipping and using existing bundle for {bundle.key} ({bundle.current_tag}).", + fg="red", + ) + + +def ensure_pinned_bundle(bundle): + """ + Ensure that there's a copy of the pinned library bundle available so circup + can check the metadata contained therein. + + :param Bundle bundle: the target Bundle object. + """ + logger.info("Checking library for %s.", bundle.key) + tag = bundle.pinned_tag + is_available = ensure_bundle_tag(bundle, tag) + if is_available: + click.echo(f"Using pinned bundle for {bundle.key} ({tag}).") + else: + click.secho( + ( + "Check pinned version to make sure it is correct and check " + f"{bundle.url} to make sure the version ({tag}) exists." + ), + fg="red", + ) + sys.exit(1) + + +def ensure_bundle(bundle): + """ + Ensure that there's a copy of either the pinned library bundle, or if no + version is pinned, the latest library bundle available so circup can check + the metadata contained therein. + + :param Bundle bundle: the target Bundle object. + """ + if bundle.pinned_tag is not None: + ensure_pinned_bundle(bundle) + else: + ensure_latest_bundle(bundle) def find_device(): @@ -301,7 +362,7 @@ def get_bundle(bundle, tag): :param Bundle bundle: the target Bundle object. :param str tag: The GIT tag to use to download the bundle. """ - click.echo(f"Downloading latest bundles for {bundle.key} ({tag}).") + click.echo(f"Downloading bundles for {bundle.key} ({tag}).") for platform, github_string in PLATFORMS.items(): # Report the platform: "8.x-mpy", etc. click.echo(f"{github_string}:") @@ -323,10 +384,9 @@ def get_bundle(bundle, tag): pbar.update(len(chunk)) logger.info("Saved to %s", temp_zip) temp_dir = bundle.dir.format(platform=platform) - if os.path.isdir(temp_dir): - shutil.rmtree(temp_dir) with zipfile.ZipFile(temp_zip, "r") as zfile: zfile.extractall(temp_dir) + bundle.add_tag(tag) bundle.current_tag = tag click.echo("\nOK\n") @@ -348,7 +408,7 @@ def get_bundle_examples(bundles_list, avoid_download=False): try: for bundle in bundles_list: if not avoid_download or not os.path.isdir(bundle.lib_dir("py")): - ensure_latest_bundle(bundle) + ensure_bundle(bundle) path = bundle.examples_dir("py") meta_saved = os.path.join(path, "../bundle_examples.json") if os.path.exists(meta_saved): @@ -383,9 +443,10 @@ def get_bundle_examples(bundles_list, avoid_download=False): def get_bundle_versions(bundles_list, avoid_download=False): """ - Returns a dictionary of metadata from modules in the latest known release - of the library bundle. Uses the Python version (rather than the compiled - version) of the library modules. + Returns a dictionary of metadata from modules in either the pinned release + if one is present in 'pyproject.toml', or the latest known release of the + library bundle. Uses the Python version (rather than the compiled version) + of the library modules. :param List[Bundle] bundles_list: List of supported bundles as Bundle objects. :param bool avoid_download: if True, download the bundle only if missing. @@ -395,7 +456,7 @@ def get_bundle_versions(bundles_list, avoid_download=False): all_the_modules = dict() for bundle in bundles_list: if not avoid_download or not os.path.isdir(bundle.lib_dir("py")): - ensure_latest_bundle(bundle) + ensure_bundle(bundle) path = bundle.lib_dir("py") path_modules = _get_modules_file(path, logger) for name, module in path_modules.items(): @@ -443,14 +504,29 @@ def get_bundles_local_dict(): return dict() -def get_bundles_list(): +def get_bundles_list(bundle_tags): """ Retrieve the list of bundles from the config dictionary. + :param Dict[str,str]|None bundle_tags: Pinned bundle tags. These override + any tags found in the pyproject.toml. :return: List of supported bundles as Bundle objects. """ bundle_config = get_bundles_dict() + tags = tags_data_load() + pyproject = find_pyproject() + pinned_tags = ( + pyproject_bundle_versions(pyproject) if pyproject is not None else None + ) + + if bundle_tags is not None: + pinned_tags = bundle_tags if pinned_tags is None else pinned_tags | bundle_tags + bundles_list = [Bundle(bundle_config[b]) for b in bundle_config] + for bundle in bundles_list: + bundle.available_tags = tags.get(bundle.key, []) + if pinned_tags is not None: + bundle.pinned_tag = pinned_tags.get(bundle.key) logger.info("Using bundles: %s", ", ".join(b.key for b in bundles_list)) return bundles_list @@ -613,15 +689,38 @@ def save_local_bundles(bundles_data): os.unlink(BUNDLE_CONFIG_LOCAL) -def tags_data_save_tag(key, tag): +def tags_data_load(): + """ + Load the list of the version tags of the bundles on disk. + + :return: a dict() of tags indexed by Bundle identifiers/keys. + """ + tags_data = None + try: + with open(BUNDLE_DATA, encoding="utf-8") as data: + try: + tags_data = json.load(data) + except json.decoder.JSONDecodeError as ex: + # Sometimes (why?) the JSON file becomes corrupt. In which case + # log it and carry on as if setting up for first time. + logger.error("Could not parse %s", BUNDLE_DATA) + logger.exception(ex) + except FileNotFoundError: + pass + if not isinstance(tags_data, dict): + tags_data = {} + return tags_data + + +def tags_data_save_tags(key, tags): """ - Add or change the saved tag value for a bundle. + Add or change the saved available tags value for a bundle. :param str key: The bundle's identifier/key. - :param str tag: The new tag for the bundle. + :param List[str] tags: The new tags for the bundle. """ - tags_data = tags_data_load(logger) - tags_data[key] = tag + tags_data = tags_data_load() + tags_data[key] = tags with open(BUNDLE_DATA, "w", encoding="utf-8") as data: json.dump(tags_data, data) @@ -855,3 +954,43 @@ def is_virtual_env_active(): virtual environment, regardless how circup is installed. """ return "VIRTUAL_ENV" in os.environ + + +def find_pyproject(): + """ + Look for a pyproject.toml in the current directory or its parent directories. + + :return: The path to the pyproject.toml for the project, or None if it + couldn't be found. + """ + logger.info("Looking for pyproject.toml file.") + cwd = Path.cwd() + candidates = [cwd, cwd.parent] + + for path in candidates: + pyproject_file = path / "pyproject.toml" + + if pyproject_file.exists(): + logger.info("Found pyproject.toml at '%s'", str(pyproject_file)) + return pyproject_file + + logger.info("No pyproject.toml file found.") + return None + + +def pyproject_bundle_versions(pyproject_file): + """ + Check for specified bundle versions. + """ + pyproject_toml_data = toml.load(pyproject_file) + return pyproject_toml_data.get("tool", {}).get("circup", {}).get("bundle-versions") + + +def parse_cli_bundle_tags(bundle_tags_cli): + """Parse bundle tags that were provided from the command line.""" + bundle_tags = {} + for bundle_tag_item in bundle_tags_cli: + item = bundle_tag_item.split("=") + if len(item) == 2: + bundle_tags[item[0].strip()] = item[1].strip() + return bundle_tags if len(bundle_tags) > 0 else None diff --git a/circup/commands.py b/circup/commands.py index ffacc49..34f5a83 100644 --- a/circup/commands.py +++ b/circup/commands.py @@ -37,6 +37,7 @@ libraries_from_auto_file, get_dependencies, get_bundles_local_dict, + parse_cli_bundle_tags, save_local_bundles, get_bundles_dict, completion_for_example, @@ -84,13 +85,32 @@ help="Manual CircuitPython version. If provided in combination " "with --board-id, it overrides the detected CPy version.", ) +@click.option( + "bundle_versions", + "--bundle-version", + multiple=True, + help="Specify the version to use for a bundle. Include the bundle name and " + "the version separated by '=', similar to the format of requirements.txt. " + "This option can be used multiple times for different bundles. Bundle " + "version values provided here will override any pinned values from the " + "pyproject.toml.", +) @click.version_option( prog_name="Circup", message="%(prog)s, A CircuitPython module updater. Version %(version)s", ) @click.pass_context def main( # pylint: disable=too-many-locals - ctx, verbose, path, host, port, password, timeout, board_id, cpy_version + ctx, + verbose, + path, + host, + port, + password, + timeout, + board_id, + cpy_version, + bundle_versions, ): # pragma: no cover """ A tool to manage and update libraries on a CircuitPython device. @@ -98,6 +118,9 @@ def main( # pylint: disable=too-many-locals # pylint: disable=too-many-arguments,too-many-branches,too-many-statements,too-many-locals, R0801 ctx.ensure_object(dict) ctx.obj["TIMEOUT"] = timeout + ctx.obj["BUNDLE_TAGS"] = ( + parse_cli_bundle_tags(bundle_versions) if len(bundle_versions) > 0 else None + ) if password is None: password = os.getenv("CIRCUP_WEBWORKFLOW_PASSWORD") @@ -210,7 +233,7 @@ def freeze(ctx, requirement): # pragma: no cover device. Option -r saves output to requirements.txt file """ logger.info("Freeze") - modules = find_modules(ctx.obj["backend"], get_bundles_list()) + modules = find_modules(ctx.obj["backend"], get_bundles_list(ctx.obj["BUNDLE_TAGS"])) if modules: output = [] for module in modules: @@ -258,7 +281,9 @@ def list_cli(ctx): # pragma: no cover modules = [ m.row - for m in find_modules(ctx.obj["backend"], get_bundles_list()) + for m in find_modules( + ctx.obj["backend"], get_bundles_list(ctx.obj["BUNDLE_TAGS"]) + ) if m.outofdate ] if modules: @@ -334,7 +359,7 @@ def install( # pylint: disable=too-many-branches # TODO: Ensure there's enough space on the device - available_modules = get_bundle_versions(get_bundles_list()) + available_modules = get_bundle_versions(get_bundles_list(ctx.obj["BUNDLE_TAGS"])) mod_names = {} for module, metadata in available_modules.items(): mod_names[module.replace(".py", "").lower()] = metadata @@ -440,7 +465,7 @@ def example(ctx, examples, op_list, rename, overwrite): else: click.echo("Available example libraries:") available_examples = get_bundle_examples( - get_bundles_list(), avoid_download=True + get_bundles_list(ctx.obj["BUNDLE_TAGS"]), avoid_download=True ) lib_names = { str(key.split(os.path.sep)[0]): value @@ -451,7 +476,7 @@ def example(ctx, examples, op_list, rename, overwrite): for example_arg in examples: available_examples = get_bundle_examples( - get_bundles_list(), avoid_download=True + get_bundles_list(ctx.obj["BUNDLE_TAGS"]), avoid_download=True ) if example_arg in available_examples: filename = available_examples[example_arg].split(os.path.sep)[-1] @@ -485,14 +510,15 @@ def example(ctx, examples, op_list, rename, overwrite): @main.command() @click.argument("match", required=False, nargs=1) -def show(match): # pragma: no cover +@click.pass_context +def show(ctx, match): # pragma: no cover """ Show a list of available modules in the bundle. These are modules which *could* be installed on the device. If MATCH is specified only matching modules will be listed. """ - available_modules = get_bundle_versions(get_bundles_list()) + available_modules = get_bundle_versions(get_bundles_list(ctx.obj["BUNDLE_TAGS"])) module_names = sorted([m.replace(".py", "") for m in available_modules]) if match is not None: match = match.lower() @@ -555,7 +581,7 @@ def update(ctx, update_all): # pragma: no cover """ logger.info("Update") # Grab current modules. - bundles_list = get_bundles_list() + bundles_list = get_bundles_list(ctx.obj["BUNDLE_TAGS"]) installed_modules = find_modules(ctx.obj["backend"], bundles_list) modules_to_update = [m for m in installed_modules if m.outofdate] @@ -654,13 +680,14 @@ def update(ctx, update_all): # pragma: no cover @main.command("bundle-show") @click.option("--modules", is_flag=True, help="List all the modules per bundle.") -def bundle_show(modules): +@click.pass_context +def bundle_show(ctx, modules): """ - Show the list of bundles, default and local, with URL, current version - and latest version retrieved from the web. + Show the list of bundles, default and local, with URL, current version, + available versions, and latest version retrieved from the web. """ local_bundles = get_bundles_local_dict().values() - bundles = get_bundles_list() + bundles = get_bundles_list(ctx.obj["BUNDLE_TAGS"]) available_modules = get_bundle_versions(bundles) for bundle in bundles: @@ -669,7 +696,13 @@ def bundle_show(modules): else: click.secho(bundle.key, fg="green") click.echo(" " + bundle.url) - click.echo(" version = " + bundle.current_tag) + click.echo( + " version = " + + bundle.current_tag + + (" (pinned)" if bundle.pinned_tag is not None else "") + ) + click.echo(" available versions:") + click.echo(" " + "\n ".join(bundle.available_tags)) if modules: click.echo("Modules:") for name, mod in sorted(available_modules.items()): @@ -739,7 +772,7 @@ def bundle_add(ctx, bundle): # save the bundles list save_local_bundles(bundles_dict) # update and get the new bundles for the first time - get_bundle_versions(get_bundles_list()) + get_bundle_versions(get_bundles_list(ctx.obj["BUNDLE_TAGS"])) @main.command("bundle-remove") @@ -788,3 +821,40 @@ def bundle_remove(bundle, reset): ) if modified: save_local_bundles(bundles_local_dict) + + +@main.command() +@click.pass_context +def bundle_freeze(ctx): # pragma: no cover + """ + Output details of all the bundles for modules found on the connected + CIRCUITPYTHON device. Copying the output into pyproject.toml will pin the + bundles. + """ + logger.info("Bundle Freeze") + device_modules = ctx.obj["backend"].get_device_versions() + if not device_modules: + click.echo("No modules found on the device.") + return + + available_modules = get_bundle_versions(get_bundles_list(ctx.obj["BUNDLE_TAGS"])) + bundles_used = {} + for name in device_modules: + module = available_modules.get(name) + if module: + bundle = module["bundle"] + bundles_used[bundle.key] = bundle.current_tag + + if bundles_used: + click.echo( + "Copy the following lines into your pyproject.toml to pin " + "the bundles used with modules on the device:\n" + ) + output = ["[tool.circup.bundle-versions]"] + for bundle_name, version in bundles_used.items(): + output.append(f'"{bundle_name}" = "{version}"') + for line in output: + click.echo(line) + logger.info(line) + else: + click.echo("No bundles used with the modules on the device.") diff --git a/circup/shared.py b/circup/shared.py index 68921f6..cf0364b 100644 --- a/circup/shared.py +++ b/circup/shared.py @@ -9,7 +9,6 @@ import glob import os import re -import json import importlib.resources import appdirs import requests @@ -192,29 +191,6 @@ def extract_metadata(path, logger): return result -def tags_data_load(logger): - """ - Load the list of the version tags of the bundles on disk. - - :return: a dict() of tags indexed by Bundle identifiers/keys. - """ - tags_data = None - try: - with open(BUNDLE_DATA, encoding="utf-8") as data: - try: - tags_data = json.load(data) - except json.decoder.JSONDecodeError as ex: - # Sometimes (why?) the JSON file becomes corrupt. In which case - # log it and carry on as if setting up for first time. - logger.error("Could not parse %s", BUNDLE_DATA) - logger.exception(ex) - except FileNotFoundError: - pass - if not isinstance(tags_data, dict): - tags_data = {} - return tags_data - - def get_latest_release_from_url(url, logger): """ Find the tag name of the latest release by using HTTP HEAD and decoding the redirect. diff --git a/tests/test_circup.py b/tests/test_circup.py index 95e68c0..491ace7 100644 --- a/tests/test_circup.py +++ b/tests/test_circup.py @@ -38,13 +38,18 @@ import circup from circup import DiskBackend from circup.command_utils import ( + ensure_bundle, + ensure_pinned_bundle, find_device, ensure_latest_bundle, + find_pyproject, get_bundle, get_bundles_dict, imports_from_code, get_all_imports, libraries_from_auto_file, + pyproject_bundle_versions, + tags_data_load, ) from circup.shared import PLATFORMS from circup.module import Module @@ -59,6 +64,8 @@ with open(TEST_BUNDLE_CONFIG_LOCAL_JSON, "rb") as tbc: TEST_BUNDLE_LOCAL_DATA = json.load(tbc) +TAG_CONTEXT_OBJ = {"BUNDLE_TAGS": {}} + def test_Bundle_init(): """ @@ -80,6 +87,8 @@ def test_Bundle_init(): "adafruit-circuitpython-bundle-{platform}-{tag}.zip", "current": None, "latest": None, + "pinned": None, + "available": [], } ) @@ -88,9 +97,8 @@ def test_Bundle_lib_dir(): """ Check the return of Bundle.lib_dir with a test tag. """ - bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"} - with mock.patch("circup.bundle.tags_data_load", return_value=bundle_data): - bundle = circup.Bundle(TEST_BUNDLE_NAME) + bundle = circup.Bundle(TEST_BUNDLE_NAME) + with mock.patch.object(bundle, "_available", ["TESTTAG"]): assert bundle.current_tag == "TESTTAG" assert bundle.lib_dir("py") == ( circup.shared.DATA_DIR + "/" @@ -111,7 +119,7 @@ def test_Bundle_latest_tag(): bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"} with mock.patch( "circup.bundle.get_latest_release_from_url", return_value="BESTESTTAG" - ), mock.patch("circup.bundle.tags_data_load", return_value=bundle_data): + ), mock.patch("circup.command_utils.tags_data_load", return_value=bundle_data): bundle = circup.Bundle(TEST_BUNDLE_NAME) assert bundle.latest_tag == "BESTESTTAG" @@ -162,8 +170,12 @@ def test_get_bundles_list(): """ with mock.patch( "circup.command_utils.BUNDLE_CONFIG_FILE", TEST_BUNDLE_CONFIG_JSON - ), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""): - bundles_list = circup.get_bundles_list() + ), mock.patch("circup.command_utils.BUNDLE_CONFIG_LOCAL", ""), mock.patch( + "circup.command_utils.tags_data_load", return_value=dict() + ), mock.patch( + "circup.command_utils.find_pyproject", return_value=None + ): + bundles_list = circup.get_bundles_list(None) bundle = circup.Bundle(TEST_BUNDLE_NAME) assert repr(bundles_list) == repr([bundle]) @@ -942,15 +954,19 @@ def test_ensure_latest_bundle_bad_bundle_data(): manual testing) then default to update. """ with mock.patch("circup.bundle.Bundle.latest_tag", "12345"), mock.patch( - "circup.command_utils.open" - ), mock.patch("circup.command_utils.get_bundle") as mock_gb, mock.patch( + "circup.command_utils.get_bundle" + ) as mock_gb, mock.patch( "builtins.open", mock.mock_open(read_data="}{INVALID_JSON") ), mock.patch( "circup.command_utils.json.dump" ), mock.patch( - "circup.bundle.logger" + "circup.command_utils.tags_data_save_tags" + ), mock.patch( + "circup.command_utils.logger" ) as mock_logger: bundle = circup.Bundle(TEST_BUNDLE_NAME) + tags = tags_data_load() + bundle.available_tags = tags.get(bundle.key, []) ensure_latest_bundle(bundle) mock_gb.assert_called_once_with(bundle, "12345") @@ -977,10 +993,10 @@ def test_ensure_latest_bundle_to_update(): def test_ensure_latest_bundle_to_update_http_error(): """ - If an HTTP error happens during a bundle update, print a friendly - error message, and use existing bundle. + If an HTTP error happens during a bundle update for the latest version, + print a friendly error message, and use latest existing bundle. """ - tags_data = {TEST_BUNDLE_NAME: "12345"} + tags_data = {TEST_BUNDLE_NAME: ["12345", "67890"]} with mock.patch("circup.Bundle.latest_tag", "54321"), mock.patch( # "circup.tags_data_load", return_value=tags_data # ), mock.patch( @@ -994,13 +1010,45 @@ def test_ensure_latest_bundle_to_update_http_error(): ) as mock_json, mock.patch( "circup.click.secho" ) as mock_click: - circup.Bundle.tags_data = dict() mock_json.load.return_value = tags_data bundle = circup.Bundle(TEST_BUNDLE_NAME) + tags = tags_data_load() + bundle.available_tags = tags.get(bundle.key, []) ensure_latest_bundle(bundle) mock_gb.assert_called_once_with(bundle, "54321") + assert bundle.current_tag == "67890" + assert mock_json.dump.call_count == 0 # not saved. + assert mock_click.call_count == 2 # friendly message. + + +def test_ensure_pinned_bundle_to_exit_http_error(): + """ + If an HTTP error happens during a bundle update for the pinned version, + print a friendly error message, and exit. + """ + tags_data = {TEST_BUNDLE_NAME: ["12345"]} + bundle = circup.Bundle(TEST_BUNDLE_NAME) + with mock.patch.object(bundle, "pinned_tag", "54321"), mock.patch( + "circup.os.path.isfile", + return_value=True, + ), mock.patch("circup.command_utils.open"), mock.patch( + "circup.command_utils.get_bundle", + side_effect=requests.exceptions.HTTPError("404"), + ) as mock_gb, mock.patch( + "circup.command_utils.json" + ) as mock_json, mock.patch( + "circup.click.secho" + ) as mock_click, mock.patch( + "circup.sys.exit" + ) as mock_exit: + mock_json.load.return_value = tags_data + tags = tags_data_load() + bundle.available_tags = tags.get(bundle.key, []) + ensure_pinned_bundle(bundle) + mock_gb.assert_called_once_with(bundle, "54321") assert mock_json.dump.call_count == 0 # not saved. - assert mock_click.call_count == 1 # friendly message. + assert mock_click.call_count == 2 # friendly message. + mock_exit.assert_called_once_with(1) def test_ensure_latest_bundle_no_update(): @@ -1015,7 +1063,7 @@ def test_ensure_latest_bundle_no_update(): ) as mock_gb, mock.patch( "circup.command_utils.os.path.isfile", return_value=True ), mock.patch( - "circup.bundle.Bundle.current_tag", "12345" + "circup.bundle.Bundle.available_tags", ["12345"] ), mock.patch( "circup.command_utils.logger" ) as mock_logger: @@ -1044,10 +1092,10 @@ def test_get_bundle(): ) as mock_open, mock.patch( "circup.os.path.isdir", return_value=True ), mock.patch( - "circup.command_utils.shutil" - ) as mock_shutil, mock.patch( "circup.command_utils.zipfile" - ) as mock_zipfile: + ) as mock_zipfile, mock.patch( + "circup.Bundle.add_tag" + ) as mock_add_tag: mock_click.progressbar = mock_progress mock_requests.get().status_code = mock_requests.codes.ok mock_requests.get.reset_mock() @@ -1058,9 +1106,9 @@ def test_get_bundle(): _bundle_count = len(PLATFORMS) assert mock_requests.get.call_count == _bundle_count assert mock_open.call_count == _bundle_count - assert mock_shutil.rmtree.call_count == _bundle_count assert mock_zipfile.ZipFile.call_count == _bundle_count assert mock_zipfile.ZipFile().__enter__().extractall.call_count == _bundle_count + assert mock_add_tag.call_count == 1 def test_get_bundle_network_error(): @@ -1069,7 +1117,7 @@ def test_get_bundle_network_error(): then the error is logged and re-raised for the HTTP status code. """ with mock.patch("circup.command_utils.requests") as mock_requests, mock.patch( - "circup.shared.tags_data_load", return_value=dict() + "circup.command_utils.tags_data_load", return_value=dict() ), mock.patch("circup.command_utils.logger") as mock_logger: # Force failure with != requests.codes.ok mock_requests.get().status_code = mock_requests.codes.BANG @@ -1099,7 +1147,7 @@ def test_show_command(): with mock.patch( "circup.commands.get_bundle_versions", return_value=test_bundle_modules ): - result = runner.invoke(circup.show) + result = runner.invoke(circup.main, ["show"], obj=TAG_CONTEXT_OBJ) assert result.exit_code == 0 assert all(m.replace(".py", "") in result.output for m in test_bundle_modules) @@ -1113,7 +1161,7 @@ def test_show_match_command(): with mock.patch( "circup.commands.get_bundle_versions", return_value=test_bundle_modules ): - result = runner.invoke(circup.show, ["t"]) + result = runner.invoke(circup.show, ["t"], obj=TAG_CONTEXT_OBJ) assert result.exit_code == 0 assert "one" not in result.output @@ -1127,7 +1175,7 @@ def test_show_match_py_command(): with mock.patch( "circup.commands.get_bundle_versions", return_value=test_bundle_modules ): - result = runner.invoke(circup.show, ["py"]) + result = runner.invoke(circup.show, ["py"], obj=TAG_CONTEXT_OBJ) assert result.exit_code == 0 assert "0 shown" in result.output @@ -1316,5 +1364,100 @@ def test_install_auto_file_bad(): "--auto-file", "./tests/bad_python.py", ], + obj=TAG_CONTEXT_OBJ, ) assert result.exit_code == 2 + + +def test_current_tag(): + """ + Make sure the current tag is the pinned tag if there is a pinned tag and + the last available_tag when there is no pinned tag. + """ + available_tags = ["PINNED_TAG", "LATEST_TAG"] + bundle = circup.Bundle(TEST_BUNDLE_NAME) + with mock.patch.object(bundle, "_available", available_tags), mock.patch.object( + bundle, "pinned_tag", "PINNED_TAG" + ): + assert bundle.current_tag == "PINNED_TAG" + + bundle = circup.Bundle(TEST_BUNDLE_NAME) + with mock.patch.object(bundle, "_available", available_tags): + assert bundle.current_tag == "LATEST_TAG" + + +def test_ensure_bundle(): + """Make sure the correct path is taken when calling ensure_bundle.""" + with mock.patch( + "circup.command_utils.ensure_bundle_tag", return_value=True + ), mock.patch("circup.bundle.Bundle.latest_tag", "LATEST_TAG"), mock.patch( + "circup.command_utils.click" + ) as mock_click: + bundle = circup.Bundle(TEST_BUNDLE_NAME) + ensure_bundle(bundle) + mock_click.echo.assert_called_once_with( + f"Using latest bundle for {TEST_BUNDLE_NAME} (LATEST_TAG)." + ) + + mock_click.echo.reset_mock() + with mock.patch.object(bundle, "pinned_tag", "PINNED_TAG"): + ensure_bundle(bundle) + mock_click.echo.assert_called_once_with( + f"Using pinned bundle for {TEST_BUNDLE_NAME} (PINNED_TAG)." + ) + + +def test_available_tag_order(): + """Make sure available tags are always correctly sorted.""" + tags = ["7", "1", "4", "3", "8", "9", "5", "2"] + bundle = circup.Bundle(TEST_BUNDLE_NAME) + while len(tags) > 0: + tag = tags.pop() + bundle.add_tag(tag) + assert bundle.available_tags == tuple(sorted(bundle.available_tags)) + + tag_count = len(bundle.available_tags) + # Duplicate tags are not added again + bundle.add_tag("7") + assert len(bundle.available_tags) == tag_count + + +def test_pyproject_handling(tmp_path): + """ + Make sure the pyproject file is found as expected and is correctly + parsed for any pinned versions. + """ + bundle_data = {TEST_BUNDLE_NAME: "TESTTAG"} + + # Mock project directory + project_root = tmp_path / "project" + project_root.mkdir() + + # Create a pyproject.toml at the root + pyproject_path = project_root / "pyproject.toml" + pyproject_path.write_text( + "[tool.circup.bundle-versions]\n" + f"'{TEST_BUNDLE_NAME}' = '{bundle_data[TEST_BUNDLE_NAME]}'\n" + ) + + # Create nested subdirectories + subdirectory = project_root / "inner" + subdirectory.mkdir() + + # Make sure we find the pyproject file from both the project directory + # and a subdirectory. + for cwd in [project_root, subdirectory]: + with mock.patch("circup.command_utils.Path.cwd", return_value=cwd): + found_pyproject = find_pyproject() + assert found_pyproject == pyproject_path + + pinned_tags = pyproject_bundle_versions(found_pyproject) + + assert len(pinned_tags) == 1 + assert TEST_BUNDLE_NAME in pinned_tags + assert bundle_data[TEST_BUNDLE_NAME] == pinned_tags[TEST_BUNDLE_NAME] + + with mock.patch("circup.command_utils.Path.cwd", return_value=project_root.parent): + # Make sure it works when there is no pyproject + found_pyproject = find_pyproject() + assert found_pyproject is None