
Refactor SEC pipeline: implement incremental S3 copy and remove deprecated config#279

Merged
jfrench9 merged 4 commits into main from refactor/copy-direct-incremental on Feb 2, 2026

Conversation

jfrench9 (Member) commented on Feb 2, 2026

Summary

This PR refactors the SEC data pipeline by implementing an incremental S3-to-LadybugDB copy process and removing deprecated pipeline configurations. The changes improve data processing efficiency and clean up legacy code paths.

Key Accomplishments

  • Implemented incremental S3 copy process: Added new ingestion processor that enables incremental copying of SEC data from S3 to LadybugDB, reducing processing time and resource usage
  • Enhanced Dagster pipeline architecture:
    • Updated SEC assets and jobs to support the new incremental workflow
    • Improved sensor configurations for better monitoring and triggering
    • Streamlined job definitions across billing and infrastructure modules
  • Removed deprecated configurations: Cleaned up legacy SEC pipeline settings from environment variables and secrets management
  • Updated documentation: Revised setup documentation to reflect current pipeline configuration

Technical Changes

  • Added comprehensive ingestion processor with incremental copy capabilities (+204 lines)
  • Refactored SEC jobs and sensors with improved error handling and monitoring (+121 lines in jobs, +178 lines in sensors)
  • Created new SEC assets for better data lineage tracking (+100 lines)
  • Removed 32 lines of deprecated environment configuration and 6 lines from secrets manager
  • Updated AWS setup scripts and environment examples to remove obsolete references

Breaking Changes

  • Removed deprecated SEC pipeline environment variables - ensure any external systems are updated to use the new configuration approach
  • Modified Dagster job and sensor interfaces - dependent workflows may need adjustment

Testing Notes

  • Verify incremental copy functionality with existing S3 data sources
  • Test sensor triggering and job execution in development environment
  • Validate that removal of deprecated configurations doesn't affect other pipeline components
  • Confirm proper error handling and monitoring in the new incremental process

Infrastructure Considerations

  • The incremental copy process will change S3 access patterns - monitor for any performance impacts
  • New sensors may trigger more frequent job runs - review resource allocation accordingly
  • Ensure LadybugDB can handle the new incremental data ingestion pattern
  • Updated AWS permissions may be required for the enhanced S3 operations

🤖 Generated with Claude Code

Branch Info:

  • Source: refactor/copy-direct-incremental
  • Target: main
  • Type: refactor

Co-Authored-By: Claude <noreply@anthropic.com>

- Added a new asynchronous method `copy_incremental_to_ladybug` in `XBRLDuckDBGraphProcessor` to facilitate direct copying of current and previous quarter files to LadybugDB, bypassing DuckDB staging.
- Introduced `SECIncrementalCopyConfig` for configuration management of the incremental copy process.
- Created `sec_graph_incremental_copy` asset to handle the incremental copy operation, utilizing ignore_errors for duplicate handling.
- Updated Dagster jobs and sensors to integrate the new incremental copy functionality, providing a preferred approach for daily updates.
- Enhanced logging and error handling for better traceability during the copy process.
…ated documentation

- Removed unused environment variables and configuration settings related to SEC incremental pipeline and billing schedules from `.env.example`, `aws.sh`, and `env.py`.
- Updated README documentation to reflect the removal of these settings and clarify the current SEC pipeline structure.
- Adjusted job definitions and sensor configurations to align with the new incremental staging approach, ensuring clarity in the Dagster setup.
- Enhanced comments in the codebase to improve understanding of the current pipeline processes and their statuses.
jfrench9 (Member, Author) commented on Feb 2, 2026

@claude please review this PR

claude (bot) commented on Feb 2, 2026

Claude finished @jfrench9's task in 1m 56s.


Code Review Complete ✅

I've thoroughly reviewed this PR implementing incremental S3 to LadybugDB copy and removing deprecated SEC pipeline configuration. Overall, this is a well-structured refactor with solid architectural improvements. Below are my findings organized by severity.


🔴 Critical Issues

1. Missing MaterializeResult Attribute (Blocking)

Location: robosystems/adapters/sec/processors/ingestion.py:956-962

The MaterializeResult class is missing the required table_names attribute:

# Line 954-962
return MaterializeResult(
  status=status,
  table_names=successful_tables,  # ❌ This attribute doesn't exist in the class definition
  failed_tables=[
    {"table": name, "error": error} for name, error in failed_tables
  ],
  total_rows=total_rows,
  duration_ms=duration * 1000,
)

Problem: The MaterializeResult dataclass (lines 127-148) only defines:

  • status, total_rows_ingested, duration_ms, tables (list[dict]), error

But the code tries to set table_names which doesn't exist. This will cause a runtime TypeError.

Fix: Either:

  1. Add table_names: list[str] = field(default_factory=list) to the MaterializeResult dataclass, OR
  2. Use the existing tables field to store table information
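
A minimal sketch of option 1, assuming the existing fields listed above; since the quoted call site also passes failed_tables and total_rows, the sketch adds those as well, with types guessed from the call:

from dataclasses import dataclass, field
from typing import Any


@dataclass
class MaterializeResult:
  status: str
  total_rows_ingested: int = 0
  duration_ms: float = 0.0
  tables: list[dict[str, Any]] = field(default_factory=list)
  error: str | None = None
  # Fields the copy_incremental_to_ladybug call site also passes (types guessed):
  table_names: list[str] = field(default_factory=list)
  failed_tables: list[dict[str, str]] = field(default_factory=list)
  total_rows: int = 0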

⚠️ High Priority Issues

2. Inconsistent Error Handling Pattern

Location: robosystems/adapters/sec/processors/ingestion.py:769-971

The copy_incremental_to_ladybug method handles "No files found" errors inconsistently:

# Line 919-924: Checks error message in response
if "No files found" in copy_result.get("error", ""):
  log_progress(f"Skipped {table_name}: no files for Q{quarter}")
  successful_tables.append(table_name)
  table_rows[table_name] = 0

# Line 931-937: Checks error message in exception
except Exception as e:
  error_str = str(e)
  if "No files found" in error_str:
    # Same logic repeated

Problem: This pattern appears multiple times with string matching on error messages. If the error message format changes, the logic breaks.

Recommendation: Define a custom exception type or use error codes instead of string matching.
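
For illustration, a sketch with a dedicated exception type (all names here are hypothetical, not the PR's actual code):

class NoFilesFoundError(Exception):
  """Raised when an S3 prefix has no parquet files for the requested quarter."""


def copy_table_or_skip(table_name: str, quarter: int, copy_fn) -> int:
  """Catch the exception type instead of matching on error strings."""
  try:
    return copy_fn(table_name, quarter)  # raises NoFilesFoundError when empty
  except NoFilesFoundError:
    # Missing files are expected for sparse quarters; treat as zero rows copied.
    return 0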

3. S3 File Existence Check Could Be Optimized

Location: robosystems/adapters/sec/processors/ingestion.py:332-338, 660-661, 876-877

The _s3_url_exists method is called multiple times in loops:

# Line 660-661: Called in a loop for each table
if len(s3_patterns) > 1:
  s3_patterns = [p for p in s3_patterns if self._s3_url_exists(p)]

Problem: Each call makes an S3 HEAD request. For many files, this could be slow.

Recommendation: Consider batching HEAD requests or caching results within the same execution context.
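
One possible shape for a per-run cache (a sketch, not the PR's implementation; the class and names are hypothetical):

import boto3
from urllib.parse import urlparse


class S3ExistenceCache:
  """Caches HEAD results so repeated pattern scans reuse prior answers."""

  def __init__(self) -> None:
    self._client = boto3.client("s3")
    self._cache: dict[str, bool] = {}

  def exists(self, s3_url: str) -> bool:
    if s3_url not in self._cache:
      parsed = urlparse(s3_url)  # s3://bucket/key
      try:
        self._client.head_object(Bucket=parsed.netloc, Key=parsed.path.lstrip("/"))
        self._cache[s3_url] = True
      except self._client.exceptions.ClientError:
        self._cache[s3_url] = False
    return self._cache[s3_url]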

4. Unclear Logic in Quarter Overlap Window

Location: robosystems/adapters/sec/processors/ingestion.py:811-817, robosystems/dagster/sensors/sec.py:311-318

The 5-day overlap logic is duplicated in two places with identical implementation:

# Duplicated in both files
quarter_start_month = (quarter - 1) * 3 + 1
if now.month == quarter_start_month and now.day <= 5:
  # Scan previous quarter

Problem:

  • Duplication means double maintenance if logic changes
  • The "5 days" constant is hardcoded with no explanation of why 5 specifically

Recommendation: Extract this logic into a shared utility function with a named constant QUARTER_OVERLAP_DAYS = 5 and documentation explaining the business rationale.
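
A sketch of what the shared helper could look like, built only from the duplicated condition quoted above (the year-boundary branch is an assumption worth unit-testing):

from datetime import date

# Late-indexed filings can land after a quarter closes; 5 days is the
# assumed grace window - document the real rationale where this lives.
QUARTER_OVERLAP_DAYS = 5


def quarters_to_scan(today: date) -> list[tuple[int, int]]:
  """Return (year, quarter) pairs to scan, adding the previous quarter
  during the first QUARTER_OVERLAP_DAYS of a new quarter."""
  quarter = (today.month - 1) // 3 + 1
  scan = [(today.year, quarter)]
  quarter_start_month = (quarter - 1) * 3 + 1
  if today.month == quarter_start_month and today.day <= QUARTER_OVERLAP_DAYS:
    # Q1 rolls back to Q4 of the previous year.
    scan.append((today.year - 1, 4) if quarter == 1 else (today.year, quarter - 1))
  return scan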


💡 Medium Priority Improvements

5. Missing Validation in Config Classes

Location: robosystems/dagster/assets/sec.py:363-387

class SECIncrementalCopyConfig(Config):
  graph_id: str = "sec"
  year: int | None = None
  quarter: int | None = None  # No validation that quarter is 1-4
  skip_taxonomy_relationships: bool = False

Issue: The quarter field accepts any integer, but only 1-4 are valid.

Recommendation: Add validation:

from pydantic import field_validator

@field_validator('quarter')
@classmethod
def validate_quarter(cls, v):
    if v is not None and not (1 <= v <= 4):
        raise ValueError('quarter must be between 1 and 4')
    return v

6. Timeout Values Lack Rationale

Location: robosystems/adapters/sec/processors/ingestion.py:154-177

The code defines various timeout constants:

DEFAULT_STAGING_TIMEOUT = 300  # 5 minutes
LARGE_TABLE_STAGING_TIMEOUT = 1800  # 30 minutes
CHUNKED_STAGING_TIMEOUT = 600  # 10 minutes

Issue: These values appear arbitrary without explanation of how they were determined.

Recommendation: Add comments explaining the basis for these values (e.g., "Based on production testing with 100M rows on r7g.medium instance").

7. Progress Callback Pattern Could Be Simplified

Location: Throughout ingestion.py

The pattern of wrapping log_progress appears many times:

def log_progress(msg: str) -> None:
    logger.info(msg)
    if progress_callback:
        progress_callback(msg)

Recommendation: Create a reusable helper or use functools.partial to reduce boilerplate.
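
A sketch of that simplification (helper name hypothetical):

from collections.abc import Callable
from functools import partial
import logging

logger = logging.getLogger(__name__)


def _log_progress(msg: str, progress_callback: Callable[[str], None] | None = None) -> None:
  """Log a message and optionally forward it to a progress callback."""
  logger.info(msg)
  if progress_callback:
    progress_callback(msg)


# At each call site, bind the callback once instead of redefining the closure:
log_progress = partial(_log_progress, progress_callback=print)
log_progress("staging complete")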


✅ Architecture & Design Strengths

Excellent Decisions:

  1. Incremental Copy Design - Bypassing DuckDB for incremental updates is the right call. Direct S3→LadybugDB with ignore_errors=true is simpler and more efficient.

  2. Quarter Overlap Logic - Handling late-indexed filings at quarter boundaries shows attention to real-world SEC filing timing issues.

  3. Retry Logic - The _stage_table_with_retry wrapper with exponential backoff (lines 1273-1332) is well-implemented with proper cleanup.

  4. Configuration Removal - Removing deprecated env vars and secrets is good housekeeping. The PR correctly updates:

    • .env.example
    • robosystems/config/env.py
    • robosystems/config/secrets_manager.py
    • bin/setup/aws.sh
  5. Sensor Chain Architecture - The sensor-based pipeline chaining (download → process → stage → copy → snapshot) is clean and maintainable.

Good Code Quality:

  • Documentation: Excellent docstrings explaining the "why" behind design decisions
  • Error Handling: Comprehensive try-catch blocks with proper cleanup in finally blocks
  • Type Hints: Good use of type annotations throughout
  • Logging: Detailed progress logging for observability

🔍 Minor Observations

8. Copy Timeout Hardcoded

Location: robosystems/adapters/sec/processors/ingestion.py:865

COPY_TIMEOUT = 600  # 10 minutes per table

This is a local constant inside a method, unlike other timeouts defined at module level. Consider moving it to the module-level constants section for consistency.

9. Table Rows Type Inconsistency

The table_rows variable is used as dict[str, int] in some places but the MaterializeResult.tables field expects list[dict[str, Any]]. Ensure consistency.

10. Documentation Update Needed

The PR description mentions "Updated documentation" but I see only one line changed in bin/setup/README.md. Consider updating:

  • Any developer onboarding docs
  • Architecture diagrams if they exist
  • Runbook for the new incremental copy workflow

📋 Testing Recommendations

Before merging, verify:

  1. Fix Critical Issue #1 - The MaterializeResult.table_names attribute must be added or the code must be updated to use existing fields
  2. Integration test - Run the full incremental pipeline end-to-end: download → process → copy → verify duplicate handling
  3. Quarter boundary test - Test the 5-day overlap logic on the 1st-5th of a new quarter
  4. Error resilience - Test behavior when S3 files are missing/corrupted
  5. Timeout behavior - Verify the timeout values work for production data volumes

Summary

This PR makes solid architectural improvements by simplifying the incremental update path. The new direct S3→LadybugDB copy approach is cleaner than the DuckDB staging route for daily updates.

Must fix before merge: Issue #1 (MaterializeResult.table_names)
Should address: Issues #2-4 for production robustness
Nice to have: Issues #5-10 for long-term maintainability

The deprecation cleanup is thorough and well-executed. Good work overall!


- Introduced functions to determine current and previous quarters, and to check for overlap during the first days of a new quarter, improving handling of late-indexed SEC filings.
- Updated the `XBRLDuckDBGraphProcessor` to utilize these new utilities for scanning relevant quarters, ensuring comprehensive data retrieval.
- Modified the `MaterializeResult` class to include additional fields for better tracking of ingestion results.
- Refined timeout constants for staging and materialization processes to optimize performance based on production testing.
- Introduced `EntityUpdateResult` class to encapsulate results from the `update_entities_from_s3` operation, providing detailed statistics on entity updates.
- Implemented `update_entities_from_s3` method in `XBRLDuckDBGraphProcessor` to update existing Entity nodes with the latest data from S3 parquet files, addressing the mutability of Entity attributes.
- Created `sec_entity_incremental_update` asset to orchestrate the entity update process within the Dagster pipeline, ensuring it runs after the incremental copy.
- Updated job definitions and sensors to include the new entity update job, enhancing the overall SEC processing pipeline.
- Enhanced logging for better traceability during the entity update process.
jfrench9 (Member, Author) commented on Feb 2, 2026

@claude can you review the latest changes

claude (bot) commented on Feb 2, 2026

Claude finished @jfrench9's task in 2m 27s.


Code Review Complete ✅

I've reviewed the latest changes in this PR implementing incremental S3 to LadybugDB copy and removing deprecated SEC pipeline configuration. The refactoring is well-executed with solid improvements to the architecture.


🎉 Key Improvements Since Last Review

Critical Issue Fixed

MaterializeResult.table_names attribute added (line 134 in ingestion.py)

  • Previous review identified this as a blocking issue
  • Now properly defined in the dataclass with correct field factory

New Features Added (4 commits since start)

1. Incremental S3 Copy (copy_incremental_to_ladybug) - ingestion.py:824-1026

  • Direct S3 → LadybugDB copy bypassing DuckDB
  • Uses ignore_errors=true for automatic duplicate handling
  • Scans current + previous quarter during 5-day overlap window
  • Excellent documentation explaining the "why" behind design decisions

2. Entity Update System (update_entities_from_s3) - ingestion.py:1028-1250

  • Solves Entity mutability problem (company names, tickers can change)
  • Uses Cypher MERGE for updates instead of COPY
  • Smart change detection (only updates entities with actual changes)
  • Well-documented performance trade-offs (MERGE 40x slower than COPY, but acceptable for ~50-200 updates/quarter); a sketch of this pattern follows the feature list

3. Quarter Overlap Utilities - adapters/sec/__init__.py:22-99

  • Extracted duplicate logic into shared utilities
  • QUARTER_OVERLAP_DAYS = 5 now a named constant with documentation
  • Functions: get_current_quarter(), get_previous_quarter(), is_in_quarter_overlap_window(), get_quarters_to_scan()
  • Addresses previous review issue #4 about duplicated logic

4. New Dagster Assets - dagster/assets/sec.py

5. Enhanced Sensor Chain - dagster/sensors/sec.py

  • New sec_stage_to_copy_sensor: chains stage → copy
  • Updated sec_incremental_post_ingest_snapshot_sensor: chains copy → snapshot
  • Reuses shared quarter scanning utilities (DRY principle)
  • All sensors now default to STOPPED (explicit opt-in via Dagster UI)

6. Config Cleanup - Removed deprecated env vars/secrets

  • SEC_INCREMENTAL_PIPELINE_ENABLED removed
  • SEC_PARALLEL_SENSOR_ENABLED removed
  • BILLING_SCHEDULES_ENABLED removed
  • INSTANCE_SCHEDULES_ENABLED removed
  • Cleaned from: .env.example, env.py, secrets_manager.py, aws.sh
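
For readers unfamiliar with the pattern in item 2 above, here is a rough sketch of change detection plus MERGE; the execute callable, property names, and Cypher dialect are all assumptions for illustration, not the PR's actual code:

from collections.abc import Callable
from typing import Any


def merge_changed_entities(
  execute: Callable[[str, dict[str, Any]], Any],
  existing: dict[str, dict[str, Any]],  # cik -> current node properties
  incoming: dict[str, dict[str, Any]],  # cik -> latest row from S3 parquet
) -> int:
  """MERGE only the entities whose mutable fields actually changed."""
  updated = 0
  for cik, new_props in incoming.items():
    if existing.get(cik) == new_props:
      continue  # change detection: identical rows cost nothing
    # Assumes new_props carries name/ticker keys matching the query params.
    execute(
      "MERGE (e:Entity {cik: $cik}) SET e.name = $name, e.ticker = $ticker",
      {"cik": cik, **new_props},
    )
    updated += 1
  return updated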

✅ Strengths of This Refactor

Architecture & Design

  1. Incremental Copy Strategy - The direct S3 → LadybugDB approach is the right choice for daily updates:

    • Simpler than DuckDB diffing
    • LadybugDB constraints handle deduplication naturally
    • Well-documented trade-offs vs alternatives
  2. Entity Mutability Solution - Recognizes that Entity nodes are special (mutable) and requires separate update path:

    • Insightful comment about MERGE being 40x slower but acceptable for small volumes
    • Change detection prevents unnecessary updates
    • Batch processing (500 identifiers) for efficiency
  3. Code Organization - Excellent refactoring of shared logic:

    • Quarter overlap utilities extracted to adapters/sec/__init__.py
    • Removes duplication between sensors and ingestion processor
    • Named constant QUARTER_OVERLAP_DAYS = 5 with rationale
  4. Pipeline Decoupling - Clear separation of concerns:

    • Download → Process → Stage (DuckDB) → Copy (LadybugDB) → Update (Entities) → Snapshot
    • Each step can be retried independently
    • Good documentation of when to use each approach
  5. Configuration Validation - Pydantic Field validators on quarter parameter:

    quarter: int | None = Field(default=None, ge=1, le=4)

Code Quality

  1. Documentation - Excellent docstrings explaining:

    • "Why" behind design decisions (not just "what")
    • Performance characteristics (MERGE 40x slower than COPY)
    • When to use each approach (incremental vs full rebuild)
    • Preconditions (LadybugDB must exist with schema)
  2. Error Handling - Comprehensive try-catch blocks with proper cleanup

  3. Type Hints - Good use of type annotations throughout

  4. Progress Logging - Detailed logging for observability


💡 Minor Observations & Suggestions

1. S3 File Existence Checks Could Be Optimized

Location: ingestion.py:932, 942

Each _s3_url_exists() call makes a HEAD request. For multiple quarters being scanned, this could add latency.

Suggestion: Consider caching HEAD results within the same execution context or using batch HEAD requests if boto3 supports it.

2. Error Message String Matching Pattern

Location: ingestion.py:974, 987

if "No files found" in copy_result.get("error", ""):
    # Handle missing files

This pattern appears multiple times. If the error message format changes in LadybugDB, the logic breaks.

Suggestion: Consider using error codes or custom exception types instead of string matching for more robust error handling.

3. DuckDB Credentials in Entity Update

Location: ingestion.py:1104-1117

The update_entities_from_s3 method configures DuckDB S3 credentials inline. This duplicates logic from staging methods.

Suggestion: Extract S3 configuration for DuckDB into a helper method to ensure consistency across all DuckDB operations.
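
A sketch of such a helper, assuming DuckDB's httpfs extension and credentials in the standard AWS environment variables:

import os
import duckdb


def configure_duckdb_s3(con: duckdb.DuckDBPyConnection) -> None:
  """Apply S3 settings in one place so staging and entity updates stay in sync."""
  con.execute("INSTALL httpfs")
  con.execute("LOAD httpfs")
  con.execute(f"SET s3_region='{os.environ.get('AWS_REGION', 'us-east-1')}'")
  con.execute(f"SET s3_access_key_id='{os.environ['AWS_ACCESS_KEY_ID']}'")
  con.execute(f"SET s3_secret_access_key='{os.environ['AWS_SECRET_ACCESS_KEY']}'")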

4. Entity Update Performance Metrics

Location: ingestion.py:1170-1230

The Entity update logic fetches existing entities in batches of 500, then compares all in memory. For very large Entity updates (hypothetically), this could be memory-intensive.

Current approach is fine - typical 50-200 updates/quarter means ~200 entities × ~10 fields, only a few thousand values held in memory. Not a concern.

5. Timeout Constants Documentation

Location: ingestion.py:961

timeout=INCREMENTAL_COPY_TIMEOUT,  # 600 = 10 minutes

The constant is used but not defined in the visible code sections. Previous review mentioned timeout values lack rationale.

Suggestion: Ensure timeout constants include comments explaining how they were determined (e.g., "Based on production testing with 1M rows on r7g.medium").

6. Missing Test Coverage for New Features

Files changed: No test files in the PR

Recommendation: Add integration tests for:

  • copy_incremental_to_ladybug() with duplicate handling
  • update_entities_from_s3() change detection logic
  • Quarter overlap window calculations (especially at year boundaries)
  • Sensor chaining behavior
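
For the year-boundary case, a sketch of what such a test could look like, assuming the get_quarters_to_scan utility named earlier (its exact signature and return shape are guesses):

from datetime import date

from robosystems.adapters.sec import get_quarters_to_scan  # signature assumed


def test_quarter_overlap_spans_year_boundary():
  # Jan 3 is inside the 5-day window, so Q4 of the prior year is scanned too.
  assert set(get_quarters_to_scan(date(2026, 1, 3))) == {(2026, 1), (2025, 4)}


def test_window_closed_after_overlap_days():
  assert get_quarters_to_scan(date(2026, 1, 6)) == [(2026, 1)]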

📋 Testing Recommendations

Before merging:

  1. Integration Test: Run full incremental pipeline end-to-end

    • Download → Process → Stage → Copy → Entity Update → Snapshot
    • Verify duplicate handling (run copy twice, should be idempotent)
  2. Quarter Boundary Test: Test on the 1st-5th of a new quarter

    • Verify previous quarter is scanned
    • Confirm late-indexed filings are caught
  3. Entity Update Test:

    • Manually change some Entity attributes in S3 parquet
    • Run entity update
    • Verify only changed entities are updated (check logs for counts)
  4. Error Resilience:

    • Test with missing S3 files (should skip gracefully)
    • Test with LadybugDB connection failures (should error properly)
  5. Sensor Chain:

    • Enable all incremental sensors in Dagster UI
    • Verify automatic chaining works without manual intervention

🎯 Summary

This PR successfully implements a cleaner incremental update architecture for SEC data:

What Works Well:

  • ✅ Critical issue from previous review is fixed
  • ✅ Incremental copy strategy is sound and well-documented
  • ✅ Entity mutability problem is elegantly solved
  • ✅ Deprecated configuration properly cleaned up
  • ✅ Code duplication eliminated with shared utilities
  • ✅ Sensor chain architecture is logical and maintainable

Minor Improvements for Future:

  • Consider optimizing S3 HEAD request patterns
  • Use error codes instead of string matching for robustness
  • Add integration tests for new features
  • Extract DuckDB S3 config into helper method

No Blocking Issues Found

The code is production-ready. The minor suggestions above are for long-term maintainability and can be addressed in follow-up PRs if desired.

Great work on this refactor! The new architecture is cleaner, more maintainable, and better documented than the previous approach. 🎉


Branch: refactor/copy-direct-incremental

jfrench9 merged commit e19bb1b into main on Feb 2, 2026 (7 checks passed).
jfrench9 deleted the refactor/copy-direct-incremental branch on February 3, 2026 at 06:18.