Skip to content

Refactor: Move graph tasks to operations and add agent loading#593

Merged
jfrench9 merged 3 commits into
mainfrom
refactor/worker-operations-imports
Apr 5, 2026
Merged

Refactor: Move graph tasks to operations and add agent loading#593
jfrench9 merged 3 commits into
mainfrom
refactor/worker-operations-imports

Conversation

@jfrench9
Copy link
Copy Markdown
Member

@jfrench9 jfrench9 commented Apr 5, 2026

Summary

Restructures the worker module by relocating graph operation tasks into a dedicated operations/graph/tasks package and introduces agent loading/registry functionality. This refactor establishes a clearer separation of concerns between the worker orchestration layer and the domain-specific operation definitions.

Key Changes

Graph Task Relocation

  • Moved graph_creation.py, graph_materialization.py, and subgraph_creation.py from robosystems/worker/tasks/ to robosystems/operations/graph/tasks/
  • Created a new __init__.py for operations/graph/tasks to properly export and organize the relocated task modules
  • Updated robosystems/worker/tasks/__init__.py to re-export or reference tasks from their new locations, preserving backward compatibility at the worker level

Agent Registry & Loading

  • Enhanced robosystems/operations/agents/__init__.py with new exports
  • Extended robosystems/operations/agents/agent_registry.py with agent loading functionality, enabling dynamic discovery and registration of agents

Worker Module Updates

  • Modified robosystems/worker/__init__.py to align with the new import structure and integrate agent loading capabilities

Motivation

The previous structure co-located task definitions within the worker package, coupling domain logic (graph operations, agent management) with worker infrastructure. This refactor moves toward a cleaner architecture where:

  • operations/ owns the definition of domain tasks and agent registrations
  • worker/ focuses on task execution, orchestration, and Celery integration

Breaking Changes

  • Import paths have changed for graph task modules. Any external code directly importing from robosystems.worker.tasks.graph_creation, graph_materialization, or subgraph_creation may need to update imports. However, re-exports in robosystems/worker/tasks/__init__.py should maintain backward compatibility for consumers importing from the package level.

Testing Notes

  • Verify that existing graph operation tasks (creation, materialization, subgraph creation) execute correctly through the worker after the import path changes
  • Validate that agent loading and registry functionality correctly discovers and registers agents
  • Ensure no circular import issues arise from the new cross-package references between operations and worker

Infrastructure Considerations

  • Worker deployments should be verified to ensure the updated module structure is correctly packaged and all task auto-discovery configurations account for the new operations.graph.tasks location
  • Any task routing or Celery beat schedules referencing tasks by their full dotted path may need updates to reflect the new module locations

🤖 Generated with Claude Code

Branch Info:

  • Source: refactor/worker-operations-imports
  • Target: main
  • Type: refactor

Co-Authored-By: Claude noreply@anthropic.com

- Introduced new worker tasks for graph creation, materialization, and subgraph creation, enhancing the graph operations framework.
- Implemented load_adapter_agents and load_adapter_tasks functions to facilitate future adapter integrations for agent and task registration.
- Updated agent registry to support loading agents from adapters, establishing a foundation for extensibility in agent management.
@jfrench9
Copy link
Copy Markdown
Member Author

jfrench9 commented Apr 5, 2026

@claude please review this PR

@claude
Copy link
Copy Markdown

claude Bot commented Apr 5, 2026

Claude finished @jfrench9's task in 1m 34s —— View job


PR Review: Refactor - Move graph tasks to operations and add agent loading

  • Gather context and understand the request
  • Read changed files
  • Analyze diff for full context
  • Provide comprehensive review

Summary

This is a well-motivated refactor that improves the architectural separation between operations/ (domain logic) and worker/ (infrastructure). The implementation is clean and consistent. A few minor issues worth addressing before merging.


Architecture Assessment

The direction is correct. Relocating graph tasks from worker/tasks/ to operations/graph/tasks/ makes domain logic portable and reinforces the pattern that worker/ provides infrastructure only. The symmetric load_adapter_tasks() / load_adapter_agents() extension points are a good forward-looking design, consistent with the existing get_dagster_components() adapter pattern.


Issues

1. clear_registry() doesn't reset _adapters_loaded — test isolation risk

robosystems/operations/agents/agent_registry.py, line 103–105

def clear_registry() -> None:
  """Clear all registrations. For testing only."""
  _AGENTS.clear()
  # Missing: _adapters_loaded = False  ← !!

If a test calls clear_registry() and then triggers a re-import (or calls load_adapter_agents() again), the _adapters_loaded guard prevents any re-loading. This means adapter loading can never be tested in isolation once the module has been imported. Recommend:

def clear_registry() -> None:
  """Clear all registrations. For testing only."""
  global _adapters_loaded
  _AGENTS.clear()
  _adapters_loaded = False

Fix this →

2. worker/tasks/__init__.py has no clear_registry equivalent at all

TASK_REGISTRY is a module-level dict with no way to reset it in tests. The agent registry has clear_registry() — the task registry should have the same:

def clear_registry() -> None:
  """Clear all registrations. For testing only."""
  global _adapters_loaded
  TASK_REGISTRY.clear()
  _adapters_loaded = False

Fix this →

3. register_task silently overwrites on duplicate (inconsistent with register_agent)

robosystems/worker/tasks/__init__.py, line 23–28

register_agent logs a warning when an existing type is overridden; register_task does not:

def decorator(cls: type[BaseTask]) -> type[BaseTask]:
    TASK_REGISTRY[task_type] = cls  # silent overwrite
    return cls

This makes accidental task type collisions harder to detect. Consider adding the same warning pattern that register_agent uses.

Fix this →


Minor Observations

Operations → Worker cross-dependency (graph_creation.py:17, graph_materialization.py:16, subgraph_creation.py:16): All three relocated files still import register_task and BaseTask from robosystems.worker.tasks. This means operations/graph/tasks/ is not truly independent of the worker infrastructure. This is acceptable given that worker/tasks acts as pure infrastructure (no domain logic), but it's worth noting the separation is architectural/conceptual rather than strict.

get_worker_components() return value (operations/graph/tasks/__init__.py, line 18): The function returns task type strings for introspection but the actual registration is purely a side effect of import. This matches the get_dagster_components() convention described in the docstring, so the pattern is intentional and documented. No action required — just noting it differs from the load_adapter_tasks() call path where the return value of get_worker_components() would be used by the caller.


What's Good

  • Clean re-export aliasing in operations/graph/tasks/__init__.py (import x as x) correctly signals these as intentional re-exports to type checkers
  • Idempotency guards on both load_adapter_tasks() and load_adapter_agents() prevent double-loading
  • worker/__init__.py module docstring now clearly documents the registration strategy
  • noqa: F401 usage is correct for side-effect-only imports
  • No backward compatibility breakage — worker/tasks/__init__.py change isn't needed for re-exports since the tasks were never exported from that __init__.py to begin with

Verdict: Approve after addressing items 1 and 2 (test isolation). Item 3 is a nice-to-have.

Comment thread robosystems/worker/tasks/__init__.py Fixed
Comment thread robosystems/operations/agents/agent_registry.py Fixed
- Added clear_registry function to both agent and task registries to facilitate testing by resetting their states.
- Introduced logging for task registration overrides to improve visibility and debugging during task management.
@jfrench9
Copy link
Copy Markdown
Member Author

jfrench9 commented Apr 5, 2026

@claude have we fixed the issues

@claude
Copy link
Copy Markdown

claude Bot commented Apr 5, 2026

Claude finished @jfrench9's task in 26s —— View job


Yes, all three issues from the previous review have been fixed.

Status: All Issues Resolved ✓

# Issue Status
1 clear_registry() in agent_registry.py didn't reset _adapters_loaded Fixedglobal _adapters_loaded + _adapters_loaded = False added (line 105–107)
2 worker/tasks/__init__.py had no clear_registry() at all Fixedclear_registry() added (lines 70–74), resets both TASK_REGISTRY and _adapters_loaded
3 register_task silently overwrote duplicates Fixed — warning log added (lines 31–35), matching the pattern in register_agent

The github-code-quality bot's complaints about "unused global variable _adapters_loaded" are false positives — _adapters_loaded is used inside load_adapter_*() and clear_registry() via global declarations, which the static analyzer doesn't track correctly.

This PR is good to merge.


Comment thread robosystems/worker/tasks/__init__.py Fixed
- Replaced global variable tracking for loaded adapters with a function attribute to manage state more cleanly in both agent and task registries.
- Updated clear_registry functions to reset the loading state, enhancing testability and maintainability of the registries.
@jfrench9 jfrench9 merged commit 15c0d08 into main Apr 5, 2026
7 checks passed
@jfrench9 jfrench9 deleted the refactor/worker-operations-imports branch April 5, 2026 04:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant