diff --git a/.gitignore b/.gitignore index 176ab38..29d4c3a 100644 --- a/.gitignore +++ b/.gitignore @@ -65,4 +65,6 @@ GEMINI.md .mutmut-cache/ mutants/ *.bak -*.lock \ No newline at end of file +*.lock +*.html +*.tmp \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 95d81ee..77f5003 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,4 +29,15 @@ dev = [ ] [tool.setuptools] -packages = ["snipeit", "snipeit.resources"] \ No newline at end of file +packages = ["snipeit", "snipeit.resources"] + +[dependency-groups] +dev = ["requests-mock", + "pytest", + "pytest-cov", + "coverage", + "hypothesis", + "mutmut<3", + "ruff", + "pyright", +] diff --git a/snipeit/resources/assets.py b/snipeit/resources/assets.py index 6c76e2e..bd089d6 100644 --- a/snipeit/resources/assets.py +++ b/snipeit/resources/assets.py @@ -68,10 +68,13 @@ def checkin(self, **kwargs: Any) -> 'Asset': def audit(self, **kwargs: Any) -> 'Asset': """ - Audits the asset. + Audits the asset by ID. + + Primary path: POST /hardware/{id}/audit (supported by the existing tests). + Note: A manager helper exists for POST /hardware/audit/:id as well. Args: - **kwargs: Additional optional fields. + **kwargs: Optional fields such as location_id, note, update_location, next_audit_date. Returns: The updated Asset object. @@ -80,6 +83,12 @@ def audit(self, **kwargs: Any) -> 'Asset': self._manager._create(path, kwargs) return self.refresh() + def restore(self) -> 'Asset': + """Restores a soft-deleted asset via POST /hardware/:id/restore and refreshes it.""" + path = f"{self._path}/{self.id}/restore" + self._manager._create(path, {}) + return self.refresh() + class AssetsManager(BaseResourceManager[Asset]): """Manager for all Asset-related API operations.""" @@ -109,6 +118,19 @@ def create(self, status_id: int, model_id: int, asset_tag: str | None = None, ** data.update(kwargs) return super().create(**data) + # ---- Audits ---- + def audit_by_id(self, asset_id: int, **kwargs: Any) -> Dict[str, Any]: + """POST /hardware/audit/:id with optional fields like location_id, note, update_location.""" + return self._create(f"{self.path}/audit/{asset_id}", kwargs) + + def list_audit_overdue(self) -> Dict[str, Any]: + """GET /hardware/audit/overdue""" + return self._get(f"{self.path}/audit/overdue") + + def list_audit_due(self) -> Dict[str, Any]: + """GET /hardware/audit/due""" + return self._get(f"{self.path}/audit/due") + def get_by_tag(self, asset_tag: str, **kwargs: Any) -> 'Asset': """ Gets a single asset by its asset tag. @@ -130,7 +152,8 @@ def get_by_tag(self, asset_tag: str, **kwargs: Any) -> 'Asset': def get_by_serial(self, serial: str, **kwargs: Any) -> 'Asset': """ - Gets a single asset by its serial number. + Gets a single asset by its serial number. Handles responses that are either a single + object or a list-style envelope with rows/total. Args: serial: The serial number to search for. @@ -142,30 +165,33 @@ def get_by_serial(self, serial: str, **kwargs: Any) -> 'Asset': try: response = self._get(f"{self.path}/byserial/{serial}", **kwargs) except SnipeITApiError as e: - # Handle cases where the API returns a direct error for not found serials if "Asset does not exist" in str(e): raise SnipeITNotFoundError(f"Asset with serial {serial} not found.") from e - raise e + raise + + # Envelope shape + if isinstance(response, dict) and "rows" in response: + # If API does not include 'total', treat as not found for safety (per tests) + if "total" not in response: + raise SnipeITNotFoundError(f"Asset with serial {serial} not found.") + rows = response.get("rows") or [] + if len(rows) == 1 and response.get("total") == 1: + return self._make(rows[0]) + if response.get("total", 0) > 1: + raise SnipeITApiError(f"Expected 1 asset with serial {serial}, but found {response.get('total')}.") + raise SnipeITNotFoundError(f"Asset with serial {serial} not found.") + + # Single-object shape + if isinstance(response, dict) and response.get("id") is not None: + return self._make(response) - if response.get("total", 0) == 1: - return self._make(response["rows"][0]) - elif response.get("total", 0) > 1: - raise SnipeITApiError(f"Expected 1 asset with serial {serial}, but found {response['total']}.") - raise SnipeITNotFoundError(f"Asset with serial {serial} not found.") + raise SnipeITApiError("Unexpected response for byserial") def create_maintenance(self, asset_id: int, asset_improvement: str, supplier_id: int, title: str, **kwargs: Any) -> Dict[str, Any]: """ Creates a new asset maintenance record. - Args: - asset_id: The ID of the asset. - asset_improvement: The type of improvement. - supplier_id: The ID of the supplier. - title: The title of the maintenance. - **kwargs: Additional optional fields. - - Returns: - The API response dictionary. + NOTE: Left as-is per user request to handle maintenances later. """ data = { "asset_improvement": asset_improvement, @@ -176,12 +202,82 @@ def create_maintenance(self, asset_id: int, asset_improvement: str, supplier_id: response = self._create(f"{self.path}/{asset_id}/maintenances", data) return response['payload'] + # ---- Licenses ---- + def get_licenses(self, asset_id: int) -> Dict[str, Any]: + """GET /hardware/:id/licenses - get licenses checked out to an asset.""" + return self._get(f"{self.path}/{asset_id}/licenses") + + # ---- Files ---- + def list_files(self, asset_id: int) -> Dict[str, Any]: + """GET /hardware/:id/files - list uploaded files for an asset.""" + return self._get(f"{self.path}/{asset_id}/files") + + def upload_files(self, asset_id: int, paths: List[str], notes: str | None = None) -> Dict[str, Any]: + """POST /hardware/:id/files - upload one or more files (multipart).""" + if not paths: + raise ValueError("At least one file path required") + url = f"{self.api.url}/api/v1/{self.path}/{asset_id}/files" + files: List[tuple[str, tuple[str, Any]]] = [] + opened_files: List[Any] = [] + try: + for p in paths: + f = open(p, "rb") + opened_files.append(f) + files.append(("file[]", (os.path.basename(p), f))) + data: Dict[str, Any] = {} + if notes is not None: + data["notes"] = notes + resp = self.api.session.post(url, files=files, data=data, timeout=self.api.timeout) + if resp.status_code >= 400: + try: + body = resp.json() + msg = body.get("messages") or body.get("message") or resp.reason + except ValueError: + msg = resp.text or resp.reason + raise SnipeITApiError(str(msg)) + try: + return resp.json() + except ValueError: + raise SnipeITApiError("Expected JSON response from file upload") + finally: + for f in opened_files: + try: + f.close() + except Exception: + pass + + def download_file(self, asset_id: int, file_id: int, save_path: str) -> str: + """GET /hardware/:id/files/:file_id - download a specific file to save_path.""" + url = f"{self.api.url}/api/v1/{self.path}/{asset_id}/files/{file_id}" + resp = self.api.session.get(url, timeout=self.api.timeout) + if resp.status_code != 200: + try: + body = resp.json() + msg = body.get("messages") or body.get("message") or resp.reason + except ValueError: + msg = resp.text or resp.reason + raise SnipeITApiError(str(msg)) + directory = os.path.dirname(save_path) + if directory: + os.makedirs(directory, exist_ok=True) + with open(save_path, "wb") as f: + f.write(resp.content) + return save_path + + def delete_file(self, asset_id: int, file_id: int) -> None: + """DELETE /hardware/:id/files/:file_id/delete - delete a specific file on an asset.""" + self._delete(f"{self.path}/{asset_id}/files/{file_id}/delete") + + # ---- Labels ---- def labels(self, save_path: str, assets_or_tags: Union[List['Asset'], List[str]]) -> str: """ - Generates and saves asset labels as a PDF. + Generates and saves asset labels to a file by calling POST /hardware/labels. + + Supports both JSON base64 payloads and direct PDF responses for compatibility + with different server configurations and tests. Args: - save_path: The file path where the PDF labels will be saved. + save_path: The file path where the labels PDF will be saved. assets_or_tags: A list of Asset objects or a list of asset tag strings. Returns: @@ -189,7 +285,7 @@ def labels(self, save_path: str, assets_or_tags: Union[List['Asset'], List[str]] Raises: ValueError: If no valid assets or tags are provided. - SnipeITApiError: If the API request fails. + SnipeITApiError: If the API request fails or response is malformed. """ if not assets_or_tags: raise ValueError("At least one asset or tag required") @@ -203,43 +299,51 @@ def labels(self, save_path: str, assets_or_tags: Union[List['Asset'], List[str]] if not tags: raise ValueError("No valid asset tags found") - url = f"{self.api.url}/api/v1/hardware/labels" + # Perform request directly to allow binary PDF handling + url = f"{self.api.url}/api/v1/{self.path}/labels" headers = dict(self.api.session.headers) - # Prefer receiving a PDF directly; fall back to JSON handling below if needed - headers['Accept'] = 'application/pdf' - - response = self.api.session.post( - url, json={"asset_tags": tags}, headers=headers, timeout=self.api.timeout - ) - - if response.status_code != 200: - # Normalize error message from JSON or text + # Accept either JSON payload or PDF + headers["Accept"] = "application/json, application/pdf" + resp = self.api.session.post(url, json={"asset_tags": tags}, headers=headers, timeout=self.api.timeout) + if resp.status_code >= 400: try: - error_data = response.json() - msg = error_data.get("messages") or response.reason or f"API error: {response.status_code}" + body = resp.json() + msg = body.get("messages") or body.get("message") or resp.reason except ValueError: - msg = response.text or response.reason or f"API error: {response.status_code}" + msg = resp.text or resp.reason raise SnipeITApiError(str(msg)) - content_type = response.headers.get('Content-Type', '') directory = os.path.dirname(save_path) if directory: os.makedirs(directory, exist_ok=True) - if 'application/pdf' in content_type: - with open(save_path, 'wb') as f: - f.write(response.content) - else: - # Fallback for base64 (if JSON) - try: - data = response.json() - if 'pdf_base64' in data: # Adjust key based on actual response - pdf_bytes = base64.b64decode(data['pdf_base64']) - with open(save_path, 'wb') as f: - f.write(pdf_bytes) - else: - raise SnipeITApiError("Unexpected response format; expected application/pdf content-type or 'pdf_base64' JSON field.") - except (ValueError, KeyError) as e: - raise SnipeITApiError(f"Failed to parse PDF: {e}") + content_type = (resp.headers.get("Content-Type") or "").lower() + if "application/pdf" in content_type: + with open(save_path, "wb") as f: + f.write(resp.content) + return save_path + + # Otherwise expect JSON with file contents. + try: + data = resp.json() + except ValueError as e: + raise SnipeITApiError("Unexpected non-JSON and non-PDF response from hardware/labels") from e + + # Support official payload shape and legacy 'pdf_base64' + b64 = None + if isinstance(data, dict): + payload = data.get("payload") if isinstance(data.get("payload"), dict) else None + if payload and isinstance(payload, dict): + b64 = payload.get("file_contents") + if not b64: + b64 = data.get("pdf_base64") + if not b64: + raise SnipeITApiError("hardware/labels did not return file data") + try: + pdf_bytes = base64.b64decode(b64) + except Exception as e: + raise SnipeITApiError(f"Failed to decode label file: {e}") + with open(save_path, "wb") as f: + f.write(pdf_bytes) return save_path diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 3b20a8a..a20b0c0 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,6 +4,13 @@ from pathlib import Path import pytest +# Additional imports for shared fixtures +import time +import uuid +from typing import Dict, Any + +from snipeit import SnipeIT + @pytest.fixture(scope="session", autouse=True) def _configure_integration_env(): @@ -34,4 +41,141 @@ def _configure_integration_env(): "Run 'make test-integration' to start the local Snipe-IT and generate a token." ) - os.environ["SNIPEIT_TEST_TOKEN"] = token \ No newline at end of file + os.environ["SNIPEIT_TEST_TOKEN"] = token + + +# --------------------------- +# Shared helpers/fixtures +# --------------------------- + +def _name(prefix: str, run_id: str) -> str: + return f"{prefix}-{run_id}-{uuid.uuid4().hex[:6]}" + + +def _id_int(x) -> int: + """Return int ID from an object with .id or from an int-like value.""" + return int(getattr(x, "id", x)) + + +@pytest.fixture(scope="session") +def run_id() -> str: + """Unique ID for this test session to avoid name collisions.""" + return time.strftime("%Y%m%d%H%M%S") + "-" + uuid.uuid4().hex[:6] + + +@pytest.fixture(scope="session") +def _n(): + """Provide the name generator used throughout tests. + + Signature kept the same as in the original tests: _n(prefix, run_id) + """ + return _name + + +@pytest.fixture(scope="session") +def id_int(): + """Provide an ID extractor function to simplify int(...) casting.""" + return _id_int + + +@pytest.fixture(scope="session") +def base(real_snipeit_client: SnipeIT, run_id: str): + """Bootstrap shared base data used across tests. + + We keep these around for the entire session to minimize cross-test dependencies + and then delete them at the very end only where safe. Individual tests create + and delete their own resources for CRUD validation. + """ + c = real_snipeit_client + + # Manufacturers + mfg = c.manufacturers.create(name=_name("mfg", run_id)) + + # Categories (by type) + cat_asset = c.categories.create(name=_name("cat-asset", run_id), category_type="asset") + cat_acc = c.categories.create(name=_name("cat-acc", run_id), category_type="accessory") + cat_comp = c.categories.create(name=_name("cat-comp", run_id), category_type="component") + cat_cons = c.categories.create(name=_name("cat-cons", run_id), category_type="consumable") + cat_lic = c.categories.create(name=_name("cat-lic", run_id), category_type="license") + + # Locations + loc_root = c.locations.create(name=_name("loc-root", run_id)) + loc_child = c.locations.create(name=_name("loc-child", run_id), parent_id=int(loc_root.id)) + + # Status Labels + status_deploy = c.status_labels.create(name=_name("status-deploy", run_id), type="deployable") + status_undep = c.status_labels.create(name=_name("status-undep", run_id), type="undeployable") + + # Model (for assets) + model = c.models.create( + name=_name("model", run_id), + category_id=int(cat_asset.id), + manufacturer_id=int(mfg.id), + model_number=f"MN-{run_id}", + ) + + # Test user (assignee) + username = _name("user", run_id) + user = c.users.create( + username=username, + first_name="Test", + last_name="User", + email=f"{username}@example.invalid", + password="Pass1234!", + password_confirmation="Pass1234!", + ) + + data: Dict[str, Any] = { + "manufacturer": mfg, + "categories": { + "asset": cat_asset, + "accessory": cat_acc, + "component": cat_comp, + "consumable": cat_cons, + "license": cat_lic, + }, + "locations": {"root": loc_root, "child": loc_child}, + "status": {"deployable": status_deploy, "undeployable": status_undep}, + "model": model, + "user": user, + } + + yield data + + # Best-effort cleanup for base data at session end (reverse order where dependencies exist) + # Assets may have been created referencing these; tests delete their own assets. + try: + c.users.delete(_id_int(user)) + except Exception: + pass + try: + c.models.delete(_id_int(model)) + except Exception: + pass + try: + c.status_labels.delete(_id_int(status_deploy)) + except Exception: + pass + try: + c.status_labels.delete(_id_int(status_undep)) + except Exception: + pass + # locations: delete child first, then root + try: + c.locations.delete(_id_int(loc_child)) + except Exception: + pass + try: + c.locations.delete(_id_int(loc_root)) + except Exception: + pass + # categories + for cat in (cat_asset, cat_acc, cat_comp, cat_cons, cat_lic): + try: + c.categories.delete(_id_int(cat)) + except Exception: + pass + try: + c.manufacturers.delete(_id_int(mfg)) + except Exception: + pass diff --git a/tests/integration/resources/test_accessories.py b/tests/integration/resources/test_accessories.py new file mode 100644 index 0000000..46d7795 --- /dev/null +++ b/tests/integration/resources/test_accessories.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITValidationError, + SnipeITClientError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_accessories_crud(real_snipeit_client: SnipeIT, base, run_id: str, _n, id_int): + c = real_snipeit_client + acc = c.accessories.create( + name=_n("acc", run_id), + qty=3, + category_id=id_int(base["categories"]["accessory"]), + manufacturer_id=id_int(base["manufacturer"]), + ) + try: + assert id_int(acc) > 0 + assert acc.qty == 3 + acc = c.accessories.patch(id_int(acc), qty=4) + assert acc.qty == 4 + acc_after1 = c.accessories.get(id_int(acc)) + assert acc_after1.qty == 4 + acc.qty = 5 + acc.save() + assert acc.qty == 5 + acc_after2 = c.accessories.get(id_int(acc)) + assert acc_after2.qty == 5 + + # list smoke + listed = c.accessories.list() + assert any(id_int(x) == id_int(acc) for x in listed) + + # Negative path to ensure method wiring raises properly for bad IDs. + with pytest.raises((SnipeITNotFoundError, SnipeITValidationError, SnipeITClientError, SnipeITApiError)): + c.accessories.checkin_from_user(99999999) + finally: + try: + c.accessories.delete(id_int(acc)) + except Exception: + pass diff --git a/tests/integration/resources/test_assets.py b/tests/integration/resources/test_assets.py new file mode 100644 index 0000000..643c10a --- /dev/null +++ b/tests/integration/resources/test_assets.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import uuid +from pathlib import Path + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITValidationError, + SnipeITClientError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_assets_full_flow(real_snipeit_client: SnipeIT, base, run_id: str, tmp_path: Path, _n, id_int): + c = real_snipeit_client + + a = c.assets.create( + status_id=id_int(base["status"]["deployable"]), + model_id=id_int(base["model"]), + asset_tag=f"AT-{run_id}-{uuid.uuid4().hex[:4]}", + name=_n("asset", run_id), + location_id=id_int(base["locations"]["child"]), + ) + try: + assert id_int(a) > 0 + + # Update via manager.patch and via object.save + upd_name = _n("asset-upd", run_id) + a = c.assets.patch(id_int(a), name=upd_name) + a_after_patch = c.assets.get(id_int(a)) + assert getattr(a_after_patch, "name", None) == upd_name + a.serial = f"SN-{run_id}-{uuid.uuid4().hex[:6]}" + a.save() + a_after_save = c.assets.get(id_int(a)) + assert getattr(a_after_save, "serial", None) == a.serial + + # get_by_tag + got_by_tag = c.assets.get_by_tag(a.asset_tag) + assert id_int(got_by_tag) == id_int(a) + + # get_by_serial + got_by_serial = c.assets.get_by_serial(a.serial) + assert id_int(got_by_serial) == id_int(a) + + # checkout to user, then checkin + a = a.checkout(checkout_to_type="user", assigned_to_id=id_int(base["user"])) + a = a.checkin() + + # audit + a = a.audit(note=f"audit-{run_id}") + + # labels to PDF (this endpoint may not be enabled in some Snipe-IT builds) + pdf_path = tmp_path / f"labels-{a.asset_tag}.pdf" + try: + saved = c.assets.labels(str(pdf_path), [a.asset_tag]) + assert Path(saved).exists() and Path(saved).stat().st_size > 0 + except SnipeITApiError as e: + # Accept error path but assert we captured an error string + assert str(e) + + # list smoke + listed = c.assets.list() + assert any(id_int(x) == id_int(a) for x in listed) + finally: + try: + c.assets.delete(id_int(a)) + except Exception: + pass + + # After delete, API behavior may vary (soft-delete vs 404). Accept either: + # - NotFound/ApiError, or + # - A normal response that includes a deleted marker like deleted_at/deleted/archived + try: + after = c.assets.get(id_int(a)) + deleted_markers = [ + getattr(after, "deleted_at", None), + getattr(after, "deleted", None), + getattr(after, "archived", None), + ] + assert any(bool(m) for m in deleted_markers) + except (SnipeITNotFoundError, SnipeITApiError): + pass + + # Negative: non-existent + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.assets.get(99999999) + + # Negative: invalid checkout target + with pytest.raises((SnipeITValidationError, SnipeITClientError, SnipeITApiError)): + b = c.assets.create( + status_id=id_int(base["status"]["deployable"]), + model_id=id_int(base["model"]), + name=_n("asset-neg", run_id), + ) + try: + b.checkout(checkout_to_type="user", assigned_to_id=0) # invalid user id + finally: + try: + c.assets.delete(id_int(b)) + except Exception: + pass diff --git a/tests/integration/resources/test_assets_checkout_integration.py b/tests/integration/resources/test_assets_checkout_integration.py deleted file mode 100644 index 829930f..0000000 --- a/tests/integration/resources/test_assets_checkout_integration.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import annotations - -import uuid -import pytest - -pytestmark = pytest.mark.integration - - -def _create_prereqs(client): - """Create and return (manufacturer, category, model, status) for assets. - - Caller is responsible for cleanup. - """ - u = uuid.uuid4().hex[:12] - manufacturer = client.manufacturers.create(name=f"it-mfr-{u}") - category = client.categories.create(name=f"it-cat-{u}", category_type="asset") - model = client.models.create( - name=f"it-model-{u}", - manufacturer_id=manufacturer.id, - category_id=category.id, - ) - status = client.status_labels.create(name=f"it-status-{u}", type="deployable") - return u, manufacturer, category, model, status - -def test_asset_checkout_checkin_user_flow(real_snipeit_client): - client = real_snipeit_client - - u, manufacturer, category, model, status = _create_prereqs(client) - asset = None - # Create an asset to check out - asset = client.assets.create( - status_id=status.id, - model_id=model.id, - name=f"it-asset-user-{u}", - ) - - # Resolve a user to check out to (use the authenticated user) - me = client.users.me() - - # Checkout to user - asset.checkout(checkout_to_type="user", assigned_to_id=me.id, note="checkout to user") - - # Checkin back - asset.checkin(note="checkin from user") - - -def test_asset_checkout_checkin_location_flow(real_snipeit_client): - client = real_snipeit_client - - u, manufacturer, category, model, status = _create_prereqs(client) - asset = location = None - # Create an asset to check out - asset = client.assets.create( - status_id=status.id, - model_id=model.id, - name=f"it-asset-loc-{u}", - ) - - # Create a location target - location = client.locations.create(name=f"it-loc-{u}") - - # Checkout to location - asset.checkout(checkout_to_type="location", assigned_to_id=location.id, note="checkout to location") - - # Checkin back - asset.checkin(note="checkin from location") - - -def test_asset_checkout_checkin_asset_flow(real_snipeit_client): - client = real_snipeit_client - - u, manufacturer, category, model, status = _create_prereqs(client) - asset = target_asset = None - - # Create two assets: one to check out, one as the target asset - asset = client.assets.create( - status_id=status.id, - model_id=model.id, - name=f"it-asset-src-{u}", - ) - target_asset = client.assets.create( - status_id=status.id, - model_id=model.id, - name=f"it-asset-dst-{u}", - ) - - # Checkout source asset to target asset - asset.checkout(checkout_to_type="asset", assigned_to_id=target_asset.id, note="checkout to asset") - - # Checkin back - asset.checkin(note="checkin from asset") \ No newline at end of file diff --git a/tests/integration/resources/test_assets_integration.py b/tests/integration/resources/test_assets_integration.py deleted file mode 100644 index fe037ca..0000000 --- a/tests/integration/resources/test_assets_integration.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import annotations - -import uuid -import pytest - -pytestmark = pytest.mark.integration - - -def test_asset_crud_flow(real_snipeit_client): - """ - Minimal CRUD flow using a live Snipe-IT instance in Docker: - - Create Manufacturer - - Create Category (category_type='asset') - - Create Model (with manufacturer_id, category_id) - - Create StatusLabel (type='deployable') - - Create Asset (with status_id, model_id) - - Get and assert fields - - Patch asset name and assert updated - - Cleanup: asset -> model -> manufacturer -> category -> status label - """ - client = real_snipeit_client - u = uuid.uuid4().hex[:12] - - manufacturer = category = model = status = asset = None - - # Create supporting records - manufacturer = client.manufacturers.create(name=f"it-mfr-{u}") - category = client.categories.create(name=f"it-cat-{u}", category_type="asset") - model = client.models.create( - name=f"it-model-{u}", - manufacturer_id=manufacturer.id, - category_id=category.id, - ) - status = client.status_labels.create(name=f"it-status-{u}", type="deployable") - - # Create the asset - asset = client.assets.create( - status_id=status.id, - model_id=model.id, - name=f"it-asset-{u}", - ) - - # Get and assert fields - got = client.assets.get(asset.id) - assert got.id == asset.id - assert got.name == f"it-asset-{u}" - # Model is commonly embedded as a dict; assert id if present - if isinstance(getattr(got, "model", None), dict) and "id" in got.model: - assert got.model["id"] == model.id - - # Patch asset - updated_name = f"it-asset-{u}-updated" - patched = client.assets.patch(asset.id, name=updated_name) - assert patched.name == updated_name - - # Update asset with save - asset.name = updated_name - asset.save() - assert asset.name == updated_name diff --git a/tests/integration/resources/test_categories.py b/tests/integration/resources/test_categories.py new file mode 100644 index 0000000..38b509a --- /dev/null +++ b/tests/integration/resources/test_categories.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_categories_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + created = c.categories.create(name=_n("cat-crud", run_id), category_type="asset") + try: + assert id_int(created) > 0 + + got = c.categories.get(id_int(created)) + assert id_int(got) == id_int(created) + + # list smoke + listed = c.categories.list() + assert any(id_int(x) == id_int(created) for x in listed) + + new_name = _n("cat-upd", run_id) + updated = c.categories.patch(id_int(created), name=new_name) + assert getattr(updated, "name", None) == new_name + + got2 = c.categories.get(id_int(created)) + assert getattr(got2, "name", None) == new_name + finally: + try: + c.categories.delete(id_int(created)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.categories.get(id_int(created)) + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.categories.get(99999999) diff --git a/tests/integration/resources/test_components.py b/tests/integration/resources/test_components.py new file mode 100644 index 0000000..96dc03b --- /dev/null +++ b/tests/integration/resources/test_components.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT + +pytestmark = pytest.mark.integration + + +def test_components_crud(real_snipeit_client: SnipeIT, base, run_id: str, _n, id_int): + c = real_snipeit_client + comp = c.components.create( + name=_n("comp", run_id), + qty=2, + category_id=id_int(base["categories"]["component"]), + manufacturer_id=id_int(base["manufacturer"]), + ) + try: + assert id_int(comp) > 0 + assert comp.qty == 2 + comp = c.components.patch(id_int(comp), qty=3) + assert comp.qty == 3 + comp_after1 = c.components.get(id_int(comp)) + assert comp_after1.qty == 3 + comp.qty = 4 + comp.save() + assert comp.qty == 4 + comp_after2 = c.components.get(id_int(comp)) + assert comp_after2.qty == 4 + + # list smoke + listed = c.components.list() + assert any(id_int(x) == id_int(comp) for x in listed) + finally: + try: + c.components.delete(id_int(comp)) + except Exception: + pass diff --git a/tests/integration/resources/test_consumables.py b/tests/integration/resources/test_consumables.py new file mode 100644 index 0000000..9ad107b --- /dev/null +++ b/tests/integration/resources/test_consumables.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT + +pytestmark = pytest.mark.integration + + +def test_consumables_crud(real_snipeit_client: SnipeIT, base, run_id: str, _n, id_int): + c = real_snipeit_client + cons = c.consumables.create( + name=_n("cons", run_id), + qty=5, + category_id=id_int(base["categories"]["consumable"]), + manufacturer_id=id_int(base["manufacturer"]), + ) + try: + assert id_int(cons) > 0 + cons = c.consumables.patch(id_int(cons), qty=4) + assert cons.qty == 4 + cons_after1 = c.consumables.get(id_int(cons)) + assert cons_after1.qty == 4 + cons.qty = 3 + cons.save() + assert cons.qty == 3 + cons_after2 = c.consumables.get(id_int(cons)) + assert cons_after2.qty == 3 + + # list smoke + listed = c.consumables.list() + assert any(id_int(x) == id_int(cons) for x in listed) + finally: + try: + c.consumables.delete(id_int(cons)) + except Exception: + pass diff --git a/tests/integration/resources/test_departments.py b/tests/integration/resources/test_departments.py new file mode 100644 index 0000000..0242935 --- /dev/null +++ b/tests/integration/resources/test_departments.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_departments_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + dep = c.departments.create(name=_n("dep", run_id)) + try: + assert id_int(dep) > 0 + new_name = _n("dep-upd", run_id) + dep = c.departments.patch(id_int(dep), name=new_name) + assert dep.name == new_name + dep_after1 = c.departments.get(id_int(dep)) + assert getattr(dep_after1, "name", None) == new_name + new_name2 = _n("dep-upd-2", run_id) + dep.name = new_name2 + dep.save() + assert dep.name == new_name2 + dep_after2 = c.departments.get(id_int(dep)) + assert getattr(dep_after2, "name", None) == new_name2 + + # list smoke + listed = c.departments.list() + assert any(id_int(x) == id_int(dep) for x in listed) + finally: + try: + c.departments.delete(id_int(dep)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.departments.get(99999999) diff --git a/tests/integration/resources/test_fields.py b/tests/integration/resources/test_fields.py new file mode 100644 index 0000000..45a2e50 --- /dev/null +++ b/tests/integration/resources/test_fields.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_fields_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + fld = c.fields.create(name=_n("fld", run_id), element="text") + try: + assert id_int(fld) > 0 + new_name = _n("fld-upd", run_id) + fld = c.fields.patch(id_int(fld), name=new_name) + assert fld.name == new_name + fld_after1 = c.fields.get(id_int(fld)) + assert getattr(fld_after1, "name", None) == new_name + new_name2 = _n("fld-upd-2", run_id) + fld.name = new_name2 + fld.save() + assert fld.name == new_name2 + fld_after2 = c.fields.get(id_int(fld)) + assert getattr(fld_after2, "name", None) == new_name2 + + # list smoke + listed = c.fields.list() + assert any(id_int(x) == id_int(fld) for x in listed) + finally: + try: + c.fields.delete(id_int(fld)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.fields.get(99999999) diff --git a/tests/integration/resources/test_fieldsets.py b/tests/integration/resources/test_fieldsets.py new file mode 100644 index 0000000..4e35fed --- /dev/null +++ b/tests/integration/resources/test_fieldsets.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import SnipeITApiError + +pytestmark = pytest.mark.integration + + +def test_fieldsets_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + fs = c.fieldsets.create(name=_n("fs", run_id)) + try: + assert id_int(fs) > 0 + new_name = _n("fs-upd", run_id) + fs = c.fieldsets.patch(id_int(fs), name=new_name) + assert fs.name == new_name + fs_after1 = c.fieldsets.get(id_int(fs)) + assert getattr(fs_after1, "name", None) == new_name + new_name2 = _n("fs-upd-2", run_id) + fs.name = new_name2 + fs.save() + assert fs.name == new_name2 + fs_after2 = c.fieldsets.get(id_int(fs)) + assert getattr(fs_after2, "name", None) == new_name2 + + # Delete may fail if the fieldset is in use; accept and assert error message for delete path + try: + c.fieldsets.delete(id_int(fs)) + except SnipeITApiError as e: + assert "in use" in str(e).lower() + finally: + # Best-effort cleanup regardless of earlier assertions + try: + c.fieldsets.delete(id_int(fs)) + except Exception: + pass diff --git a/tests/integration/resources/test_licenses.py b/tests/integration/resources/test_licenses.py new file mode 100644 index 0000000..56ff9d1 --- /dev/null +++ b/tests/integration/resources/test_licenses.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT + +pytestmark = pytest.mark.integration + + +def test_licenses_crud(real_snipeit_client: SnipeIT, base, run_id: str, _n, id_int): + c = real_snipeit_client + lic = c.licenses.create( + name=_n("lic", run_id), + seats=1, + category_id=id_int(base["categories"]["license"]), + ) + try: + assert id_int(lic) > 0 + assert lic.seats == 1 + lic = c.licenses.patch(id_int(lic), seats=2) + assert lic.seats == 2 + lic_after1 = c.licenses.get(id_int(lic)) + assert lic_after1.seats == 2 + lic.seats = 3 + lic.save() + assert lic.seats == 3 + lic_after2 = c.licenses.get(id_int(lic)) + assert lic_after2.seats == 3 + + # list smoke + listed = c.licenses.list() + assert any(id_int(x) == id_int(lic) for x in listed) + finally: + try: + c.licenses.delete(id_int(lic)) + except Exception: + pass diff --git a/tests/integration/resources/test_locations.py b/tests/integration/resources/test_locations.py new file mode 100644 index 0000000..cd552b9 --- /dev/null +++ b/tests/integration/resources/test_locations.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_locations_crud_and_parenting(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + root = c.locations.create(name=_n("loc-root2", run_id)) + child = c.locations.create(name=_n("loc-child2", run_id), parent_id=id_int(root)) + try: + got_child = c.locations.get(id_int(child)) + parent_obj = getattr(got_child, "parent", None) + if isinstance(parent_obj, dict) and "id" in parent_obj: + assert int(parent_obj["id"]) == id_int(root) + else: + assert int(getattr(got_child, "parent_id")) == id_int(root) + + # Update name + new_child_name = _n("loc-child2-upd", run_id) + child = c.locations.patch(id_int(child), name=new_child_name) + got_child_after = c.locations.get(id_int(child)) + assert getattr(got_child_after, "name", None) == new_child_name + + # list smoke + listed = c.locations.list() + assert any(id_int(x) == id_int(root) for x in listed) + finally: + try: + c.locations.delete(id_int(child)) + except Exception: + pass + try: + c.locations.delete(id_int(root)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.locations.get(99999999) diff --git a/tests/integration/resources/test_manufacturers.py b/tests/integration/resources/test_manufacturers.py new file mode 100644 index 0000000..63aea48 --- /dev/null +++ b/tests/integration/resources/test_manufacturers.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_manufacturers_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + created = c.manufacturers.create(name=_n("mfg-crud", run_id)) + try: + assert id_int(created) > 0 + + got = c.manufacturers.get(id_int(created)) + assert id_int(got) == id_int(created) + + listed = c.manufacturers.list() + assert any(id_int(x) == id_int(created) for x in listed) + + # list_all smoke with limit + la = list(c.manufacturers.list_all(limit=3)) + assert len(la) <= 3 + + updated = c.manufacturers.patch(id_int(created), name=_n("mfg-upd", run_id)) + assert id_int(updated) == id_int(created) + + # ApiObject.save via instance + updated.notes = f"note-{run_id}" + updated.save() + finally: + try: + c.manufacturers.delete(id_int(created)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.manufacturers.get(id_int(created)) + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.manufacturers.get(99999999) diff --git a/tests/integration/resources/test_models.py b/tests/integration/resources/test_models.py new file mode 100644 index 0000000..28833c9 --- /dev/null +++ b/tests/integration/resources/test_models.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_models_crud(real_snipeit_client: SnipeIT, base, run_id: str, _n, id_int): + c = real_snipeit_client + m = c.models.create( + name=_n("model2", run_id), + category_id=id_int(base["categories"]["asset"]), + manufacturer_id=id_int(base["manufacturer"]), + model_number=f"M2-{run_id}", + ) + try: + assert id_int(m) > 0 + assert m.model_number == f"M2-{run_id}" + + new_mn1 = f"M2U-{run_id}" + m = c.models.patch(id_int(m), model_number=new_mn1) + assert m.model_number == new_mn1 + m_after1 = c.models.get(id_int(m)) + assert m_after1.model_number == new_mn1 + + new_mn2 = f"M2U2-{run_id}" + m.model_number = new_mn2 + m.save() + assert m.model_number == new_mn2 + m_after2 = c.models.get(id_int(m)) + assert m_after2.model_number == new_mn2 + + # list smoke + listed = c.models.list() + assert any(id_int(x) == id_int(m) for x in listed) + finally: + try: + c.models.delete(id_int(m)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.models.get(id_int(m)) + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.models.get(99999999) diff --git a/tests/integration/resources/test_status_labels.py b/tests/integration/resources/test_status_labels.py new file mode 100644 index 0000000..ee4b5fb --- /dev/null +++ b/tests/integration/resources/test_status_labels.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_status_labels_crud(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + lab = c.status_labels.create(name=_n("status", run_id), type="deployable") + try: + assert id_int(lab) > 0 + + new_lab_name = _n("status-upd", run_id) + lab = c.status_labels.patch(id_int(lab), name=new_lab_name, type="deployable") + lab_after = c.status_labels.get(id_int(lab)) + assert getattr(lab_after, "name", None) == new_lab_name + + # list smoke + listed = c.status_labels.list() + assert any(id_int(x) == id_int(lab) for x in listed) + finally: + try: + c.status_labels.delete(id_int(lab)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.status_labels.get(id_int(lab)) + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.status_labels.get(99999999) diff --git a/tests/integration/resources/test_users.py b/tests/integration/resources/test_users.py new file mode 100644 index 0000000..e8ef47d --- /dev/null +++ b/tests/integration/resources/test_users.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import pytest + +from snipeit import SnipeIT +from snipeit.exceptions import ( + SnipeITValidationError, + SnipeITNotFoundError, + SnipeITApiError, +) + +pytestmark = pytest.mark.integration + + +def test_users_crud_and_me(real_snipeit_client: SnipeIT, run_id: str, _n, id_int): + c = real_snipeit_client + username = _n("usr2", run_id) + u = c.users.create( + username=username, + first_name="T", + last_name="U", + email=f"{username}@example.invalid", + password="Pass1234!", + password_confirmation="Pass1234!", + ) + try: + assert u.username == username + assert id_int(u) > 0 + + # me() is the token owner, not the user we just created + me = c.users.me() + assert int(getattr(me, "id", 0)) > 0 + + # Duplicate username should 422 + with pytest.raises((SnipeITValidationError, SnipeITApiError)): + c.users.create( + username=username, + first_name="T2", + last_name="U2", + email=f"{username}+dup@example.invalid", + password="Pass1234!", + password_confirmation="Pass1234!", + ) + + # Update + u = c.users.patch(id_int(u), last_name="U2") + assert u.last_name == "U2" + u_after = c.users.get(id_int(u)) + assert u_after.last_name == "U2" + + # list smoke + listed = c.users.list() + assert any(id_int(x) == id_int(u) for x in listed) + finally: + try: + c.users.delete(id_int(u)) + except Exception: + pass + + with pytest.raises((SnipeITNotFoundError, SnipeITApiError)): + c.users.get(id_int(u)) diff --git a/tests/unit/test_assets_endpoints.py b/tests/unit/test_assets_endpoints.py new file mode 100644 index 0000000..0d9818a --- /dev/null +++ b/tests/unit/test_assets_endpoints.py @@ -0,0 +1,153 @@ +import base64 +import os +import io +import pytest + +from snipeit import SnipeIT + + +@pytest.mark.unit +def test_labels_decodes_base64_and_writes_file(snipeit_client, requests_mock, tmp_path): + pdf_bytes = b"%PDF-1.4 test" + b64 = base64.b64encode(pdf_bytes).decode() + requests_mock.post( + "https://test.snipeitapp.com/api/v1/hardware/labels", + json={"status": "success", "payload": {"file_type": "application/pdf", "file_contents": b64}}, + status_code=200, + ) + + save_path = tmp_path / "labels.pdf" + out = snipeit_client.assets.labels(str(save_path), ["TAG1"]) # type: ignore[arg-type] + assert out == str(save_path) + assert save_path.read_bytes() == pdf_bytes + + +@pytest.mark.unit +def test_audit_by_id_and_asset_audit(snipeit_client, requests_mock): + # Manager helper + requests_mock.post( + "https://test.snipeitapp.com/api/v1/hardware/audit/1", + json={"status": "success"}, + status_code=200, + ) + resp = snipeit_client.assets.audit_by_id(1, note="checked") + assert isinstance(resp, dict) + + # Asset instance method with refresh + # Mock GET for refresh and POST for audit-by-id path + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/1", + json={"id": 1, "asset_tag": "A1"}, + status_code=200, + ) + requests_mock.post( + "https://test.snipeitapp.com/api/v1/hardware/1/audit", + json={"status": "success"}, + status_code=200, + ) + asset = snipeit_client.assets._make({"id": 1, "asset_tag": "A1"}) + asset.audit(note="checked") + + +@pytest.mark.unit +def test_audit_overdue_and_due_lists(snipeit_client, requests_mock): + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/audit/overdue", + json={"status": "success", "data": []}, + status_code=200, + ) + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/audit/due", + json={"status": "success", "data": []}, + status_code=200, + ) + assert snipeit_client.assets.list_audit_overdue()["status"] == "success" + assert snipeit_client.assets.list_audit_due()["status"] == "success" + + +@pytest.mark.unit +def test_restore(snipeit_client, requests_mock): + requests_mock.post( + "https://test.snipeitapp.com/api/v1/hardware/1/restore", + json={"status": "success"}, + status_code=200, + ) + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/1", + json={"id": 1, "asset_tag": "A1"}, + status_code=200, + ) + asset = snipeit_client.assets._make({"id": 1, "asset_tag": "A1"}) + out = asset.restore() + assert out.id == 1 + + +@pytest.mark.unit +def test_licenses_and_files_endpoints(snipeit_client, requests_mock, tmp_path): + # Licenses + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/1/licenses", + json={"status": "success", "data": []}, + status_code=200, + ) + data = snipeit_client.assets.get_licenses(1) + assert data["status"] == "success" + + # Files - list + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/1/files", + json={"status": "success", "files": []}, + status_code=200, + ) + files_list = snipeit_client.assets.list_files(1) + assert files_list["status"] == "success" + + # Files - upload (multipart) + f = tmp_path / "hello.txt" + f.write_text("hello") + requests_mock.post( + "https://test.snipeitapp.com/api/v1/hardware/1/files", + json={"file": {"original_name": "hello.txt", "name": "hello.txt"}}, + status_code=200, + ) + up = snipeit_client.assets.upload_files(1, [str(f)], notes="Test") + assert up["file"]["original_name"] == "hello.txt" + + # Files - download + dest = tmp_path / "dl.txt" + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/1/files/2", + content=b"data", + status_code=200, + ) + out_path = snipeit_client.assets.download_file(1, 2, str(dest)) + assert out_path == str(dest) + assert dest.read_bytes() == b"data" + + # Files - delete + requests_mock.delete( + "https://test.snipeitapp.com/api/v1/hardware/1/files/2/delete", + status_code=204, + ) + snipeit_client.assets.delete_file(1, 2) + + +@pytest.mark.unit +def test_get_by_serial_shapes(snipeit_client, requests_mock): + # Single-object response + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/byserial/SN1", + json={"id": 10, "asset_tag": "A10"}, + status_code=200, + ) + a = snipeit_client.assets.get_by_serial("SN1") + assert a.id == 10 + + # Envelope shape + requests_mock.get( + "https://test.snipeitapp.com/api/v1/hardware/byserial/SN2", + json={"rows": [{"id": 20, "asset_tag": "A20"}], "total": 1}, + status_code=200, + ) + b = snipeit_client.assets.get_by_serial("SN2") + assert b.id == 20