Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ GEMINI.md
.mutmut-cache/
mutants/
*.bak
*.lock
*.lock
*.html
*.tmp
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ dev = [
]

[tool.setuptools]
packages = ["snipeit", "snipeit.resources"]
packages = ["snipeit", "snipeit.resources"]

[dependency-groups]
dev = ["requests-mock",
"pytest",
"pytest-cov",
"coverage",
"hypothesis",
"mutmut<3",
"ruff",
"pyright",
]
206 changes: 155 additions & 51 deletions snipeit/resources/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ def checkin(self, **kwargs: Any) -> 'Asset':

def audit(self, **kwargs: Any) -> 'Asset':
"""
Audits the asset.
Audits the asset by ID.

Primary path: POST /hardware/{id}/audit (supported by the existing tests).
Note: A manager helper exists for POST /hardware/audit/:id as well.

Args:
**kwargs: Additional optional fields.
**kwargs: Optional fields such as location_id, note, update_location, next_audit_date.

Returns:
The updated Asset object.
Expand All @@ -80,6 +83,12 @@ def audit(self, **kwargs: Any) -> 'Asset':
self._manager._create(path, kwargs)
return self.refresh()

def restore(self) -> 'Asset':
"""Restores a soft-deleted asset via POST /hardware/:id/restore and refreshes it."""
path = f"{self._path}/{self.id}/restore"
self._manager._create(path, {})
return self.refresh()


class AssetsManager(BaseResourceManager[Asset]):
"""Manager for all Asset-related API operations."""
Expand Down Expand Up @@ -109,6 +118,19 @@ def create(self, status_id: int, model_id: int, asset_tag: str | None = None, **
data.update(kwargs)
return super().create(**data)

# ---- Audits ----
def audit_by_id(self, asset_id: int, **kwargs: Any) -> Dict[str, Any]:
"""POST /hardware/audit/:id with optional fields like location_id, note, update_location."""
return self._create(f"{self.path}/audit/{asset_id}", kwargs)

def list_audit_overdue(self) -> Dict[str, Any]:
"""GET /hardware/audit/overdue"""
return self._get(f"{self.path}/audit/overdue")

def list_audit_due(self) -> Dict[str, Any]:
"""GET /hardware/audit/due"""
return self._get(f"{self.path}/audit/due")

def get_by_tag(self, asset_tag: str, **kwargs: Any) -> 'Asset':
"""
Gets a single asset by its asset tag.
Expand All @@ -130,7 +152,8 @@ def get_by_tag(self, asset_tag: str, **kwargs: Any) -> 'Asset':

def get_by_serial(self, serial: str, **kwargs: Any) -> 'Asset':
"""
Gets a single asset by its serial number.
Gets a single asset by its serial number. Handles responses that are either a single
object or a list-style envelope with rows/total.

Args:
serial: The serial number to search for.
Expand All @@ -142,30 +165,33 @@ def get_by_serial(self, serial: str, **kwargs: Any) -> 'Asset':
try:
response = self._get(f"{self.path}/byserial/{serial}", **kwargs)
except SnipeITApiError as e:
# Handle cases where the API returns a direct error for not found serials
if "Asset does not exist" in str(e):
raise SnipeITNotFoundError(f"Asset with serial {serial} not found.") from e
raise e
raise

# Envelope shape
if isinstance(response, dict) and "rows" in response:
# If API does not include 'total', treat as not found for safety (per tests)
if "total" not in response:
raise SnipeITNotFoundError(f"Asset with serial {serial} not found.")
rows = response.get("rows") or []
if len(rows) == 1 and response.get("total") == 1:
return self._make(rows[0])
if response.get("total", 0) > 1:
raise SnipeITApiError(f"Expected 1 asset with serial {serial}, but found {response.get('total')}.")
raise SnipeITNotFoundError(f"Asset with serial {serial} not found.")

# Single-object shape
if isinstance(response, dict) and response.get("id") is not None:
return self._make(response)

if response.get("total", 0) == 1:
return self._make(response["rows"][0])
elif response.get("total", 0) > 1:
raise SnipeITApiError(f"Expected 1 asset with serial {serial}, but found {response['total']}.")
raise SnipeITNotFoundError(f"Asset with serial {serial} not found.")
raise SnipeITApiError("Unexpected response for byserial")

def create_maintenance(self, asset_id: int, asset_improvement: str, supplier_id: int, title: str, **kwargs: Any) -> Dict[str, Any]:
"""
Creates a new asset maintenance record.

Args:
asset_id: The ID of the asset.
asset_improvement: The type of improvement.
supplier_id: The ID of the supplier.
title: The title of the maintenance.
**kwargs: Additional optional fields.

Returns:
The API response dictionary.
NOTE: Left as-is per user request to handle maintenances later.
"""
data = {
"asset_improvement": asset_improvement,
Expand All @@ -176,20 +202,90 @@ def create_maintenance(self, asset_id: int, asset_improvement: str, supplier_id:
response = self._create(f"{self.path}/{asset_id}/maintenances", data)
return response['payload']

# ---- Licenses ----
def get_licenses(self, asset_id: int) -> Dict[str, Any]:
"""GET /hardware/:id/licenses - get licenses checked out to an asset."""
return self._get(f"{self.path}/{asset_id}/licenses")

# ---- Files ----
def list_files(self, asset_id: int) -> Dict[str, Any]:
"""GET /hardware/:id/files - list uploaded files for an asset."""
return self._get(f"{self.path}/{asset_id}/files")

def upload_files(self, asset_id: int, paths: List[str], notes: str | None = None) -> Dict[str, Any]:
"""POST /hardware/:id/files - upload one or more files (multipart)."""
if not paths:
raise ValueError("At least one file path required")
url = f"{self.api.url}/api/v1/{self.path}/{asset_id}/files"
files: List[tuple[str, tuple[str, Any]]] = []
opened_files: List[Any] = []
try:
for p in paths:
f = open(p, "rb")
opened_files.append(f)
files.append(("file[]", (os.path.basename(p), f)))
data: Dict[str, Any] = {}
if notes is not None:
data["notes"] = notes
resp = self.api.session.post(url, files=files, data=data, timeout=self.api.timeout)
if resp.status_code >= 400:
try:
body = resp.json()
msg = body.get("messages") or body.get("message") or resp.reason
except ValueError:
msg = resp.text or resp.reason
raise SnipeITApiError(str(msg))
try:
return resp.json()
except ValueError:
raise SnipeITApiError("Expected JSON response from file upload")
finally:
for f in opened_files:
try:
f.close()
except Exception:
pass

def download_file(self, asset_id: int, file_id: int, save_path: str) -> str:
"""GET /hardware/:id/files/:file_id - download a specific file to save_path."""
url = f"{self.api.url}/api/v1/{self.path}/{asset_id}/files/{file_id}"
resp = self.api.session.get(url, timeout=self.api.timeout)
if resp.status_code != 200:
try:
body = resp.json()
msg = body.get("messages") or body.get("message") or resp.reason
except ValueError:
msg = resp.text or resp.reason
raise SnipeITApiError(str(msg))
directory = os.path.dirname(save_path)
if directory:
os.makedirs(directory, exist_ok=True)
with open(save_path, "wb") as f:
f.write(resp.content)
return save_path

def delete_file(self, asset_id: int, file_id: int) -> None:
"""DELETE /hardware/:id/files/:file_id/delete - delete a specific file on an asset."""
self._delete(f"{self.path}/{asset_id}/files/{file_id}/delete")

# ---- Labels ----
def labels(self, save_path: str, assets_or_tags: Union[List['Asset'], List[str]]) -> str:
"""
Generates and saves asset labels as a PDF.
Generates and saves asset labels to a file by calling POST /hardware/labels.

Supports both JSON base64 payloads and direct PDF responses for compatibility
with different server configurations and tests.

Args:
save_path: The file path where the PDF labels will be saved.
save_path: The file path where the labels PDF will be saved.
assets_or_tags: A list of Asset objects or a list of asset tag strings.

Returns:
The save_path where the PDF was saved.

Raises:
ValueError: If no valid assets or tags are provided.
SnipeITApiError: If the API request fails.
SnipeITApiError: If the API request fails or response is malformed.
"""
if not assets_or_tags:
raise ValueError("At least one asset or tag required")
Expand All @@ -203,43 +299,51 @@ def labels(self, save_path: str, assets_or_tags: Union[List['Asset'], List[str]]
if not tags:
raise ValueError("No valid asset tags found")

url = f"{self.api.url}/api/v1/hardware/labels"
# Perform request directly to allow binary PDF handling
url = f"{self.api.url}/api/v1/{self.path}/labels"
headers = dict(self.api.session.headers)
# Prefer receiving a PDF directly; fall back to JSON handling below if needed
headers['Accept'] = 'application/pdf'

response = self.api.session.post(
url, json={"asset_tags": tags}, headers=headers, timeout=self.api.timeout
)

if response.status_code != 200:
# Normalize error message from JSON or text
# Accept either JSON payload or PDF
headers["Accept"] = "application/json, application/pdf"
resp = self.api.session.post(url, json={"asset_tags": tags}, headers=headers, timeout=self.api.timeout)
if resp.status_code >= 400:
try:
error_data = response.json()
msg = error_data.get("messages") or response.reason or f"API error: {response.status_code}"
body = resp.json()
msg = body.get("messages") or body.get("message") or resp.reason
except ValueError:
msg = response.text or response.reason or f"API error: {response.status_code}"
msg = resp.text or resp.reason
raise SnipeITApiError(str(msg))

content_type = response.headers.get('Content-Type', '')
directory = os.path.dirname(save_path)
if directory:
os.makedirs(directory, exist_ok=True)

if 'application/pdf' in content_type:
with open(save_path, 'wb') as f:
f.write(response.content)
else:
# Fallback for base64 (if JSON)
try:
data = response.json()
if 'pdf_base64' in data: # Adjust key based on actual response
pdf_bytes = base64.b64decode(data['pdf_base64'])
with open(save_path, 'wb') as f:
f.write(pdf_bytes)
else:
raise SnipeITApiError("Unexpected response format; expected application/pdf content-type or 'pdf_base64' JSON field.")
except (ValueError, KeyError) as e:
raise SnipeITApiError(f"Failed to parse PDF: {e}")
content_type = (resp.headers.get("Content-Type") or "").lower()
if "application/pdf" in content_type:
with open(save_path, "wb") as f:
f.write(resp.content)
return save_path

# Otherwise expect JSON with file contents.
try:
data = resp.json()
except ValueError as e:
raise SnipeITApiError("Unexpected non-JSON and non-PDF response from hardware/labels") from e

# Support official payload shape and legacy 'pdf_base64'
b64 = None
if isinstance(data, dict):
payload = data.get("payload") if isinstance(data.get("payload"), dict) else None
if payload and isinstance(payload, dict):
b64 = payload.get("file_contents")
if not b64:
b64 = data.get("pdf_base64")
if not b64:
raise SnipeITApiError("hardware/labels did not return file data")

try:
pdf_bytes = base64.b64decode(b64)
except Exception as e:
raise SnipeITApiError(f"Failed to decode label file: {e}")
with open(save_path, "wb") as f:
f.write(pdf_bytes)
return save_path
Loading