
Implement SEC EFTS-based discovery and downloader with improved logging #134

Merged
jfrench9 merged 3 commits into main from feature/improved-sec-downloader on Jan 4, 2026

Conversation

@jfrench9 jfrench9 commented Jan 4, 2026

Summary

This PR introduces a new SEC EFTS (EDGAR Full-Text Search) based discovery mechanism and downloader, replacing the previous implementation with a more robust and efficient solution. The changes include comprehensive logging improvements and a modular client architecture for SEC data processing.

Key Accomplishments

  • New EFTS Client: Implemented a complete EFTS-based discovery system for SEC filings with support for various filing types and date ranges
  • Enhanced Downloader: Created a new downloader module with improved error handling, retry logic, and progress tracking
  • Rate Limiting: Added intelligent rate limiting to comply with SEC API guidelines and prevent throttling
  • Modular Architecture: Restructured SEC client code into separate modules (downloader, efts, rate_limiter) for better maintainability
  • Improved Logging: Refactored logging statements across the SEC pipeline for enhanced readability and debugging capabilities
  • Comprehensive Testing: Added extensive test coverage for all new client modules with 947 new test lines
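The EFTS client's query surface isn't shown in this description; a minimal sketch of how a discovery URL might be built against the public EDGAR full-text search endpoint (the endpoint and parameter names follow the public API; the PR's actual client may differ):

```python
from urllib.parse import urlencode

# Hypothetical helper; EFTS_SEARCH_URL and the parameter names come from
# the public EDGAR full-text search API, not from this PR's code.
EFTS_SEARCH_URL = "https://efts.sec.gov/LATEST/search-index"

def build_efts_query(form_type: str, start_date: str, end_date: str) -> str:
    """Build a discovery URL for one form type over a date range."""
    params = {
        "forms": form_type,     # e.g. "10-K"
        "dateRange": "custom",
        "startdt": start_date,  # ISO dates, e.g. "2023-01-01"
        "enddt": end_date,
    }
    return f"{EFTS_SEARCH_URL}?{urlencode(params)}"

url = build_efts_query("10-K", "2023-01-01", "2023-12-31")
```

One such query can enumerate every matching filing in the range, which is what enables the O(1) discovery described below, versus iterating per company.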

Breaking Changes

  • Modified SEC asset definitions in the Dagster pipeline - existing workflows may need to be updated
  • Changes to job configurations and pipeline scripts may require deployment adjustments
  • Updated module imports and initialization patterns for SEC client components

Testing Notes

  • All new modules include comprehensive unit tests covering success paths, error scenarios, and edge cases
  • Rate limiter tests validate proper throttling and compliance with SEC guidelines
  • EFTS client tests cover various filing types and discovery scenarios
  • Downloader tests include retry logic, error handling, and progress tracking validation

Infrastructure Considerations

  • The new rate limiting mechanism may affect processing throughput - monitor performance metrics
  • Enhanced logging may increase log volume - consider log retention policies
  • EFTS-based discovery may have different network requirements than the previous implementation
  • Updated Dagster asset definitions will require pipeline redeployment

🤖 Generated with Claude Code

Branch Info:

  • Source: feature/improved-sec-downloader
  • Target: main
  • Type: feature

Co-Authored-By: Claude <noreply@anthropic.com>

- Introduced a new `downloader.py` module for async downloading of SEC filings using the EFTS API, featuring rate limiting and progress tracking.
- Added `efts.py` for querying the EFTS API, enabling efficient discovery of filings by form type and date.
- Updated `__init__.py` to include lazy imports for new modules, preventing circular dependencies.
- Refactored SEC pipeline to utilize EFTS for filing discovery, replacing the previous per-company iteration approach with O(1) discovery.
- Enhanced Dagster assets and jobs to support the new EFTS-based workflow, including configuration updates for submission fetching and filing downloads.
…improved readability

- Updated logging statements in `downloader.py` and `sec.py` to use multi-line formatting for better clarity and maintainability.
- Ensured consistent formatting across log messages to enhance code readability.

jfrench9 commented Jan 4, 2026

@claude please review this PR

@chatgpt-codex-connector

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.


claude Bot commented Jan 4, 2026

Claude finished @jfrench9's task in 2m 13s


PR Review Complete

I've completed a thorough review of this PR. Overall, this is an excellent refactoring that modernizes the SEC pipeline with EFTS-based discovery, async downloads, and intelligent rate limiting. The code quality is high, test coverage is comprehensive, and the architecture is well-designed. Below are my findings organized by category.


✅ Strengths

Architecture & Design

  • O(1) EFTS Discovery: The shift from per-company iteration to bulk EFTS queries is a significant performance improvement
  • Modular Design: Clean separation of concerns between efts.py (discovery), downloader.py (downloads), and rate_limiter.py (rate control)
  • Lazy Imports: Smart use of __getattr__ in __init__.py to avoid circular dependencies (robosystems/adapters/sec/client/__init__.py:14-32)
  • Context Managers: Proper async context manager patterns ensure resources are cleaned up

Code Quality

  • Type Hints: Excellent use of type annotations throughout, including TYPE_CHECKING for avoiding runtime circular imports
  • Error Handling: Good error recovery with retry logic for 429 rate limit responses
  • Logging: Comprehensive logging with appropriate levels (info for progress, debug for details, warning for issues)
  • Documentation: Clear docstrings with usage examples

Testing

  • 947 lines of tests for new modules shows excellent coverage
  • Tests cover success paths, error scenarios, edge cases, and boundary conditions
  • Good use of mocking to isolate units under test

🔍 Issues Found

1. CRITICAL: Potential Infinite Recursion (High Priority)

Location: robosystems/adapters/sec/client/downloader.py:146-150 and robosystems/adapters/sec/client/efts.py:124-128

Both retry handlers can cause unbounded recursion if rate limiting persists:

if response.status == 429:
    retry_after = int(response.headers.get("Retry-After", 60))
    logger.warning(f"Rate limited, waiting {retry_after}s")
    await asyncio.sleep(retry_after)
    return await self._download_filing(hit, year, bucket)  # Recursive call with no limit

Impact: Stack overflow if SEC API continues returning 429s
Fix: Add max retry counter or use exponential backoff with a limit

Recommendation:

async def _download_filing(self, hit: EFTSHit, year: int, bucket: str, retry_count: int = 0) -> bool:
    MAX_RETRIES = 3
    # ... existing code ...
    if response.status == 429:
        if retry_count >= MAX_RETRIES:
            logger.error(f"Max retries exceeded for {hit.accession_number}")
            self._stats.failed += 1
            return False
        retry_after = int(response.headers.get("Retry-After", 60))
        logger.warning(f"Rate limited, waiting {retry_after}s (retry {retry_count + 1}/{MAX_RETRIES})")
        await asyncio.sleep(retry_after)
        return await self._download_filing(hit, year, bucket, retry_count + 1)

Apply the same pattern to efts.py:_fetch_page().


2. Security: Unvalidated Retry-After Header

Location: robosystems/adapters/sec/client/downloader.py:147 and robosystems/adapters/sec/client/efts.py:125

retry_after = int(response.headers.get("Retry-After", 60))

Issue: No validation on Retry-After value. A malicious or misconfigured server could send extreme values (e.g., "999999999") causing DoS.

Fix:

retry_after = min(int(response.headers.get("Retry-After", 60)), 300)  # Cap at 5 minutes

3. Resource Leak Risk: S3 Upload Outside Semaphore

Location: robosystems/adapters/sec/client/downloader.py:167-181

async with self._semaphore:
    async with self._limiter:
        # Download happens here
        content = await response.read()

# Upload to S3 (OUTSIDE semaphore)
try:
    self._get_s3_client().s3_client.put_object(...)

Issue: The S3 upload happens outside the semaphore, meaning you could have unbounded concurrent S3 uploads if many downloads complete simultaneously. This could exhaust file descriptors or network connections.

Impact: Potential resource exhaustion under high concurrency

Recommendation: Either:

  1. Keep S3 upload inside the semaphore (simplest)
  2. Add a separate semaphore for S3 uploads with a different limit

4. Dagster Asset Duplication of Code

Location: robosystems/dagster/assets/sec.py:325-590

Issue: The sec_raw_filings asset contains ~265 lines of inline async download logic that largely duplicates functionality from the new SECDownloader and EFTSClient classes. This violates DRY and creates maintenance burden.

Current:

@asset
def sec_raw_filings(...):
    async def run_efts_download():
        # 250+ lines of inline download logic
        # Reimplements rate limiting, semaphores, retry logic, etc.

Recommendation: Refactor to use the new classes:

@asset
def sec_raw_filings(...):
    import asyncio
    from robosystems.adapters.sec.client import SECDownloader
    
    async def run_download():
        async with SECDownloader(
            requests_per_second=config.download_rate,
            max_concurrent=config.download_concurrency,
            skip_existing=config.skip_existing,
        ) as downloader:
            return await downloader.download_year(
                year=year,
                form_types=config.form_types,
                ciks=cik_filter,
            )
    
    stats = asyncio.run(run_download())
    # Handle submissions separately if needed

This would reduce the asset from ~300 lines to ~50 lines and eliminate duplication.


5. Missing Error Handling: Empty Content Check

Location: robosystems/adapters/sec/client/downloader.py:155

if not content or len(content) == 0:

Issue: len(content) == 0 is redundant with not content, but more importantly, this should also handle cases where content is None (though response.read() shouldn't return None, defensive coding is better).

Minor Issue: The check is fine, but the error handling only logs and marks as failed. Consider if some filings genuinely have empty XBRL (unlikely but possible).


6. Test Gap: No Integration Tests for Full Pipeline

Observation: While unit tests are comprehensive (947 lines), there don't appear to be integration tests that verify:

  • End-to-end flow from EFTS query → download → S3 upload
  • Actual EFTS API behavior (all tests use mocks)
  • Rate limiting behavior under real load

Recommendation: Add at least one integration test marked with @pytest.mark.integration that:

  • Queries EFTS for a small set of filings (e.g., 5 filings from a test year)
  • Downloads them to a test S3 bucket
  • Verifies S3 objects exist and have correct content

7. Inconsistent Naming: SECSingleFilingConfig

Location: robosystems/dagster/assets/sec.py:262-266

class SECSingleFilingConfig(Config):
    """Configuration for single filing processing."""
    pass  # No config needed - partition key contains all info

Issue: This config class is defined but completely empty. If no config is needed, consider removing it entirely or documenting why it exists as a placeholder.


📝 Minor Issues & Suggestions

Code Style

  1. Import Organization (robosystems/adapters/sec/client/downloader.py:11-22): Imports are well-organized, but consider grouping third-party vs. first-party more explicitly

  2. Magic Numbers:

    • robosystems/adapters/sec/client/rate_limiter.py:56: Token time calculation could benefit from a comment explaining the formula
    • Progress logging interval (100) appears in multiple places - consider making it a constant
  3. Documentation:

    • EFTS module has excellent API docs link (robosystems/adapters/sec/client/efts.py:11)
    • Consider adding similar architecture docs to downloader.py

Potential Improvements

  1. Progress Callbacks: The downloader logs progress but doesn't support callbacks. For Dagster integration, consider adding optional progress callbacks:

    async def download_filings(
        self,
        hits: list[EFTSHit],
        year: int,
        bucket: str | None = None,
        progress_callback: Callable[[int, int], None] | None = None,
    ):
  2. Metrics: Consider exposing download metrics (bytes/sec, req/sec) as structured data rather than just logs

  3. Cancellation: Add proper cancellation support for long-running downloads (Python 3.11+ task groups would be ideal)
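As a sketch of what cancellation support could look like (implemented with plain `Task.cancel()` for portability; on Python 3.11+ an `asyncio.TaskGroup` would cancel siblings automatically — the loop below is illustrative, not the downloader's actual code):

```python
import asyncio

async def download_all(n: int, done: list) -> None:
    """Simulated long-running download loop; each await is a
    cancellation point, so cancel() takes effect between filings."""
    for _ in range(n):
        await asyncio.sleep(0.01)  # simulated per-filing work
        done[0] += 1

async def main():
    done = [0]
    task = asyncio.create_task(download_all(100, done))
    await asyncio.sleep(0.05)
    task.cancel()  # request cancellation mid-run
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return cancelled, done[0]

cancelled, completed = asyncio.run(main())
```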


🧪 Test Quality Assessment

Excellent Coverage

  • Unit tests cover all major code paths
  • Edge cases well-tested (empty CIKs, missing fields, 404 responses, rate limiting)
  • Mock usage is appropriate and not over-mocked
  • Async testing properly uses @pytest.mark.asyncio

Minor Test Improvements

  1. Test Isolation (tests/adapters/sec/client/test_downloader.py:241-265): Some tests manually construct mocks when fixtures would improve readability

  2. Parameterized Tests: Consider using @pytest.mark.parametrize for testing multiple similar scenarios (e.g., different HTTP status codes)


📊 Performance Considerations

Positive

  • Async/await throughout for I/O-bound operations ✅
  • Semaphore-based concurrency control prevents overwhelming systems ✅
  • Token bucket rate limiting is more efficient than sleep-based approaches ✅
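The token-bucket claim above can be illustrated with a minimal limiter: tokens refill continuously, so bursts up to capacity proceed immediately and waiting only happens when the bucket is empty (this is a generic sketch, not the PR's `AsyncRateLimiter`):

```python
import asyncio
import time

class TokenBucket:
    """Minimal token-bucket sketch: refills at `rate` tokens/second up to
    `capacity`; acquire() waits only when empty, unlike a fixed sleep
    between every request."""
    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep just long enough for one token to accrue.
            await asyncio.sleep((1 - self.tokens) / self.rate)

async def main() -> float:
    bucket = TokenBucket(rate=100.0, capacity=2.0)
    start = time.monotonic()
    for _ in range(6):  # 2 burst tokens, then ~4 refills at 100/s
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(main())
```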

Concerns

  1. Memory Usage: Large result sets from EFTS (up to 10,000 filings) are held in memory. For production, consider streaming or pagination.

  2. S3 Upload Strategy: Uploads use sync boto3 client in async context (robosystems/adapters/sec/client/downloader.py:169-174). Consider using aiobotocore for truly async S3 operations.


🔒 Security Review

Good Practices

  • ✅ User-Agent headers set correctly (SEC requirement)
  • ✅ No secrets in code
  • ✅ Input validation on CIKs (zero-padding)

Recommendations

  1. Add input validation for year parameter (reasonable range like 1990-2030)
  2. Validate form types against a known list to prevent injection-style attacks
  3. Consider sanitizing file paths derived from user input (though CIK is numeric, defense in depth is good)
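The three recommendations above amount to a small validation gate; a sketch with illustrative bounds and an illustrative allow-list (not the project's actual configuration):

```python
# Hypothetical defensive checks; the year bounds and form list are examples.
ALLOWED_FORM_TYPES = {"10-K", "10-Q", "8-K", "S-1", "DEF 14A"}

def validate_request(year: int, form_type: str, cik: str) -> str:
    """Validate inputs before building any URL or S3 key from them."""
    if not (1990 <= year <= 2030):
        raise ValueError(f"year out of range: {year}")
    if form_type not in ALLOWED_FORM_TYPES:
        raise ValueError(f"unknown form type: {form_type!r}")
    if not cik.isdigit():
        raise ValueError(f"CIK must be numeric: {cik!r}")
    return cik.zfill(10)  # SEC CIKs are zero-padded to 10 digits
```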

📋 Migration & Breaking Changes

The PR description mentions breaking changes. Here's what I found:

  1. Dagster Asset Changes: sec_raw_filings signature changed - existing materializations may break
  2. Import Path Changes: New modules require import updates if anyone was importing internal SEC client code
  3. Config Schema Changes: SECDownloadConfig has new fields that may require config updates

Recommendation: Add migration guide to PR description with specific upgrade steps.


✨ Final Recommendations

Must Fix (Before Merge)

  1. ❗ Add max retry limit to prevent infinite recursion (Issue 1)
  2. ❗ Validate and cap Retry-After header (Issue 2)
  3. ❗ Move S3 upload inside semaphore or add separate semaphore (Issue 3)

Should Fix (This PR or Follow-up)

  1. Refactor Dagster asset to use new classes instead of duplicating logic (Issue 4)
  2. Add at least one integration test (Issue 6)

Nice to Have (Follow-up PRs)

  • Add progress callbacks for better Dagster integration
  • Consider aiobotocore for async S3
  • Add structured metrics output
  • Parameterize tests for better coverage visualization

🎯 Summary

This is high-quality work that represents a significant improvement to the SEC pipeline. The architecture is sound, the code is well-tested, and the performance improvements are substantial. The critical issues identified are fixable with minor changes and don't undermine the overall design.

Recommendation: Approve with required changes - fix the three must-fix issues, then merge. The remaining suggestions can be addressed in follow-up PRs.

Great job on this implementation! The EFTS-based approach is a major upgrade. 🚀


Tasks:

  • Read and analyze new client modules (downloader, efts, rate_limiter)
  • Review modified Dagster assets and jobs
  • Examine test coverage and quality
  • Check for security issues and best practices
  • Assess architecture and code quality
  • Provide comprehensive review feedback

Comment thread robosystems/adapters/sec/client/__init__.py Fixed
- Added retry count and maximum retry limits to handle rate limiting more effectively in `downloader.py` and `efts.py`.
- Improved logging for rate limiting scenarios to provide clearer feedback on retry attempts and wait times.
- Updated `__init__.py` to clarify which modules are available via lazy imports, improving module organization and usability.

jfrench9 commented Jan 4, 2026

@claude have we addressed all issues?

@claude

claude Bot commented Jan 4, 2026

Claude finished @jfrench9's task in 1m 56s


Issue Status Review Complete ✅

I've verified which issues from the previous review have been addressed:


Critical Issues Successfully Fixed

1. Infinite Recursion in Retry Logic (CRITICAL)

Status: ✅ FIXED

Both downloader.py and efts.py now have proper retry limits:

  • downloader.py:123-163: Added MAX_RETRIES = 3 with retry counter
  • efts.py:115-139: Added MAX_RETRIES = 3 with retry counter

The retry logic now properly exits after 3 attempts and logs errors appropriately. This prevents stack overflow.
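The same bound can also be expressed iteratively rather than by passing a counter through a recursive call, eliminating recursion depth entirely; a generic sketch (the fetcher is a stand-in, not the project's API):

```python
import asyncio

MAX_RETRIES = 3

async def fetch_with_retries(fetch, attempts: list) -> int:
    """Iterative retry sketch: a bounded for-loop replaces the recursive
    re-call, so the worst case is MAX_RETRIES + 1 attempts total.
    `fetch` is a stand-in returning (status, retry_after_seconds)."""
    for attempt in range(MAX_RETRIES + 1):
        attempts[0] += 1
        status, retry_after = await fetch()
        if status != 429:
            return status
        if attempt < MAX_RETRIES:
            await asyncio.sleep(min(retry_after, 0.01))  # capped wait
    return 429  # retries exhausted

async def main():
    # Simulated server: two 429s, then success.
    responses = iter([(429, 0.0), (429, 0.0), (200, 0.0)])

    async def fake_fetch():
        return next(responses)

    attempts = [0]
    status = await fetch_with_retries(fake_fetch, attempts)
    return status, attempts[0]

status, attempts_made = asyncio.run(main())
```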

2. Unvalidated Retry-After Header (Security)

Status: ✅ FIXED

Both files now cap the Retry-After value:

  • downloader.py:155-156: retry_after = min(int(response.headers.get("Retry-After", 60)), MAX_RETRY_AFTER) where MAX_RETRY_AFTER = 300
  • efts.py:131-132: Same pattern with 5-minute cap

This prevents DoS attacks from malicious servers sending extreme retry values.

3. S3 Upload Semaphore Placement (Resource Leak)

Status: ✅ FIXED

The S3 upload is now inside the semaphore block:

  • downloader.py:140-194: The entire download + S3 upload is wrapped in the semaphore context
  • Line 180: Comment explicitly states "Upload to S3 (inside semaphore to limit concurrent uploads)"

This prevents unbounded concurrent S3 uploads.


Issues NOT Fixed

4. Code Duplication in Dagster Asset (Should Fix)

Status: ❌ NOT FIXED

The sec_raw_filings asset in robosystems/dagster/assets/sec.py:325-590 still contains ~265 lines of inline async download logic that duplicates functionality from the new SECDownloader and EFTSClient classes.

Current state:

  • Lines 325-590: Contains inline run_efts_download() function with duplicated logic
  • Lines 498-566: Duplicated download_filing() function with retry logic, rate limiting, semaphores
  • Lines 414-449: Duplicated fetch_submission() function

Why this matters:

  • Violates DRY (Don't Repeat Yourself) principle
  • Creates maintenance burden (bugs need to be fixed in two places)
  • The new classes were created to centralize this logic, but the Dagster asset doesn't use them

Recommendation: Refactor the asset to use SECDownloader and EFTSClient directly. The asset could be reduced from ~300 lines to ~50 lines.

5. Missing Integration Tests (Nice to Have)

Status: ❌ NOT ADDED

While unit test coverage is excellent (947 lines of tests), there are no integration tests that verify:

  • End-to-end flow from EFTS query → download → S3 upload
  • Actual EFTS API behavior (all tests use mocks)
  • Rate limiting behavior under real load

Note: This was marked as "nice to have" in the original review, so not critical for merge.


⚠️ New Issue: __init__.py Export Problem

Status: ⚠️ NEW ISSUE (flagged by github-code-quality bot)

The robosystems/adapters/sec/client/__init__.py file has a mismatch between __all__ and actual module attributes.

Problem: The file uses lazy imports via __getattr__() but documents lazy-loaded names as if they're exported. This causes static analysis tools to flag them.

Current state (lines 36-47):

__all__ = [
  # Eagerly loaded (safe for `from ... import *`)
  "SEC_BASE_URL",
  "ArelleClient",
  "SECClient",
  "enable_test_mode",
]

# Note: The following are available via lazy import (use direct imports):
# - EFTSClient, EFTSHit, query_efts, query_efts_sync (from .efts)
# - SECDownloader, DownloadStats, download_sec_filings, download_sec_filings_sync (from .downloader)
# - AsyncRateLimiter, RateMonitor, RateStats (from .rate_limiter)

Impact:

  • Low severity for runtime (lazy imports work fine)
  • Breaks static analysis tools (code-quality bot flags this)
  • Documentation is already correct (tells users to use direct imports)

Recommendation: The comment already documents the correct usage pattern. The code-quality warnings can be safely ignored, as the lazy import pattern is intentional.
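For reference, the lazy-import pattern in question is PEP 562's module-level `__getattr__`; a self-contained sketch with illustrative stand-in names (the real `__init__.py` would use `from .efts import EFTSClient` inside the hook):

```python
import sys
import types

# Build a demo module in-memory; in the real package this is __init__.py.
mod = types.ModuleType("sec_client_demo")
mod.__all__ = ["SEC_BASE_URL"]  # only eagerly loaded names are exported
mod.SEC_BASE_URL = "https://www.sec.gov"

_loaded = {}

def _lazy_getattr(name):
    """Resolve heavy names on first access instead of at import time,
    which is what breaks the circular dependency between siblings."""
    if name == "EFTSClient":
        # Stand-in for `from .efts import EFTSClient`.
        _loaded.setdefault(name, type("EFTSClient", (), {}))
        return _loaded[name]
    raise AttributeError(f"module {mod.__name__!r} has no attribute {name!r}")

mod.__getattr__ = _lazy_getattr  # PEP 562 hook (Python 3.7+)
sys.modules["sec_client_demo"] = mod

import sec_client_demo
client_cls = sec_client_demo.EFTSClient  # resolved lazily on first access
```

Because lazily resolved names never appear in `__all__` or the module dict, static analyzers cannot see them; that is the mismatch the bot flags, and it is inherent to the pattern rather than a bug.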


📊 Summary

Issue                                   Severity   Priority          Fixed?
Infinite recursion in retry handlers    Critical   Must Fix          YES
Unvalidated Retry-After header          Security   Must Fix          YES
S3 upload outside semaphore             High       Must Fix          YES
Code duplication in Dagster asset       Medium     Should Fix        NO
Missing integration tests               Low        Nice to Have      NO
__init__.py export mismatch             Low        Static Analysis   ⚠️ NEW (safe to ignore)

🎯 Final Answer

Yes, all critical issues have been addressed!

The three must-fix issues from the original review are now resolved:

  • ✅ Stack overflow prevention (retry limits added)
  • ✅ Security hardening (Retry-After capped)
  • ✅ Resource leak prevention (S3 upload in semaphore)

Remaining work (not blockers):

  1. Code duplication in the Dagster asset - recommend addressing in a follow-up PR
  2. __init__.py static analysis warnings - intentional design choice, safe to ignore

The PR is safe to merge from a correctness and security standpoint. Great work fixing the critical issues! 🚀


(Inline diff excerpts from the resolved review threads in sec.py — the head_object skip-existing check and the asyncio.as_completed progress loop — elided.)
@jfrench9 jfrench9 merged commit 60fc02c into main Jan 4, 2026
11 checks passed
@jfrench9 jfrench9 linked an issue Jan 4, 2026 that may be closed by this pull request
19 tasks
@jfrench9 jfrench9 deleted the feature/improved-sec-downloader branch January 4, 2026 09:05

Development

Successfully merging this pull request may close these issues.

SEC Shared Repository Production Launch

1 participant