Skip to content

Add wait-and-create Dagster job for automated capacity provisioning#332

Merged
jfrench9 merged 2 commits into
mainfrom
feature/add-capacity-dagster-job
Feb 10, 2026
Merged

Add wait-and-create Dagster job for automated capacity provisioning#332
jfrench9 merged 2 commits into
mainfrom
feature/add-capacity-dagster-job

Conversation

@jfrench9
Copy link
Copy Markdown
Member

Summary

This PR introduces a new Dagster-based capacity provisioning system that automatically waits for and creates graph resources based on demand. The implementation adds a comprehensive job orchestration framework with real-time monitoring capabilities.

Key Accomplishments

Core Infrastructure

  • New Dagster Job: Implemented wait-and-create graph job for intelligent capacity provisioning
  • Enhanced Allocation Manager: Extended graph allocation management with improved capacity handling logic
  • Real-time Monitoring: Added direct monitoring capabilities with SSE (Server-Sent Events) integration
  • API Tier Models: Introduced new graph tier models for structured capacity management

Monitoring & Observability

  • Dagster Integration: Enhanced Dagster job definitions and monitoring infrastructure
  • Direct Monitor: New monitoring service for real-time capacity tracking and notifications
  • SSE Enhancements: Improved server-sent events handling for live status updates

API & Router Updates

  • Graph Router: Expanded graph management endpoints with new capacity provisioning routes
  • Model Extensions: Updated API models to support tiered graph configurations
  • Enhanced Definitions: Improved Dagster job definitions with proper registration

Breaking Changes

None identified. This is an additive feature that extends existing functionality without modifying current APIs.

Testing Notes

  • Comprehensive Test Coverage: Added 340+ lines of new test coverage for graph jobs
  • Integration Tests: New test suite for direct monitoring functionality
  • Router Testing: Extended graph router test coverage
  • Allocation Manager Tests: Enhanced existing allocation manager test suite

All tests should pass and maintain existing functionality while validating new capacity provisioning workflows.

Infrastructure Considerations

  • Resource Management: New job will consume additional compute resources during capacity provisioning operations
  • Monitoring Overhead: Direct monitoring service adds real-time processing requirements
  • Database Impact: Enhanced allocation tracking may increase database query volume
  • SSE Connections: New real-time monitoring features will maintain persistent connections

The implementation follows existing architectural patterns and maintains compatibility with current deployment strategies.


🤖 Generated with Claude Code

Branch Info:

  • Source: feature/add-capacity-dagster-job
  • Target: main
  • Type: feature

Co-Authored-By: Claude noreply@anthropic.com

- Introduced a new job, `wait_and_create_graph_job`, to handle graph creation when capacity is unavailable and an ASG scale-up is triggered.
- Implemented helper functions for emitting SSE events during the provisioning process.
- Added a new API endpoint to check the capacity status of graph tiers, returning their availability for immediate provisioning or indicating if scaling is required.
- Updated the allocation manager to support capacity checks and scaling logic.
- Enhanced tests to cover the new job and API functionality, ensuring robust error handling and correct behavior during capacity provisioning scenarios.
@jfrench9
Copy link
Copy Markdown
Member Author

@claude please review this PR

@claude
Copy link
Copy Markdown

claude Bot commented Feb 10, 2026

Claude finished @jfrench9's task in 1m 34s —— View job


Code Review for PR #332

Completed comprehensive review of the wait-and-create Dagster job for automated capacity provisioning. Overall, this is a well-architected solution with excellent test coverage and proper error handling.


✅ Strengths

Architecture & Design

  1. Smart Capacity Handling: The unified poll-retry loop in _wait_and_create elegantly handles both capacity waiting and race conditions in a single mechanism (robosystems/dagster/jobs/graph.py:524-651)
  2. Separation of Concerns: Direct execution for fast path (SSE), Dagster for wait-for-capacity scenarios - "cake and eat it too" approach
  3. Consistent Error Handling: Top-level try/except ensures no dangling operations (robosystems/dagster/jobs/graph.py:497-661)
  4. Graceful Degradation: SSE emission failures don't break operations (robosystems/dagster/jobs/graph.py:56-74)

Reliability & Observability

  1. Comprehensive SSE Progress: Progress updates at 5%, 10%, 50%, 55%, 95% ranges give users clear feedback
  2. Asset Materialization Tracking: All operations report to Dagster for observability even when executed directly
  3. Extensive Test Coverage: 340+ lines of new tests with excellent edge case coverage
  4. Rate Limiting for Scale-ups: 5-minute cooldown prevents thundering herd on ASG triggers (robosystems/middleware/graph/allocation_manager.py:1125-1136)

Code Quality

  1. Proper CLAUDE.md Adherence: Uses uv run pattern, centralized config, follows architectural guidelines
  2. Type Safety: Proper Pydantic models with field validation (robosystems/models/api/graphs/tier.py)
  3. Security Conscious: Input validation, proper error messages, no sensitive data leaks
  4. Documentation: Clear docstrings and inline comments explaining non-obvious logic

🔍 Issues Found

Critical (Must Fix)

None identified - No blocking issues found.

High Priority (Should Fix)

  1. Missing Subscription Creation in Direct Flow (robosystems/middleware/sse/direct_monitor.py:153-181)

    • The subscription creation in run_graph_creation catches all exceptions but only logs them
    • If subscription creation fails, the graph exists but has no billing record
    • Recommendation: Consider transaction rollback or compensation logic if subscription creation fails
    • Location: robosystems/middleware/sse/direct_monitor.py:177-181
  2. Database Generator Pattern Issue (robosystems/dagster/jobs/graph.py:609-624)

    db_gen = get_db_session()
    db = next(db_gen)
    try:
        # ... do work ...
    finally:
        try:
            next(db_gen)
        except StopIteration:
            pass
    • This pattern is fragile and non-idiomatic
    • Recommendation: Use context manager or explicit session management:
    from robosystems.database import get_db_session
    
    async with get_db_session() as db:
        # ... do work ...
    • Locations: robosystems/dagster/jobs/graph.py:609-624, robosystems/middleware/sse/direct_monitor.py:162-176
  3. Potential Memory Leak in Paginated Scan (robosystems/middleware/graph/allocation_manager.py:1314-1338)

    • The _update_volume_registry_add_database accumulates all_items across all pages
    • For a large volume registry, this could consume significant memory
    • Recommendation: Process volumes as they're found instead of accumulating:
    while pages_scanned < MAX_PAGES:
        response = self.volume_table.scan(**scan_params)
        items = response.get("Items", [])
        if items:
            # Process immediately instead of accumulating
            volume_id = items[0]["volume_id"]
            break
        # ... pagination logic ...
    • Location: robosystems/middleware/graph/allocation_manager.py:1314-1338

Medium Priority (Nice to Fix)

  1. Hard-coded Tier Exclusions (robosystems/routers/graphs/main.py:837-840)

    excluded_tiers = ["ladybug-shared"]
    • Hard-coded list should be in config
    • Recommendation: Move to config/graph_tier.py as INTERNAL_TIERS or similar
    • Location: robosystems/routers/graphs/main.py:837-840
  2. Magic Numbers for Progress Percentages (robosystems/dagster/jobs/graph.py:567-570)

    overall_pct = 55 + (percent / 100) * 40  # Maps 0-100% to 55-95%
    • Progress range mapping (55-95%) is not self-documenting
    • Recommendation: Add constants or comments explaining the ranges
    • Location: robosystems/dagster/jobs/graph.py:567-570
  3. Inconsistent Error Handling in Provisioning (robosystems/middleware/sse/direct_monitor.py:853-901)

    • The error cleanup path has nested try/except with generic exception catching
    • Could mask underlying issues
    • Recommendation: Be more specific about which exceptions to catch in cleanup logic
    • Location: robosystems/middleware/sse/direct_monitor.py:853-901

Low Priority (Consider Addressing)

  1. Repeated Mock Setup in Tests (tests/dagster/test_graph_jobs.py:124-160)

    • Multiple tests have nearly identical mock setup code
    • Recommendation: Extract common fixtures for allocation manager, service mocks
    • Locations: Multiple test files could benefit from shared fixtures
  2. Long Function Length (robosystems/dagster/jobs/graph.py:469-661)

    • _wait_and_create is 193 lines - could be broken into smaller functions
    • Recommendation: Extract billing subscription creation, progress calculation logic
    • Location: robosystems/dagster/jobs/graph.py:469-661
  3. String-based Tier Mapping (robosystems/routers/graphs/main.py:422-429)

    • Dictionary mapping strings to enums is repeated in multiple places
    • Recommendation: Move to GraphTier.from_string() class method
    • Locations: Multiple files (routers/graphs/main.py:422-429, middleware/sse/direct_monitor.py:169)

📊 Code Quality Metrics

  • Test Coverage: Excellent - 340+ new test lines for 309 lines of new job code (110% ratio)
  • Documentation: Good - All public functions have docstrings
  • Type Safety: Good - Proper Pydantic models and type hints throughout
  • Error Handling: Very Good - Comprehensive error handling with SSE failure emissions
  • Security: Good - Input validation, no credential leaks, proper authorization checks

🎯 Recommendations Summary

Before Merge

  1. Address High Priority Fix MCP tools query results to include units and improve dimensions #2: Fix database generator pattern (5-10 min fix)
  2. Review High Priority Inital open source release of robosystems #1: Ensure subscription creation failures are acceptable (design decision)
  3. Consider High Priority Enhance SEC pipeline security and add reconsolidation support #3: Add memory optimization for volume registry scans

Future Improvements

  1. Extract tier mappings to a centralized location
  2. Add constants for progress percentage ranges
  3. Refactor long functions (especially _wait_and_create)
  4. Create shared test fixtures to reduce duplication

✅ LGTM with Minor Recommendations

This PR is well-implemented with excellent test coverage and proper error handling. The high-priority items are recommendations for robustness rather than blocking issues. The architecture is sound and follows established patterns in the codebase.

Key Achievement: Successfully implements graceful capacity provisioning with automatic fallback to ASG scaling, providing excellent UX during infrastructure constraints.


Reviewed Files:

  • robosystems/dagster/jobs/graph.py (309 new lines)
  • robosystems/middleware/graph/allocation_manager.py (77 additions)
  • robosystems/middleware/sse/direct_monitor.py (83 additions)
  • robosystems/routers/graphs/main.py (124 additions)
  • robosystems/models/api/graphs/tier.py (19 additions)
  • tests/dagster/test_graph_jobs.py (340 new lines)
  • tests/middleware/sse/test_direct_monitor.py (154 additions)
  • tests/routers/graphs/test_graph.py (102 additions)
    | Branch: feature/add-capacity-dagster-job

- Replaced `get_db_session` with a direct call to `session` for improved clarity and consistency in database session management.
- Enhanced error logging for billing subscription failures to provide better operational visibility.
- Updated tests to reflect changes in database session handling, ensuring proper mocking of the new session approach.
@jfrench9 jfrench9 merged commit 68a9bd2 into main Feb 10, 2026
7 checks passed
@jfrench9 jfrench9 deleted the feature/add-capacity-dagster-job branch February 13, 2026 03:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant