Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ REDIS_URL=redis://localhost:6379/0
# --- REST API (Python FastAPI) ------------------------------------------------
# NOTE: Do NOT use generic HOST= or PORT= here — Next.js also reads PORT
# and would start on the wrong port. FastAPI defaults to 0.0.0.0:8000.
CORS_ORIGINS=http://localhost:3000
CORS_ORIGINS=["http://localhost:3000"]

# --- Internal API (Python <-> Next.js service communication) ------------------
TRACEROOT_UI_URL=http://localhost:3000
Expand Down
21 changes: 11 additions & 10 deletions backend/db/clickhouse/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""ClickHouse client using clickhouse-connect."""

import os
from datetime import UTC, datetime
from typing import Any

import clickhouse_connect
from clickhouse_connect.driver.client import Client

from shared.config import settings


class ClickHouseClient:
"""ClickHouse client wrapper for trace data operations."""
Expand All @@ -15,15 +16,15 @@ def __init__(self, client: Client):
self._client = client

@classmethod
def from_env(cls) -> "ClickHouseClient":
"""Create client from environment variables."""
port = os.getenv("CLICKHOUSE_HTTP_PORT") or os.getenv("CLICKHOUSE_PORT", "8123")
def from_settings(cls) -> "ClickHouseClient":
"""Create client from centralized settings."""
ch = settings.clickhouse
client = clickhouse_connect.get_client(
host=os.getenv("CLICKHOUSE_HOST", "localhost"),
port=int(port),
username=os.getenv("CLICKHOUSE_USER", "clickhouse"),
password=os.getenv("CLICKHOUSE_PASSWORD", "clickhouse"),
database=os.getenv("CLICKHOUSE_DATABASE", "default"),
host=ch.host,
port=ch.port,
username=ch.user,
password=ch.password,
database=ch.database,
)
return cls(client)

Expand Down Expand Up @@ -148,5 +149,5 @@ def get_clickhouse_client() -> ClickHouseClient:
"""Get or create the singleton ClickHouse client."""
global _client
if _client is None:
_client = ClickHouseClient.from_env()
_client = ClickHouseClient.from_settings()
return _client
4 changes: 2 additions & 2 deletions backend/rest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from rest.routers.public.traces import router as public_traces_router
from rest.routers.traces import router as traces_router
from rest.routers.users import router as users_router
from shared.config import settings

app = FastAPI(
title="Traceroot API",
Expand All @@ -27,10 +28,9 @@
)

# CORS configuration
cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
Expand Down
11 changes: 4 additions & 7 deletions backend/rest/routers/deps.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
"""FastAPI dependencies for authentication (via Next.js internal API)."""

import os
from typing import Annotated

import httpx
from fastapi import Depends, Header, HTTPException, status
from shared.enums import MemberRole

# Configuration for internal API (Python → Next.js)
TRACEROOT_UI_URL = os.getenv("TRACEROOT_UI_URL", "http://localhost:3000")
INTERNAL_API_SECRET = os.getenv("INTERNAL_API_SECRET", "")
from shared.config import settings
from shared.enums import MemberRole


class ProjectAccessInfo:
Expand Down Expand Up @@ -43,9 +40,9 @@ async def get_project_access(
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{TRACEROOT_UI_URL}/api/internal/validate-project-access",
f"{settings.traceroot_ui_url}/api/internal/validate-project-access",
json={"userId": x_user_id, "projectId": project_id},
headers={"X-Internal-Secret": INTERNAL_API_SECRET},
headers={"X-Internal-Secret": settings.internal_api_secret},
)
except httpx.RequestError as e:
raise HTTPException(
Expand Down
12 changes: 4 additions & 8 deletions backend/rest/routers/public/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import gzip
import hashlib
import logging
import os
import uuid
from datetime import UTC, datetime
from typing import Annotated, Any
Expand All @@ -28,16 +27,13 @@
from pydantic import BaseModel

from rest.services.s3 import get_s3_service
from worker.tasks import process_s3_traces
from shared.config import settings
from worker.ingest_tasks import process_s3_traces

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/public/traces", tags=["Traces (Public)"])

# Configuration for internal API (Python → Next.js)
TRACEROOT_UI_URL = os.getenv("TRACEROOT_UI_URL", "http://localhost:3000")
INTERNAL_API_SECRET = os.getenv("INTERNAL_API_SECRET", "")


async def authenticate_api_key(
authorization: Annotated[str | None, Header()] = None,
Expand Down Expand Up @@ -81,9 +77,9 @@ async def authenticate_api_key(
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{TRACEROOT_UI_URL}/api/internal/validate-api-key",
f"{settings.traceroot_ui_url}/api/internal/validate-api-key",
json={"keyHash": key_hash},
headers={"X-Internal-Secret": INTERNAL_API_SECRET},
headers={"X-Internal-Secret": settings.internal_api_secret},
)
except httpx.RequestError as e:
logger.error(f"Failed to validate API key: {e}")
Expand Down
24 changes: 13 additions & 11 deletions backend/rest/services/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
"""

