Refactor SEC processing sensor to improve efficiency with batch processing #151

Merged

jfrench9 merged 4 commits into main from refactor/sec-process-sensor-batches on Jan 8, 2026
Conversation

@jfrench9 (Member) commented Jan 8, 2026

Summary

This PR refactors the SEC processing sensor to significantly improve efficiency by implementing batch processing capabilities. The changes optimize how SEC data is processed while maintaining backward compatibility.

Key Accomplishments

  • Batch Processing Implementation: Added support for processing SEC data in batches, reducing individual processing overhead and improving throughput
  • Efficiency Improvements: Refactored core processing logic to minimize resource utilization and processing time
  • Code Structure Enhancement: Reorganized sensor implementation for better maintainability and readability
  • Performance Optimization: Streamlined data handling workflows to reduce memory footprint and processing latency

Changes Made

  • Enhanced the SEC sensor with batch processing capabilities (73 lines added, 29 lines removed)
  • Improved error handling and processing reliability
  • Optimized data flow and resource management
  • Maintained existing sensor interface for seamless integration

Breaking Changes

None. This refactor maintains backward compatibility with existing SEC processing workflows.

Testing Notes

  • Verify that SEC data processing continues to function correctly with existing pipelines
  • Test batch processing performance improvements under various load conditions
  • Validate that sensor triggering behavior remains consistent
  • Confirm error handling works properly for both individual and batch processing scenarios

Infrastructure Considerations

  • Monitor sensor performance metrics after deployment to validate efficiency improvements
  • Batch processing may change resource utilization patterns; observe memory and CPU usage
  • Consider adjusting sensor polling intervals if batch processing improves overall throughput
  • Review logging output to ensure visibility into batch processing operations

🤖 Generated with Claude Code

Branch Info:

  • Source: refactor/sec-process-sensor-batches
  • Target: main
  • Type: refactor

Co-Authored-By: Claude <noreply@anthropic.com>

…ocessing

This update removes the deprecated `_check_processed_exists` function and introduces a new method, `_list_processed_partitions`, to efficiently list processed partition keys from S3. The `sec_processing_sensor` function is enhanced to process raw filings in batches, utilizing a cursor for pagination and set lookups for checking processed files, significantly improving performance and reducing timeout risks. Logging has been updated for better tracking of processing status.
@jfrench9 (Member, Author) commented Jan 8, 2026

@claude please review this PR

@claude (Bot) commented Jan 8, 2026

Claude finished @jfrench9's task in 1m 59s.


PR Review: SEC Processing Sensor Batch Optimization

I've reviewed the refactoring of the SEC processing sensor. This is a well-executed performance optimization that addresses a critical scalability bottleneck. Here's my detailed analysis:


✅ Strengths

1. Excellent Performance Optimization
The shift from individual HEAD requests to a single LIST operation is a massive improvement:

  • Before: O(N) HEAD requests for N raw files (network-bound, slow)
  • After: a single paginated LIST operation plus O(1) set lookups per file (much faster)

This change alone could reduce sensor evaluation time from minutes to seconds for large filing sets.
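
To make the contrast concrete, here is a minimal sketch of the two access patterns. This assumes boto3; `processed_key_for` and the `sec/processed/` prefix are hypothetical stand-ins for the sensor's actual helpers and layout:

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")

def processed_key_for(raw_key: str) -> str:
    # Hypothetical mapping from a raw key to its processed counterpart
    return raw_key.replace("sec/raw/", "sec/processed/", 1)

# Before: one HEAD request per raw file (N network round-trips)
def check_processed_before(raw_keys: list[str], bucket: str) -> dict[str, bool]:
    results = {}
    for key in raw_keys:
        try:
            s3_client.head_object(Bucket=bucket, Key=processed_key_for(key))
            results[key] = True
        except ClientError:
            results[key] = False
    return results

# After: one paginated LIST builds a set once; each membership test is O(1)
def check_processed_after(raw_keys: list[str], bucket: str) -> dict[str, bool]:
    processed: set[str] = set()
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix="sec/processed/"):
        for obj in page.get("Contents", []):
            processed.add(obj["Key"])
    return {key: processed_key_for(key) in processed for key in raw_keys}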

2. Smart Cursor-Based Pagination (lines 163-233)
The implementation correctly uses Dagster's cursor mechanism to:

  • Process files in batches (MAX_FILES_PER_TICK = 500)
  • Resume from the last processed key across ticks
  • Reset cursor after completing a full scan
  • Avoid sensor timeout issues

3. Proper Set-Based Deduplication (lines 91-119)
The _list_processed_partitions() function efficiently builds a set of all processed filings, enabling O(1) lookups instead of N individual S3 HEAD requests.

4. Maintains Backward Compatibility

  • Same public API (sensor signature unchanged)
  • Same partition key format: {year}_{cik}_{accession}
  • Same idempotent run keys: sec-process-{partition_key}

⚠️ Issues & Recommendations

1. Critical: Missing Cursor Update on Early Exit (lines 185-190)

if not raw_files:
    # No more files to process - reset cursor to start fresh next time
    if start_after:
        context.log.info("Completed full scan, resetting cursor")
        context.update_cursor("")
    return  # ❌ Bug: returns without yielding anything

Problem: When the sensor completes scanning all files, it calls return instead of yield. Dagster sensors MUST yield something (even SkipReason) on every evaluation, otherwise Dagster may mark the sensor as failed or log errors.

Fix: Replace return with yield SkipReason("No more files to process") on lines 190, 236, and any other bare return statements.
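
In sketch form, extending the snippet quoted above, the corrected early-exit path inside the sensor body would look like this:

from dagster import SkipReason

# Corrected early exit: yield a SkipReason instead of bare-returning
if not raw_files:
    if start_after:
        # Completed a full forward pass; reset so the next tick scans from the top
        context.log.info("Completed full scan, resetting cursor")
        context.update_cursor("")
    yield SkipReason("No more files to process")  # yield, never bare return
    return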


2. Edge Case: Cursor May Skip Files During Concurrent Uploads (line 232)

if last_key:
    context.update_cursor(last_key)  # ⚠️ Potential issue

Problem: If new files are uploaded to S3 between ticks with keys that lexicographically sort BEFORE last_key, those files will be permanently skipped because StartAfter=last_key excludes them.

Scenario:

  • Tick 1: Process files up to sec/year=2024/999999/file.zip (cursor = this)
  • Between ticks: New file uploaded: sec/year=2024/500000/new-file.zip
  • Tick 2: Starts after sec/year=2024/999999/..., skips the new file at 500000

Fix Options:

  • Option A (Recommended): Track processed files in Dagster instance storage or external state
  • Option B: Accept this limitation and document it clearly in the docstring
  • Option C: On cursor reset, do a full re-scan and rely on processed_partitions set to dedupe

For production use, I'd recommend Option A or documenting Option B with clear operational guidance.

3. Performance: Unbounded List Operation (lines 98-119)

def _list_processed_partitions(s3_client, bucket: str) -> set[str]:
    # Lists ALL processed files on every sensor tick
    for page in paginator.paginate(Bucket=bucket, Prefix=processed_prefix):

Problem: As the number of processed filings grows (potentially millions), this operation will become slow again. Currently scans all processed files on every tick.

Impact Timeline:

  • 1K filings: Fast (~1-2 seconds)
  • 10K filings: Acceptable (~10-15 seconds)
  • 100K+ filings: Slow (minutes)

Optimization Ideas:

  • Cache the processed set in Valkey with TTL (5-10 minutes)
  • Use incremental updates: track newest processed files only
  • Partition the processed check by year (only check relevant years)

Recommendation: Monitor sensor evaluation time as filing count grows. Add caching when it exceeds 30 seconds.
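
If caching does become necessary, a minimal sketch of the Valkey idea, assuming a redis-py-compatible client and the `_list_processed_partitions` helper from this PR (the cache key and TTL are illustrative):

import json
import redis  # Valkey is Redis-protocol-compatible, so redis-py works

CACHE_KEY = "sec:processed_partitions"  # hypothetical
CACHE_TTL_SECONDS = 600                 # 10 minutes, per the suggestion above

def get_processed_partitions_cached(s3_client, bucket: str, valkey: redis.Redis) -> set[str]:
    cached = valkey.get(CACHE_KEY)
    if cached is not None:
        return set(json.loads(cached))  # cache hit: skip the S3 LIST entirely
    partitions = _list_processed_partitions(s3_client, bucket)  # helper from this PR
    valkey.set(CACHE_KEY, json.dumps(sorted(partitions)), ex=CACHE_TTL_SECONDS)
    return partitions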

4. Minor: Incomplete Error Context (line 260)

except Exception as e:
    context.log.error(f"Error in SEC processing sensor: {type(e).__name__}: {e}")
    raise

Enhancement: Add traceback for debugging:

import traceback
context.log.error(f"Error in SEC processing sensor: {type(e).__name__}: {e}\n{traceback.format_exc()}")

5. Code Style: Docstring Update Needed (line 132)
The docstring claims "2. Compares against processed files (set comparison, not individual HEAD requests)" but doesn't mention the cursor-based batching limitation from issue #2 above.

Recommended Addition:

"""
...
Note: Uses cursor-based pagination. Files uploaded between ticks with keys
lexicographically before the cursor may be skipped. Run with cursor="" to force
a full re-scan if needed.
"""

🎯 Testing Recommendations

Since there are no test files for this sensor, I recommend adding tests for:

  1. Batch Processing: Verify cursor advances correctly across multiple ticks
  2. Cursor Reset: Confirm cursor resets after full scan completion
  3. Deduplication: Ensure already-processed files are skipped via set lookup
  4. Edge Case: Test behavior when S3 has more than 500 files
  5. Error Handling: Verify sensor fails gracefully on S3 errors

Add tests at: tests/dagster/sensors/test_sec.py
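
As a starting point, a hedged sketch of one such test, assuming pytest, moto for S3 stubbing, and direct sensor invocation via Dagster's build_sensor_context. The import path and bucket name are guesses at the repo layout, and the sensor is assumed to build its boto3 client at evaluation time so moto can intercept it:

import boto3
from dagster import SkipReason, build_sensor_context
from moto import mock_aws

from sensors.sec import sec_processing_sensor  # import path assumed

@mock_aws
def test_sensor_skips_on_empty_bucket():
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="shared-raw")  # hypothetical bucket name
    context = build_sensor_context(cursor="")
    results = list(sec_processing_sensor(context))
    # With no raw filings, the sensor should yield a SkipReason, not bare-return
    assert any(isinstance(r, SkipReason) for r in results)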


📊 Performance Impact Estimate

Before (for 1000 raw filings):

  • 1000 HEAD requests @ ~50ms each = 50 seconds
  • Sensor timeout risk: High

After (for 1000 raw filings):

  • 1 LIST operation for processed files @ ~2 seconds
  • 2 LIST pages for raw files @ ~1 second each
  • Total: ~4 seconds
  • Performance improvement: ~12x faster

As processed filings grow to 10K+, the processed list scan will become the bottleneck again (see issue #3).


🔍 Architecture Alignment

✅ Follows CLAUDE.md patterns:

  • Uses centralized config (env.SHARED_RAW_BUCKET)
  • Uses storage helpers (get_raw_key, get_processed_key)
  • Proper error handling with re-raise for Dagster retry
  • Self-documenting code with clear variable names

✅ Follows Dagster best practices:

  • Sensor uses minimum_interval_seconds
  • Idempotent run keys prevent duplicate processing
  • Dynamic partition registration before run requests
  • Proper logging at info/error levels

📝 Summary

Verdict: Strong refactor with one critical bug and one notable edge case.

Must Fix Before Merge:

  • ⛔ Replace bare return statements with yield SkipReason(...) (lines 190, 236)

Should Document:

  • ⚠️ The cursor edge case (#2): files uploaded between ticks with keys sorting before the cursor are skipped until the next full scan

Nice to Have:

  • 🔧 Add traceback to exception logging
  • 📝 Update docstring to explain cursor behavior
  • ✅ Add test coverage for new batch logic

Once the critical `return` → `yield SkipReason()` fix is applied, this PR will significantly improve sensor performance and scalability. Great work on identifying and addressing the HEAD request bottleneck!



@chatgpt-codex-connector (Bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a22117f329


Comment on lines +171 to +173
paginate_kwargs = {"Bucket": raw_bucket, "Prefix": sec_prefix}
if start_after:
    paginate_kwargs["StartAfter"] = start_after

P2: Handle late-arriving keys before current cursor

Because the sensor paginates using StartAfter from the persisted cursor and only resets once it reaches the end of the listing, any new raw ZIPs whose S3 keys sort lexicographically before the current cursor won’t be seen until a full scan completes. In a large bucket, that can delay processing of late-arriving filings for hours or days (e.g., older year/CIK keys uploaded after the cursor has advanced). Consider a strategy that periodically rescans earlier prefixes or avoids a strictly forward-only cursor so new keys are picked up promptly.
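
One lightweight mitigation, sketched under the assumption that the cursor can carry a small JSON blob instead of a bare key (the cadence constant is hypothetical):

import json

FULL_RESCAN_EVERY_N_TICKS = 20  # hypothetical cadence

def next_cursor(previous_cursor: str, last_key: str) -> str:
    """Advance the cursor, but periodically drop StartAfter to force a full re-scan."""
    state = json.loads(previous_cursor) if previous_cursor else {"tick": 0, "after": ""}
    state["tick"] += 1
    if state["tick"] % FULL_RESCAN_EVERY_N_TICKS == 0:
        state["after"] = ""        # next tick lists from the top; set lookups dedupe
    else:
        state["after"] = last_key  # normal forward progress
    return json.dumps(state)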


…g sensor

This update renames the test class and methods to reflect the new functionality of the _list_processed_partitions helper. The tests now cover scenarios for finding processed partitions, handling empty buckets, and ignoring non-Entity files. Additionally, the sensor tests have been updated to use the new helper, ensuring accurate validation of processed filings.

This update introduces additional yield statements in the sec_processing_sensor function to provide clearer feedback when no raw filings are found or when all filings in a batch have already been processed. These changes enhance the logging and user experience by explicitly stating the reasons for skipping processing in different scenarios.
…lings and already processed filings

This change modifies the assertions in the test cases to reflect the new behavior of the sec_processing_sensor, ensuring that it returns a SkipReason when no raw filings are found or when all filings have already been processed. This enhances the clarity of test outcomes and aligns with recent updates to the sensor's functionality.
@jfrench9 merged commit 279f736 into main on Jan 8, 2026
6 checks passed
@jfrench9 deleted the refactor/sec-process-sensor-batches branch on January 9, 2026 at 21:14