Official Python SDK for LogWard with advanced features: automatic batching, retry logic, circuit breaker, query API, live streaming, and middleware support.
- ✅ Automatic batching with configurable size and interval
- ✅ Retry logic with exponential backoff
- ✅ Circuit breaker pattern for fault tolerance
- ✅ Max buffer size with drop policy to prevent memory leaks
- ✅ Query API for searching and filtering logs
- ✅ Live tail with Server-Sent Events (SSE)
- ✅ Trace ID context for distributed tracing
- ✅ Global metadata added to all logs
- ✅ Structured error serialization
- ✅ Internal metrics (logs sent, errors, latency, etc.)
- ✅ Flask, Django & FastAPI middleware for auto-logging HTTP requests
- ✅ Full Python 3.8+ support with type hints
- Python 3.8 or higher
- pip or poetry
pip install logward-sdk# For async support
pip install logward-sdk[async]
# For Flask middleware
pip install logward-sdk[flask]
# For Django middleware
pip install logward-sdk[django]
# For FastAPI middleware
pip install logward-sdk[fastapi]
# Install all extras
pip install logward-sdk[async,flask,django,fastapi]from logward_sdk import LogWardClient, ClientOptions
client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)
)
# Send logs
client.info('api-gateway', 'Server started', {'port': 3000})
client.error('database', 'Connection failed', Exception('Timeout'))
# Graceful shutdown (automatic via atexit, but can be called manually)
client.close()| Option | Type | Default | Description |
|---|---|---|---|
api_url |
str |
required | Base URL of your LogWard instance |
api_key |
str |
required | Project API key (starts with lp_) |
batch_size |
int |
100 |
Number of logs to batch before sending |
flush_interval |
int |
5000 |
Interval in ms to auto-flush logs |
| Option | Type | Default | Description |
|---|---|---|---|
max_buffer_size |
int |
10000 |
Max logs in buffer (prevents memory leak) |
max_retries |
int |
3 |
Max retry attempts on failure |
retry_delay_ms |
int |
1000 |
Initial retry delay (exponential backoff) |
circuit_breaker_threshold |
int |
5 |
Failures before opening circuit |
circuit_breaker_reset_ms |
int |
30000 |
Time before retrying after circuit opens |
enable_metrics |
bool |
True |
Track internal metrics |
debug |
bool |
False |
Enable debug logging to console |
global_metadata |
dict |
{} |
Metadata added to all logs |
auto_trace_id |
bool |
False |
Auto-generate trace IDs for logs |
import os
client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
# Batching
batch_size=100,
flush_interval=5000,
# Buffer management
max_buffer_size=10000,
# Retry with exponential backoff (1s → 2s → 4s)
max_retries=3,
retry_delay_ms=1000,
# Circuit breaker
circuit_breaker_threshold=5,
circuit_breaker_reset_ms=30000,
# Metrics & debugging
enable_metrics=True,
debug=True,
# Global context
global_metadata={
'env': os.getenv('APP_ENV'),
'version': '1.0.0',
'hostname': os.uname().nodename,
},
# Auto trace IDs
auto_trace_id=False,
)
)from logward_sdk import LogLevel
client.debug('service-name', 'Debug message')
client.info('service-name', 'Info message', {'userId': 123})
client.warn('service-name', 'Warning message')
client.error('service-name', 'Error message', {'custom': 'data'})
client.critical('service-name', 'Critical message')The SDK automatically serializes Exception objects:
try:
raise RuntimeError('Database timeout')
except Exception as e:
# Automatically serializes error with stack trace
client.error('database', 'Query failed', e)Generated log metadata:
{
"error": {
"name": "RuntimeError",
"message": "Database timeout",
"stack": "Traceback (most recent call last):\n ..."
}
}Track requests across services with trace IDs.
client.set_trace_id('request-123')
client.info('api', 'Request received')
client.info('database', 'Querying users')
client.info('api', 'Response sent')
client.set_trace_id(None) # Clear contextwith client.with_trace_id('request-456'):
client.info('api', 'Processing in context')
client.warn('cache', 'Cache miss')
# Trace ID automatically restored after contextimport uuid
with client.with_new_trace_id():
client.info('worker', 'Background job started')
client.info('worker', 'Job completed')Search and retrieve logs programmatically.
from datetime import datetime, timedelta
from logward_sdk import QueryOptions, LogLevel
result = client.query(
QueryOptions(
service='api-gateway',
level=LogLevel.ERROR,
from_time=datetime.now() - timedelta(hours=24),
to_time=datetime.now(),
limit=100,
offset=0,
)
)
print(f"Found {result.total} logs")
for log in result.logs:
print(log)result = client.query(QueryOptions(q='timeout', limit=50))logs = client.get_by_trace_id('trace-123')
print(f"Trace has {len(logs)} logs")from datetime import datetime, timedelta
from logward_sdk import AggregatedStatsOptions
stats = client.get_aggregated_stats(
AggregatedStatsOptions(
from_time=datetime.now() - timedelta(days=7),
to_time=datetime.now(),
interval='1h',
)
)
for service in stats.top_services:
print(f"{service['service']}: {service['count']} logs")Stream logs in real-time using Server-Sent Events.
def handle_log(log):
print(f"[{log['time']}] {log['level']}: {log['message']}")
def handle_error(error):
print(f"Stream error: {error}")
client.stream(
on_log=handle_log,
on_error=handle_error,
filters={
'service': 'api-gateway',
'level': 'error',
}
)
# Note: This blocks. Run in separate thread for production.Track SDK performance and health.
metrics = client.get_metrics()
print(f"Logs sent: {metrics.logs_sent}")
print(f"Logs dropped: {metrics.logs_dropped}")
print(f"Errors: {metrics.errors}")
print(f"Retries: {metrics.retries}")
print(f"Avg latency: {metrics.avg_latency_ms}ms")
print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}")
# Get circuit breaker state
print(client.get_circuit_breaker_state()) # CLOSED|OPEN|HALF_OPEN
# Reset metrics
client.reset_metrics()Auto-log all HTTP requests and responses.
from flask import Flask
from logward_sdk import LogWardClient, ClientOptions
from logward_sdk.middleware import LogWardFlaskMiddleware
app = Flask(__name__)
client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)
)
LogWardFlaskMiddleware(
app,
client=client,
service_name='flask-api',
log_requests=True,
log_responses=True,
skip_paths=['https://github.com/metrics'],
)Logged automatically:
- Request:
GET /api/users - Response:
GET /api/users 200 (45ms) - Errors:
Request error: Internal Server Error
# settings.py
MIDDLEWARE = [
'logward_sdk.middleware.LogWardDjangoMiddleware',
]
from logward_sdk import LogWardClient, ClientOptions
LOGWARD_CLIENT = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)
)
LOGWARD_SERVICE_NAME = 'django-api'from fastapi import FastAPI
from logward_sdk import LogWardClient, ClientOptions
from logward_sdk.middleware import LogWardFastAPIMiddleware
app = FastAPI()
client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)
)
app.add_middleware(
LogWardFastAPIMiddleware,
client=client,
service_name='fastapi-api',
)See the examples/ directory for complete working examples:
- basic.py - Simple usage
- advanced.py - All advanced features
- flask_example.py - Flask integration
- fastapi_example.py - FastAPI integration
client = LogWardClient(options: ClientOptions)log(entry: LogEntry) -> Nonedebug(service: str, message: str, metadata: dict = None) -> Noneinfo(service: str, message: str, metadata: dict = None) -> Nonewarn(service: str, message: str, metadata: dict = None) -> Noneerror(service: str, message: str, metadata_or_error: dict | Exception = None) -> Nonecritical(service: str, message: str, metadata_or_error: dict | Exception = None) -> None
set_trace_id(trace_id: str | None) -> Noneget_trace_id() -> str | Nonewith_trace_id(trace_id: str)→ context managerwith_new_trace_id()→ context manager
query(options: QueryOptions) -> LogsResponseget_by_trace_id(trace_id: str) -> list[dict]get_aggregated_stats(options: AggregatedStatsOptions) -> AggregatedStatsResponse
stream(on_log: callable, on_error: callable = None, filters: dict = None) -> None
get_metrics() -> ClientMetricsreset_metrics() -> Noneget_circuit_breaker_state() -> CircuitState
flush() -> Noneclose() -> None
import atexit
# Automatic cleanup (already registered by client)
# Or manually:
atexit.register(client.close)client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
global_metadata={
'env': os.getenv('ENV'),
'version': '1.0.0',
'region': 'us-east-1',
},
)
)client = LogWardClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
debug=os.getenv('ENV') == 'development',
)
)import time
import threading
def monitor_metrics():
while True:
metrics = client.get_metrics()
if metrics.logs_dropped > 0:
print(f"⚠️ Logs dropped: {metrics.logs_dropped}")
if metrics.circuit_breaker_trips > 0:
print("🔴 Circuit breaker is OPEN!")
time.sleep(60)
# Run in background thread
monitor_thread = threading.Thread(target=monitor_metrics, daemon=True)
monitor_thread.start()# Clone repository
git clone https://github.com/logward-dev/logward-sdk-python.git
cd logward-sdk-python
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dev dependencies
pip install -e ".[dev]"# Run tests
pytest tests/
# Type checking
mypy logward_sdk/
# Code formatting
black logward_sdk/ tests/ examples/
# Linting
ruff check logward_sdk/MIT
Contributions are welcome! Please open an issue or PR on GitHub.
- Documentation: https://logward.dev/docs
- Issues: GitHub Issues