import logging
import os
from typing import Any

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

from shared.config import settings

logger = logging.getLogger(__name__)


Expand All @@ -39,17 +40,18 @@ def __init__(
"""Initialize S3 service.

Args:
endpoint_url: S3/MinIO endpoint URL.
access_key_id: AWS access key ID.
secret_access_key: AWS secret access key.
bucket_name: S3 bucket name.
region: AWS region.
endpoint_url: S3/MinIO endpoint URL. Defaults to settings.
access_key_id: AWS access key ID. Defaults to settings.
secret_access_key: AWS secret access key. Defaults to settings.
bucket_name: S3 bucket name. Defaults to settings.
region: AWS region. Defaults to settings.
"""
self._endpoint_url = endpoint_url or os.getenv("S3_ENDPOINT_URL")
self._access_key_id = access_key_id or os.getenv("S3_ACCESS_KEY_ID")
self._secret_access_key = secret_access_key or os.getenv("S3_SECRET_ACCESS_KEY")
self._bucket_name = bucket_name or os.getenv("S3_BUCKET_NAME", "traceroot")
self._region = region or os.getenv("S3_REGION", "us-east-1")
s3 = settings.s3
self._endpoint_url = endpoint_url or s3.endpoint_url
self._access_key_id = access_key_id or s3.access_key_id
self._secret_access_key = secret_access_key or s3.secret_access_key
self._bucket_name = bucket_name or s3.bucket_name
self._region = region or s3.region

self._client: Any = None

Expand Down
78 changes: 78 additions & 0 deletions backend/shared/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Centralized configuration for the Traceroot backend.

All environment variables are read once at import time via Pydantic Settings.
Services import ``settings`` from this module instead of calling os.getenv().

Environment variables are loaded from .env by entrypoints (rest/main.py,
worker/celery_app.py) before this module is first imported.
"""

from pydantic_settings import BaseSettings, SettingsConfigDict


class ClickHouseSettings(BaseSettings):
    """ClickHouse connection settings.

    Values are read from environment variables with the ``CLICKHOUSE_``
    prefix (via ``env_prefix``): CLICKHOUSE_HOST, CLICKHOUSE_PORT,
    CLICKHOUSE_NATIVE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD,
    CLICKHOUSE_DATABASE. Unset variables fall back to the defaults below.
    """

    model_config = SettingsConfigDict(env_prefix="CLICKHOUSE_")

    # Server hostname; defaults target a local development instance.
    host: str = "localhost"
    # HTTP interface port, used by clickhouse-connect clients.
    port: int = 8123
    # Native TCP port, used by the worker's migration script (migrate.sh).
    native_port: int = 9000
    user: str = "clickhouse"
    password: str = "clickhouse"
    database: str = "default"


class S3Settings(BaseSettings):
    """S3/MinIO settings for trace data storage.

    Values are read from environment variables with the ``S3_`` prefix
    (via ``env_prefix``): S3_ENDPOINT_URL, S3_ACCESS_KEY_ID,
    S3_SECRET_ACCESS_KEY, S3_BUCKET_NAME, S3_REGION.
    """

    model_config = SettingsConfigDict(env_prefix="S3_")

    # None means "use the default AWS endpoint"; set for MinIO or other
    # S3-compatible stores.
    endpoint_url: str | None = None
    # None lets boto3 fall back to its own credential resolution chain.
    access_key_id: str | None = None
    secret_access_key: str | None = None
    bucket_name: str = "traceroot"
    region: str = "us-east-1"


class RedisSettings(BaseSettings):
    """Redis settings for the Celery broker and result backend.

    Values are read from environment variables with the ``REDIS_``
    prefix (via ``env_prefix``): REDIS_URL, REDIS_RESULT_URL.
    """

    model_config = SettingsConfigDict(env_prefix="REDIS_")

    # Celery broker connection (database 0 by default).
    url: str = "redis://localhost:6379/0"
    # Celery result backend; kept on a separate database (1) from the broker.
    result_url: str = "redis://localhost:6379/1"


class Settings(BaseSettings):
    """Root settings for the Traceroot backend.

    Nested settings (``clickhouse``, ``s3``, ``redis``) each read from
    their own prefixed env vars. Top-level fields read from unprefixed
    env vars: CORS_ORIGINS, TRACEROOT_UI_URL, INTERNAL_API_SECRET.
    """

    # CORS: browser origins allowed by the REST API middleware.
    # pydantic-settings parses complex types (list[str]) from env values
    # as JSON, e.g. CORS_ORIGINS=["http://localhost:3000"].
    cors_origins: list[str] = ["http://localhost:3000"]

    # Internal communication (Python <-> Next.js)
    traceroot_ui_url: str = "http://localhost:3000"
    # Shared secret sent as the X-Internal-Secret header on internal API
    # calls to the Next.js service.
    # NOTE(review): the empty-string default lets the backend start with
    # INTERNAL_API_SECRET unset; internal auth calls would then fail at
    # runtime (or the check is effectively disabled if the frontend secret
    # is also empty). Consider validating non-empty to fail fast — confirm
    # desired behavior before changing the default.
    internal_api_secret: str = ""

    # Service-specific settings; each nested model reads its own
    # env-prefixed variables at construction time.
    clickhouse: ClickHouseSettings = ClickHouseSettings()
    s3: S3Settings = S3Settings()
    redis: RedisSettings = RedisSettings()


# Module-level singleton: the rest of the backend imports this instead of
# calling os.getenv() directly.
settings = Settings()
25 changes: 12 additions & 13 deletions backend/worker/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@
# Load environment variables from .env file
load_dotenv()

from shared.config import settings

# Configure logging for worker tasks
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s: %(levelname)s/%(processName)s] %(name)s - %(message)s",
)

# Redis URL from environment or default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
REDIS_RESULT_URL = os.getenv("REDIS_RESULT_URL", "redis://localhost:6379/1")

app = Celery("traceroot")

app.conf.update(
# Broker and backend
broker_url=REDIS_URL,
result_backend=REDIS_RESULT_URL,
broker_url=settings.redis.url,
result_backend=settings.redis.result_url,
# Reliability settings
task_acks_late=True, # ACK after task completes (not before)
task_reject_on_worker_lost=True, # Requeue if worker dies mid-task
Expand All @@ -53,15 +51,16 @@
result_expires=3600,
)

# Auto-discover tasks from worker.tasks module
app.autodiscover_tasks(["worker"])
# Auto-discover tasks from worker.ingest_tasks module
app.autodiscover_tasks(["worker"], related_name="ingest_tasks")


@worker_ready.connect
def on_worker_ready(**kwargs):
"""Run ClickHouse migrations when worker starts."""
logger.info("Running ClickHouse migrations on worker startup...")
try:
ch = settings.clickhouse
result = subprocess.run(
[
str(Path(__file__).resolve().parent.parent / "db" / "clickhouse" / "migrate.sh"),
Expand All @@ -71,11 +70,11 @@ def on_worker_ready(**kwargs):
text=True,
env={
**os.environ,
"CLICKHOUSE_HOST": os.getenv("CLICKHOUSE_HOST", "localhost"),
"CLICKHOUSE_PORT": os.getenv("CLICKHOUSE_NATIVE_PORT", "9000"),
"CLICKHOUSE_USER": os.getenv("CLICKHOUSE_USER", "clickhouse"),
"CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", "clickhouse"),
"CLICKHOUSE_DATABASE": os.getenv("CLICKHOUSE_DATABASE", "default"),
"CLICKHOUSE_HOST": ch.host,
"CLICKHOUSE_PORT": str(ch.native_port),
"CLICKHOUSE_USER": ch.user,
"CLICKHOUSE_PASSWORD": ch.password,
"CLICKHOUSE_DATABASE": ch.database,
},
)

Expand Down
1 change: 0 additions & 1 deletion backend/worker/features/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def process_s3_traces(self, s3_key: str, project_id: str) -> dict:
# Import here to avoid circular imports and ensure fresh connections
from db.clickhouse.client import get_clickhouse_client
from rest.services.s3 import get_s3_service
from worker.transformer import transform_otel_to_clickhouse
from worker.otel_transform import transform_otel_to_clickhouse

logger.info(f"Processing S3 traces: {s3_key} for project {project_id}")

Expand Down
Empty file removed backend/worker/jobs/__init__.py
Empty file.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def transform_otel_to_clickhouse(
span_record["total_tokens"] = total_tokens

# Calculate cost from actual token counts
from worker.features.tokens.pricing import get_model_price
from worker.tokens.pricing import get_model_price

prices = get_model_price(model_name)
if prices:
Expand All @@ -351,7 +351,7 @@ def transform_otel_to_clickhouse(
span_record["cost"] = float(input_cost + output_cost)
else:
# Fall back to text-based estimation
from worker.features.tokens import calculate_cost
from worker.tokens import calculate_cost

usage = calculate_cost(
model=model_name,
Expand Down
Empty file.
1 change: 0 additions & 1 deletion backend/worker/schedules/__init__.py

This file was deleted.

Empty file removed backend/worker/tests/__init__.py
Empty file.
Loading
Loading