-
Notifications
You must be signed in to change notification settings - Fork 0
Sync engine: backfill, streaming, and gap recovery #11
Copy link
Copy link
Closed
Description
Summary
Implement the sync engine (pkg/sync/) that orchestrates fetching namespace data from upstream and persisting it to the store.
State machine
INITIALIZING → BACKFILLING → STREAMING
↑ │
└────────────┘ (gap detected)
Components
Coordinator (pkg/sync/coordinator.go)
- Manages state transitions between backfill and streaming modes
- Loads checkpoint from store on startup
- Determines start height from config or last checkpoint
Backfill (pkg/sync/backfill.go)
- Fetches heights
[start, network_head]in configurable batches (default: 100) - Concurrent fetchers (default: 4)
- Checkpoint saved every N heights
- Progress reporting for observability (Observability: metrics, logging, and tracing #7)
Subscription manager (pkg/sync/subscriptions.go)
- Subscribes to new headers from upstream
- For each header, fetches blobs for tracked namespaces
- Detects gaps and triggers backfill mode
- Manages downstream subscriptions (see Configuration system: YAML loading and validation #13 for improvements)
Interface dependency
Consumes DataFetcher interface (pkg/fetch/fetcher.go):
type DataFetcher interface {
GetHeader(ctx context.Context, height uint64) (*header.ExtendedHeader, error)
GetBlobs(ctx context.Context, height uint64, ns Namespace) ([]*blob.Blob, error)
GetNetworkHead(ctx context.Context) (uint64, error)
SubscribeHeaders(ctx context.Context) (<-chan *header.ExtendedHeader, error)
Close() error
}References
- Design doc:
test_plan.md— Sync Engine section
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels