The KatanaClient is the modern, pythonic client for the Katana Manufacturing ERP API. It provides automatic resilience (retries, rate limiting, error handling) using httpx's native transport layer - no decorators or wrapper methods needed.
- 🛡️ Automatic Resilience: Transport-layer retries and rate limiting
- 🚀 Zero Configuration: Works out of the box with environment variables
- 📦 Complete Type Safety: Full type hints and IDE support
- 🔄 Smart Pagination: Built-in pagination with safety limits
- 🔍 Rich Observability: Structured logging and metrics
- ⚡ Pythonic Design: Uses httpx's native extension points
# Install the client
pip install -e .
# Create .env file with credentials
echo "KATANA_API_KEY=your-api-key-here" > .env
echo "KATANA_BASE_URL=https://api.katanamrp.com/v1" >> .envimport asyncio
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
async def main():
# Automatic configuration from .env file
async with KatanaClient() as client:
# Direct API usage - automatic resilience built-in
response = await get_all_products.asyncio_detailed(
client=client,
limit=50
)
if response.status_code == 200:
products = response.parsed.data
print(f"Retrieved {len(products)} products")
asyncio.run(main())Every API call through KatanaClient automatically includes:
- Network Errors: Automatic retry with exponential backoff (1s, 2s, 4s, 8s, 16s)
- Rate Limits (429): ALL methods (including POST/PATCH) retry with
Retry-Afterheader support - Server Errors (502/503/504): Only idempotent methods (GET, PUT, DELETE, etc.) are retried
- Client Errors (4xx except 429): No retries
async with KatanaClient(max_retries=5) as client:
# This call will automatically retry on failures
# POST/PATCH requests will retry on 429 but not on 5xx errors
# GET/PUT/DELETE requests will retry on both 429 and 5xx errors
response = await get_all_products.asyncio_detailed(
client=client,
limit=100
)
# No decorators or wrapper methods needed!# Automatic rate limit handling with Retry-After header support
async with KatanaClient() as client:
# These calls will automatically be rate limited
for i in range(100):
response = await get_all_products.asyncio_detailed(
client=client,
page=i,
limit=50
)
# Client automatically waits when rate limitedimport logging
# Configure logging to see resilience in action
logging.basicConfig(level=logging.INFO)
async with KatanaClient() as client:
# Automatic error recovery with detailed logging
response = await get_all_products.asyncio_detailed(
client=client,
limit=100
)
# Logs will show retry attempts and recoveryAuto-pagination is ON by default for all GET requests. All pages are automatically collected into a single response.
async with KatanaClient() as client:
# Get ALL products across all pages automatically
all_products = await get_all_products.asyncio_detailed(
client=client,
is_sellable=True, # API filter parameters
limit=250 # Page size (all pages still collected)
)
print(f"Total products: {len(all_products.parsed.data)}")To get a specific page instead of all pages, add an explicit page parameter. Note:
ANY explicit page value (including page=1) disables auto-pagination:
async with KatanaClient() as client:
# Get ONLY page 2 (auto-pagination disabled when page is explicit)
page2_products = await get_all_products.asyncio_detailed(
client=client,
page=2, # Explicit page disables auto-pagination
limit=50
)
# Returns just the 50 items on page 2
# page=1 ALSO disables auto-pagination (returns only first page)
first_page = await get_all_products.asyncio_detailed(
client=client,
page=1, # Get ONLY page 1, not all pages
limit=50
)To limit the total number of items collected (not just page size), use the max_items
extension via the httpx client:
async with KatanaClient() as client:
httpx_client = client.get_async_httpx_client()
response = await httpx_client.get(
"/products",
params={"limit": 50}, # 50 items per page
extensions={"max_items": 200} # Stop after 200 items total
)The transport intelligently adjusts the limit on the final request to fetch only
what's needed, avoiding over-fetching.
| Parameter | Scope | Effect |
|---|---|---|
limit=50 |
URL param | Page size (50 items per request) |
page=2 |
URL param | Get specific page only (disables auto-pagination) |
max_pages=5 |
Client | Max pages to fetch |
max_items=200 |
Extension | Max total items to collect |
The client supports multiple authentication methods (in priority order):
- Direct parameter: Pass
api_keytoKatanaClient() - Environment variable: Set
KATANA_API_KEY .envfile: Create a.envfile with your credentials~/.netrcfile: Use standard Unix credential file
# Create .env file
KATANA_API_KEY=your-api-key-here
KATANA_BASE_URL=https://api.katanamrp.com/v1 # Optional# Automatically loads from .env
async with KatanaClient() as client:
# Uses credentials from .env
passFor centralized credential management across multiple tools:
# Add to ~/.netrc
machine api.katanamrp.com
password your-api-key-here
# Set proper permissions (required)
chmod 600 ~/.netrc# Automatically loads from ~/.netrc
async with KatanaClient() as client:
# Uses credentials from ~/.netrc
passNote: The password field in netrc stores your API key (bearer token), not an
actual password. The login field is optional and ignored.
export KATANA_API_KEY=your-api-key-hereasync with KatanaClient(api_key="your-api-key-here") as client:
# Explicit API key
passimport logging
# Custom configuration with stdlib logger
async with KatanaClient(
api_key="custom-key",
base_url="https://custom.katana.com/v1",
timeout=60.0, # Request timeout
max_retries=5, # Maximum retry attempts
logger=logging.getLogger("custom") # stdlib logger
) as client:
# Your API calls here
passThe logger parameter accepts any object whose debug, info, warning, and error
methods accept (msg, *args, **kwargs) — the standard logging.Logger call convention.
Both logging.Logger and structlog's BoundLogger satisfy this without adapters:
import structlog
# Use structlog directly — no adapter needed
logger = structlog.get_logger("katana")
async with KatanaClient(logger=logger) as client:
passimport httpx
# Pass through httpx configuration
async with KatanaClient(
# Standard KatanaClient options
api_key="your-key",
max_retries=3,
# httpx client options
verify=False, # SSL verification
proxies="http://proxy:8080",
headers={"Custom": "Header"},
event_hooks={
"request": [custom_request_hook],
"response": [custom_response_hook]
}
) as client:
# Client has both resilience AND custom httpx config
passimport logging
# Configure logging to see what's happening
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
async with KatanaClient() as client:
# All resilience actions are logged
response = await get_all_products.asyncio_detailed(
client=client,
limit=100
)Example log output:
2025-01-15 10:30:15 - katana_client - WARNING - Rate limited on attempt 1, waiting 60.0s
2025-01-15 10:31:16 - katana_client - INFO - Request succeeded after 2 attempts
2025-01-15 10:31:16 - katana_client - DEBUG - Response: 200 GET https://api.katanamrp.com/v1/products (1.24s)
async def custom_response_hook(response):
"""Custom hook to track API usage."""
print(f"API call: {response.request.method} {response.request.url}")
print(f"Status: {response.status_code}")
async with KatanaClient(
event_hooks={
"response": [custom_response_hook]
}
) as client:
# Your custom hooks are called alongside built-in ones
response = await get_all_products.asyncio_detailed(
client=client,
limit=10
)import pytest
from unittest.mock import AsyncMock, patch
from katana_public_api_client import KatanaClient
@pytest.mark.asyncio
async def test_api_integration():
"""Test API integration with mocked responses."""
with patch.dict('os.environ', {'KATANA_API_KEY': 'test-key'}):
async with KatanaClient() as client:
# Mock the underlying httpx client
with patch.object(client, 'get_async_httpx_client') as mock_httpx:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = {"data": [{"id": 1}]}
mock_httpx.return_value.request = AsyncMock(return_value=mock_response)
# Test your API logic here
from katana_public_api_client.api.product import get_all_products
response = await get_all_products.asyncio_detailed(
client=client,
limit=10
)
assert response.status_code == 200import os
import pytest
from katana_public_api_client import KatanaClient
from katana_public_api_client.api.product import get_all_products
@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_api():
"""Test against real Katana API (requires KATANA_API_KEY)."""
api_key = os.getenv('KATANA_API_KEY')
if not api_key:
pytest.skip("KATANA_API_KEY not set")
async with KatanaClient() as client:
response = await get_all_products.asyncio_detailed(
client=client,
limit=1
)
assert response.status_code == 200
assert hasattr(response.parsed, 'data')from katana_public_api_client.katana_client import ResilientAsyncTransport
# Create custom transport with different settings
custom_transport = ResilientAsyncTransport(
max_retries=10,
max_pages=50, # Limit automatic pagination
logger=custom_logger
)
async with KatanaClient(
transport=custom_transport
) as client:
# Uses your custom retry logic
passimport asyncio
from katana_public_api_client.api.product import get_product
async def process_products_in_batches(product_ids, batch_size=10):
"""Process products in batches with automatic resilience."""
async with KatanaClient() as client:
results = []
for i in range(0, len(product_ids), batch_size):
batch = product_ids[i:i + batch_size]
# Each call automatically has resilience
batch_results = await asyncio.gather(*[
get_product.asyncio_detailed(client=client, id=product_id)
for product_id in batch
])
results.extend(batch_results)
# Be nice to the API
await asyncio.sleep(0.1)
return resultsimport httpx
from katana_public_api_client.errors import UnexpectedStatus
async with KatanaClient() as client:
try:
response = await get_all_products.asyncio_detailed(
client=client,
limit=50
)
if response.status_code == 200:
products = response.parsed.data
print(f"Success: {len(products)} products")
else:
print(f"API returned: {response.status_code}")
except httpx.TimeoutException:
print("Request timed out after retries")
except httpx.ConnectError:
print("Connection failed after retries")
except UnexpectedStatus as e:
print(f"Unexpected API response: {e.status_code}")# ✅ Good: Properly manages connections
async with KatanaClient() as client:
response = await get_all_products.asyncio_detailed(
client=client,
limit=50
)
# ❌ Bad: Doesn't close connections
client = KatanaClient()
response = await get_all_products.asyncio_detailed(
client=client,
limit=50
)# ✅ Good: Reasonable timeout for your use case
async with KatanaClient(timeout=30.0) as client:
# Timeout is appropriate for expected response time
pass
# ❌ Bad: Too short timeout causes unnecessary retries
async with KatanaClient(timeout=1.0) as client:
# Will likely timeout and retry frequently
pass# ✅ Good: Auto-pagination is ON by default with safety limits
all_products = await get_all_products.asyncio_detailed(
client=client,
limit=250 # Sets page size; all pages collected automatically
)
# ✅ Good: Use explicit page when you need just one page
page2 = await get_all_products.asyncio_detailed(
client=client,
page=2, # Explicit page = single page only
limit=100
)
# ❌ Bad: Manual pagination loop without safety limits
page = 1
while True: # Could run forever!
response = await get_all_products.asyncio_detailed(
client=client,
page=page,
limit=100
)
# ... handle response
page += 1async with KatanaClient() as client:
response = await get_all_products.asyncio_detailed(
client=client,
limit=50
)
# ✅ Good: Check status before accessing data
if response.status_code == 200:
products = response.parsed.data
print(f"Retrieved {len(products)} products")
elif response.status_code == 401:
print("Authentication failed")
else:
print(f"Unexpected status: {response.status_code}")from katana_public_api_client.api.product import get_all_products
# ✅ Good: Direct API calls with automatic resilience
async with KatanaClient() as client:
response = await get_all_products.asyncio_detailed(
client=client,
is_sellable=True
)
if response.status_code == 200:
products = response.parsed.datafrom katana_public_api_client.api.product import get_all_products
from katana_public_api_client.api.sales_order import get_all_sales_orders
from katana_public_api_client.api.inventory import get_all_inventory_points
# ✅ Good: One client for multiple operations
async with KatanaClient() as client:
products = await get_all_products.asyncio_detailed(client=client)
orders = await get_all_sales_orders.asyncio_detailed(client=client)
inventory = await get_all_inventory_points.asyncio_detailed(client=client)
# ❌ Bad: New client for each operation
for operation in [get_all_products, get_all_sales_orders]:
async with KatanaClient() as client:
await operation.asyncio_detailed(client=client)# ✅ Good: Reasonable page size
products = await get_all_products.asyncio_detailed(
client=client,
limit=250 # Good balance of efficiency and memory - automatically paginated
)
# ❌ Bad: Too small (many requests)
products = await get_all_products.asyncio_detailed(
client=client,
limit=10 # Will make many small requests
)
# ❌ Bad: Too large (may hit API limits)
products = await get_all_products.asyncio_detailed(
client=client,
limit=10000 # May exceed API limits
)import asyncio
from katana_public_api_client.api.product import get_product
# ✅ Good: Limited concurrency respects rate limits
async def get_multiple_products(product_ids):
async with KatanaClient() as client:
# Process in small batches
results = []
for i in range(0, len(product_ids), 5): # 5 concurrent requests
batch = product_ids[i:i+5]
batch_results = await asyncio.gather(*[
get_product.asyncio_detailed(client=client, id=pid)
for pid in batch
])
results.extend(batch_results)
# Small delay between batches
if i + 5 < len(product_ids):
await asyncio.sleep(0.2)
return resultsclass KatanaClient(AuthenticatedClient):
"""The pythonic Katana API client with automatic resilience and pagination."""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
timeout: float = 30.0,
max_retries: int = 5,
max_pages: int = 100,
logger: Optional[Logger] = None,
**httpx_kwargs: Any,
): ...
async def __aenter__(self) -> "KatanaClient": ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...Factory function that creates a layered transport with automatic resilience:
def ResilientAsyncTransport(
max_retries: int = 5,
max_pages: int = 100,
logger: Optional[Logger] = None,
**kwargs: Any
) -> RetryTransport:
"""
Creates a chained transport with:
1. AsyncHTTPTransport (base HTTP transport)
2. ErrorLoggingTransport (logs detailed 4xx errors)
3. PaginationTransport (auto-collects paginated responses)
4. RetryTransport (handles retries with Retry-After header support)
"""
...Next Steps: Check out the API Reference for detailed endpoint documentation, or see Testing Guide for testing patterns.