Skip to content

fix(cloud/client): push events with watermark + backfill script (Bug 2)#162

Merged
Gradata merged 2 commits into
mainfrom
fix/sync-push-events-clean
May 2, 2026
Merged

fix(cloud/client): push events with watermark + backfill script (Bug 2)#162
Gradata merged 2 commits into
mainfrom
fix/sync-push-events-clean

Conversation

@Gradata

@Gradata Gradata commented May 2, 2026

Copy link
Copy Markdown
Owner

Replaces #161 (which had 43 unrelated commits diverged from main, unmergeable).

What

  • client.sync() reads events.jsonl, filters by last_sync_at watermark, batches 500/req, advances cursor on 200, retries smaller batch on 413
  • Sync state at <BRAIN_DIR>/.gradata-sync-state.json (events.jsonl untouched, append-only)
  • Server-side idempotency on (brain_id, event_id) — safe to re-run
  • scripts/backfill_to_cloud.py — one-shot historical replay for the ~5,800 events the broken sync silently dropped

Tests

  • 9/9 new tests in tests/test_cloud_client_sync.py pass

Pairs with

Gradata added 2 commits May 2, 2026 12:22
…ug 2)

Pairs with gradata-cloud PR #12. Was Bug 2 from /tmp/audit-bug2-watermark.md.

- client.sync() now reads events.jsonl, filters by last_sync_at watermark,
  batches 500 at a time, advances cursor on 200, retries with smaller batch on 413.
- Sync state at <BRAIN_DIR>/.gradata-sync-state.json (separate from events.jsonl
  which stays append-only and untouched).
- 9/9 new tests pass in tests/test_cloud_client_sync.py.

Council perspective P3 (Skeptic) had this take after audit-gate blocked the
aggregate-only path — 3 cloud routes (analytics.py, activity.py, corrections.py)
read raw events directly, so telemetry-only would have flatlined them.
One-shot: counts events.jsonl, resets local sync state, calls client.sync()
in a loop until cursor catches up. Idempotent — server upserts on
(brain_id, event_id). Run after PRs #11/#12/#161 merge to backfill the
~5800 historical events the broken sync silently dropped.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@coderabbitai

coderabbitai Bot commented May 2, 2026

Copy link
Copy Markdown

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 6242acb6-fe6c-4ad8-ab83-9cf2aad3a4f1

📥 Commits

Reviewing files that changed from the base of the PR and between 951791e and ea99573.

📒 Files selected for processing (3)
  • Gradata/scripts/backfill_to_cloud.py
  • Gradata/src/gradata/cloud/client.py
  • Gradata/tests/test_cloud_client_sync.py

📝 Walkthrough
  • Breaking change: CloudClient.sync() signature updated from sync(self) -> dict to sync(self, batch_size: int = 500) -> int; now returns count of ingested events instead of a dict
  • Implements watermarked sync using .gradata-sync-state.json to filter and replay events from events.jsonl by ts
  • Batches events up to 500 per request; retries with halved batch size on HTTP 413 errors
  • Generates deterministic event_id (SHA-256 of ts:type:source) for idempotent server-side deduplication on (brain_id, event_id) tuple
  • Advances and persists local watermark cursor only on successful server responses (200); events.jsonl remains append-only
  • Added scripts/backfill_to_cloud.py: one-shot backfill script to reset sync state and replay ~5,800 historical events dropped by previous broken sync
  • 9 new tests added in tests/test_cloud_client_sync.py covering watermarking, batching, retry logic, idempotency, and event_id generation (all passing)

Walkthrough

Added event batch synchronization to CloudClient.sync() with watermark persistence, batch-based posting to /sync endpoint, and HTTP 413 retry handling. Includes a new backfill script for replaying historical events and comprehensive tests for sync behavior.

Changes

Event Batch Synchronization

Layer / File(s) Summary
Watermark Persistence
Gradata/src/gradata/cloud/client.py
Introduces .gradata-sync-state.json for persisting last_sync_at and last_event_id watermarks. Adds _read_sync_state() and _write_sync_state() helpers to track ingested events across sync runs.
Core Sync & Batching
Gradata/src/gradata/cloud/client.py
Replaces CloudClient.sync() to read events.jsonl, filter unsynced events by timestamp, split into batches, POST to /sync endpoint, and advance watermark after each successful batch. Returns total ingested count or 0 if disconnected.
Event Formatting
Gradata/src/gradata/cloud/client.py
Adds _format_event() to generate deterministic event_id via SHA-256 hash of (ts, type, source) tuple, ensuring idempotent replay and deduplication.
Error Handling & Retry
Gradata/src/gradata/cloud/client.py
Introduces internal _TooLargeError exception and updates _post() to map HTTP 413 to _TooLargeError (triggering batch-size halving and retry) and other HTTP/URL errors to ConnectionError. Retry loop halves batch size until success or abort.
Backfill Script
Gradata/scripts/backfill_to_cloud.py
One-shot script with CLI (--brain-dir, --dry-run) that scans events.jsonl to count events by type, resets sync state, then repeatedly calls client.sync() until all events ingested or 200-iteration limit reached. Logs progress and returns appropriate exit codes.
Tests & Validation
Gradata/tests/test_cloud_client_sync.py
Comprehensive test suite covering absent/empty events, watermark filtering idempotency, batch splitting (1500 events in batch_size=500 chunks), HTTP 413 retry halving behavior, deterministic event_id generation, and POST call counts.

Sequence Diagram

sequenceDiagram
    participant Script as Backfill Script
    participant Client as CloudClient
    participant FS as FileSystem
    participant Server as Cloud Server

    Script->>Client: sync(batch_size=500)
    Client->>FS: read .gradata-sync-state.json
    FS-->>Client: {last_sync_at, last_event_id}
    Client->>FS: read events.jsonl
    FS-->>Client: [event, event, ...]
    Client->>Client: filter events by last_sync_at
    Client->>Client: partition into batch_size chunks
    loop For each batch
        Client->>Client: format_event(ev) with deterministic event_id
        Client->>Server: POST /sync [batch]
        alt HTTP 413 (Too Large)
            Server-->>Client: 413
            Client->>Client: batch_size = batch_size / 2
            Client->>Server: POST /sync [smaller batch]
        end
        Server-->>Client: 200 OK
        Client->>Client: advance last_sync_at to newest event ts
        Client->>FS: write .gradata-sync-state.json
        FS-->>Client: ✓
    end
    Client-->>Script: ingested_count
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

feature, breaking-change, infrastructure

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/sync-push-events-clean

Review rate limit: 4/5 reviews remaining, refill in 12 minutes.

Comment @coderabbitai help to get the list of available commands and usage tips.

@Gradata Gradata merged commit b82a2dc into main May 2, 2026
8 of 9 checks passed
@Gradata Gradata deleted the fix/sync-push-events-clean branch May 2, 2026 19:23
@coderabbitai coderabbitai Bot added bug Something isn't working feature breaking-change labels May 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

breaking-change bug Something isn't working feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant