diff --git a/.gitignore b/.gitignore index d84c24a..3981ed1 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,9 @@ coverage.html # Local config config.local.yaml +# Project +ISSUES.md + # OS .DS_Store Thumbs.db diff --git a/cmd/apex/main.go b/cmd/apex/main.go index 8863283..f40fed4 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -1,14 +1,21 @@ package main import ( + "context" + "errors" "fmt" "os" + "os/signal" + "syscall" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/evstack/apex/config" + "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/store" + syncer "github.com/evstack/apex/pkg/sync" ) // Set via ldflags at build time. @@ -90,10 +97,7 @@ func startCmd() *cobra.Command { Int("namespaces", len(cfg.DataSource.Namespaces)). Msg("starting apex indexer") - // TODO(phase1): wire store, fetcher, and sync coordinator. - log.Info().Msg("apex indexer is not yet implemented — scaffolding only") - - return nil + return runIndexer(cmd.Context(), cfg) }, } } @@ -112,3 +116,56 @@ func setupLogger(cfg config.LogConfig) { log.Logger = log.Output(os.Stdout) } } + +func runIndexer(ctx context.Context, cfg *config.Config) error { + // Parse namespaces from config. + namespaces, err := cfg.ParsedNamespaces() + if err != nil { + return fmt.Errorf("parse namespaces: %w", err) + } + + // Open store. + db, err := store.Open(cfg.Storage.DBPath) + if err != nil { + return fmt.Errorf("open store: %w", err) + } + defer db.Close() //nolint:errcheck + + // Persist configured namespaces. + ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + for _, ns := range namespaces { + if err := db.PutNamespace(ctx, ns); err != nil { + return fmt.Errorf("put namespace: %w", err) + } + } + + // Connect to Celestia node. 
+ fetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) + if err != nil { + return fmt.Errorf("connect to celestia node: %w", err) + } + defer fetcher.Close() //nolint:errcheck + + // Build and run the sync coordinator. + coord := syncer.New(db, fetcher, + syncer.WithStartHeight(cfg.Sync.StartHeight), + syncer.WithBatchSize(cfg.Sync.BatchSize), + syncer.WithConcurrency(cfg.Sync.Concurrency), + syncer.WithLogger(log.Logger), + ) + + log.Info(). + Int("namespaces", len(namespaces)). + Uint64("start_height", cfg.Sync.StartHeight). + Msg("sync coordinator starting") + + err = coord.Run(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("coordinator: %w", err) + } + + log.Info().Msg("apex indexer stopped") + return nil +} diff --git a/go.mod b/go.mod index 6d31969..49ac63e 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,35 @@ module github.com/evstack/apex go 1.24.0 require ( + github.com/filecoin-project/go-jsonrpc v0.10.1 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.46.1 ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/ipfs/go-log/v2 v2.0.8 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/spf13/pflag v1.0.9 // indirect + go.opencensus.io v0.22.3 // indirect + go.uber.org/atomic v1.6.0 // indirect + go.uber.org/multierr v1.5.0 // indirect + go.uber.org/zap v1.14.1 // 
indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/sys v0.41.0 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 42cf336..ce87694 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,68 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/filecoin-project/go-jsonrpc v0.10.1 h1:iEhgrjO0+rawwOZWRNgexLrWGLA+IEUyWiRRL134Ob8= +github.com/filecoin-project/go-jsonrpc v0.10.1/go.mod h1:OG7kVBVh/AbDFHIwx7Kw0l9ARmKOS6gGOr0LbdBpbLc= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache 
v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/ipfs/go-log/v2 v2.0.8 h1:3b3YNopMHlj4AvyhWAx0pDxqSQWYi4/WuWO7yRV6/Qg= +github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= +github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -17,13 +71,114 @@ github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= +go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= 
+go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= +go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net 
v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= 
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod 
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 
h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU= +modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go new file mode 100644 index 0000000..21ce956 --- /dev/null +++ b/pkg/fetch/celestia_node.go @@ -0,0 +1,283 @@ +package fetch + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + jsonrpc "github.com/filecoin-project/go-jsonrpc" + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/types" +) + +// headerAPI 
defines the JSON-RPC stubs for the Celestia "header" namespace. +type headerAPI struct { + GetByHeight func(ctx context.Context, height uint64) (json.RawMessage, error) + NetworkHead func(ctx context.Context) (json.RawMessage, error) + Subscribe func(ctx context.Context) (<-chan json.RawMessage, error) +} + +// blobAPI defines the JSON-RPC stubs for the Celestia "blob" namespace. +type blobAPI struct { + GetAll func(ctx context.Context, height uint64, namespaces [][]byte) (json.RawMessage, error) +} + +// CelestiaNodeFetcher implements DataFetcher using a Celestia node's JSON-RPC API. +type CelestiaNodeFetcher struct { + header headerAPI + blob blobAPI + headerCloser jsonrpc.ClientCloser + blobCloser jsonrpc.ClientCloser + log zerolog.Logger + mu sync.Mutex + closed bool +} + +// NewCelestiaNodeFetcher connects to a Celestia node at the given WebSocket address. +func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog.Logger) (*CelestiaNodeFetcher, error) { + headers := http.Header{} + if token != "" { + headers.Set("Authorization", "Bearer "+token) + } + + f := &CelestiaNodeFetcher{log: log} + + var err error + f.headerCloser, err = jsonrpc.NewClient(ctx, addr, "header", &f.header, headers) + if err != nil { + return nil, fmt.Errorf("connect header client: %w", err) + } + + f.blobCloser, err = jsonrpc.NewClient(ctx, addr, "blob", &f.blob, headers) + if err != nil { + f.headerCloser() + return nil, fmt.Errorf("connect blob client: %w", err) + } + + return f, nil +} + +func (f *CelestiaNodeFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + raw, err := f.header.GetByHeight(ctx, height) + if err != nil { + return nil, fmt.Errorf("header.GetByHeight(%d): %w", height, err) + } + return mapHeader(raw) +} + +func (f *CelestiaNodeFetcher) GetBlobs(ctx context.Context, height uint64, namespaces []types.Namespace) ([]types.Blob, error) { + nsBytes := namespacesToBytes(namespaces) + raw, err := f.blob.GetAll(ctx, height, 
nsBytes) + if err != nil { + if isNotFoundErr(err) { + return nil, nil + } + return nil, fmt.Errorf("blob.GetAll(%d): %w", height, err) + } + + return mapBlobs(raw, height) +} + +func (f *CelestiaNodeFetcher) GetNetworkHead(ctx context.Context) (*types.Header, error) { + raw, err := f.header.NetworkHead(ctx) + if err != nil { + return nil, fmt.Errorf("header.NetworkHead: %w", err) + } + return mapHeader(raw) +} + +func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) { + rawCh, err := f.header.Subscribe(ctx) + if err != nil { + return nil, fmt.Errorf("header.Subscribe: %w", err) + } + + out := make(chan *types.Header, 64) + go func() { + defer close(out) + for { + select { + case raw, ok := <-rawCh: + if !ok { + return + } + hdr, err := mapHeader(raw) + if err != nil { + f.log.Error().Err(err).Msg("failed to parse subscribed header") + continue + } + select { + case out <- hdr: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + +func (f *CelestiaNodeFetcher) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return nil + } + f.closed = true + f.headerCloser() + f.blobCloser() + return nil +} + +// rpcExtendedHeader is the minimal JSON structure we parse from Celestia's ExtendedHeader. +type rpcExtendedHeader struct { + Header rpcCometHeader `json:"header"` + Commit rpcCommit `json:"commit"` +} + +type rpcCometHeader struct { + Height jsonInt64 `json:"height"` + Time string `json:"time"` + DataHash hexBytes `json:"data_hash"` +} + +type rpcCommit struct { + BlockID rpcBlockID `json:"block_id"` +} + +type rpcBlockID struct { + Hash hexBytes `json:"hash"` +} + +// rpcBlob is the minimal JSON structure we parse from Celestia's blob response. 
+type rpcBlob struct { + Namespace []byte `json:"namespace"` + Data []byte `json:"data"` + ShareVersion uint32 `json:"share_version"` + Commitment []byte `json:"commitment"` + Index int `json:"index"` +} + +func mapHeader(raw json.RawMessage) (*types.Header, error) { + var h rpcExtendedHeader + if err := json.Unmarshal(raw, &h); err != nil { + return nil, fmt.Errorf("unmarshal header: %w", err) + } + + t, err := time.Parse(time.RFC3339Nano, h.Header.Time) + if err != nil { + return nil, fmt.Errorf("parse header time %q: %w", h.Header.Time, err) + } + + return &types.Header{ + Height: uint64(h.Header.Height), + Hash: []byte(h.Commit.BlockID.Hash), + DataHash: []byte(h.Header.DataHash), + Time: t, + RawHeader: []byte(raw), + }, nil +} + +func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) { + // Celestia returns null/empty for no blobs. + if len(raw) == 0 || string(raw) == "null" { + return nil, nil + } + + var rpcBlobs []rpcBlob + if err := json.Unmarshal(raw, &rpcBlobs); err != nil { + return nil, fmt.Errorf("unmarshal blobs: %w", err) + } + + blobs := make([]types.Blob, len(rpcBlobs)) + for i := range rpcBlobs { + b := &rpcBlobs[i] + var ns types.Namespace + if len(b.Namespace) != types.NamespaceSize { + return nil, fmt.Errorf("blob %d: invalid namespace size %d", i, len(b.Namespace)) + } + copy(ns[:], b.Namespace) + + blobs[i] = types.Blob{ + Height: height, + Namespace: ns, + Data: b.Data, + Commitment: b.Commitment, + ShareVersion: b.ShareVersion, + Index: b.Index, + } + } + return blobs, nil +} + +func namespacesToBytes(nss []types.Namespace) [][]byte { + out := make([][]byte, len(nss)) + for i, ns := range nss { + b := make([]byte, types.NamespaceSize) + copy(b, ns[:]) + out[i] = b + } + return out +} + +func isNotFoundErr(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "blob: not found") || + strings.Contains(msg, "header: not found") +} + +// jsonInt64 handles CometBFT's int64 
fields encoded as JSON strings. +type jsonInt64 int64 + +func (i *jsonInt64) UnmarshalJSON(data []byte) error { + // Try string first (CometBFT style: "12345"). + var s string + if err := json.Unmarshal(data, &s); err == nil { + n, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return fmt.Errorf("parse int64 string %q: %w", s, err) + } + *i = jsonInt64(n) + return nil + } + // Fall back to number. + var n int64 + if err := json.Unmarshal(data, &n); err != nil { + return fmt.Errorf("parse int64: %w", err) + } + *i = jsonInt64(n) + return nil +} + +// hexBytes handles CometBFT's HexBytes (uppercase hex-encoded strings in JSON). +type hexBytes []byte + +func (h *hexBytes) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + if s == "" { + *h = nil + return nil + } + b, err := hex.DecodeString(s) + if err != nil { + return fmt.Errorf("decode hex: %w", err) + } + *h = b + return nil +} diff --git a/pkg/fetch/celestia_node_test.go b/pkg/fetch/celestia_node_test.go new file mode 100644 index 0000000..88bc1a9 --- /dev/null +++ b/pkg/fetch/celestia_node_test.go @@ -0,0 +1,226 @@ +package fetch + +import ( + "encoding/json" + "errors" + "testing" + "time" + + "github.com/evstack/apex/pkg/types" +) + +func TestMapHeader(t *testing.T) { + raw := json.RawMessage(`{ + "header": { + "height": "12345", + "time": "2025-06-15T10:30:00.123456789Z", + "data_hash": "AABBCCDD" + }, + "commit": { + "block_id": { + "hash": "11223344" + } + } + }`) + + hdr, err := mapHeader(raw) + if err != nil { + t.Fatalf("mapHeader: %v", err) + } + + if hdr.Height != 12345 { + t.Errorf("Height = %d, want 12345", hdr.Height) + } + wantTime := time.Date(2025, 6, 15, 10, 30, 0, 123456789, time.UTC) + if !hdr.Time.Equal(wantTime) { + t.Errorf("Time = %v, want %v", hdr.Time, wantTime) + } + if len(hdr.Hash) != 4 { + t.Errorf("Hash len = %d, want 4", len(hdr.Hash)) + } + if hdr.Hash[0] != 0x11 || hdr.Hash[1] != 0x22 { + t.Errorf("Hash 
= %x, want 11223344", hdr.Hash) + } + if len(hdr.DataHash) != 4 { + t.Errorf("DataHash len = %d, want 4", len(hdr.DataHash)) + } + if hdr.DataHash[0] != 0xAA || hdr.DataHash[1] != 0xBB { + t.Errorf("DataHash = %x, want AABBCCDD", hdr.DataHash) + } + if len(hdr.RawHeader) == 0 { + t.Error("RawHeader is empty") + } +} + +func TestMapHeaderNumericHeight(t *testing.T) { + // Some nodes may return height as a number instead of string. + raw := json.RawMessage(`{ + "header": { + "height": 42, + "time": "2025-01-01T00:00:00Z", + "data_hash": "" + }, + "commit": { + "block_id": { + "hash": "" + } + } + }`) + + hdr, err := mapHeader(raw) + if err != nil { + t.Fatalf("mapHeader: %v", err) + } + if hdr.Height != 42 { + t.Errorf("Height = %d, want 42", hdr.Height) + } +} + +func TestMapBlobs(t *testing.T) { + // Construct a 29-byte namespace, base64-encoded. + var ns types.Namespace + ns[0] = 0x00 + ns[types.NamespaceSize-1] = 0x01 + nsJSON, _ := json.Marshal(ns[:]) + + raw := json.RawMessage(`[ + { + "namespace": ` + string(nsJSON) + `, + "data": "aGVsbG8=", + "share_version": 0, + "commitment": "Y29tbWl0", + "index": 0 + }, + { + "namespace": ` + string(nsJSON) + `, + "data": "d29ybGQ=", + "share_version": 1, + "commitment": "Y29tbWl0Mg==", + "index": 1 + } + ]`) + + blobs, err := mapBlobs(raw, 100) + if err != nil { + t.Fatalf("mapBlobs: %v", err) + } + if len(blobs) != 2 { + t.Fatalf("got %d blobs, want 2", len(blobs)) + } + + if blobs[0].Height != 100 { + t.Errorf("blob[0].Height = %d, want 100", blobs[0].Height) + } + if blobs[0].Namespace != ns { + t.Errorf("blob[0].Namespace = %v, want %v", blobs[0].Namespace, ns) + } + if string(blobs[0].Data) != "hello" { + t.Errorf("blob[0].Data = %q, want %q", blobs[0].Data, "hello") + } + if blobs[0].Index != 0 { + t.Errorf("blob[0].Index = %d, want 0", blobs[0].Index) + } + if blobs[1].Index != 1 { + t.Errorf("blob[1].Index = %d, want 1", blobs[1].Index) + } + if blobs[1].ShareVersion != 1 { + t.Errorf("blob[1].ShareVersion = %d, 
want 1", blobs[1].ShareVersion) + } +} + +func TestMapBlobsEmpty(t *testing.T) { + for _, input := range []string{"null", "[]", ""} { + blobs, err := mapBlobs(json.RawMessage(input), 1) + if err != nil { + t.Errorf("mapBlobs(%q): %v", input, err) + } + if len(blobs) != 0 { + t.Errorf("mapBlobs(%q) = %v, want empty", input, blobs) + } + } +} + +func TestIsNotFoundErr(t *testing.T) { + tests := []struct { + err error + want bool + }{ + {nil, false}, + {errors.New("something else"), false}, + {errors.New("blob: not found"), true}, + {errors.New("header: not found"), true}, + {errors.New("rpc error: blob: not found at height 100"), true}, + } + for _, tt := range tests { + got := isNotFoundErr(tt.err) + if got != tt.want { + t.Errorf("isNotFoundErr(%v) = %v, want %v", tt.err, got, tt.want) + } + } +} + +func TestNamespacesToBytes(t *testing.T) { + var ns1, ns2 types.Namespace + ns1[0] = 1 + ns2[0] = 2 + + out := namespacesToBytes([]types.Namespace{ns1, ns2}) + if len(out) != 2 { + t.Fatalf("got %d, want 2", len(out)) + } + if len(out[0]) != types.NamespaceSize { + t.Errorf("out[0] len = %d, want %d", len(out[0]), types.NamespaceSize) + } + if out[0][0] != 1 || out[1][0] != 2 { + t.Errorf("values mismatch: %v, %v", out[0], out[1]) + } +} + +func TestJsonInt64(t *testing.T) { + tests := []struct { + input string + want int64 + }{ + {`"12345"`, 12345}, + {`42`, 42}, + {`"0"`, 0}, + {`0`, 0}, + } + for _, tt := range tests { + var got jsonInt64 + if err := json.Unmarshal([]byte(tt.input), &got); err != nil { + t.Errorf("unmarshal %s: %v", tt.input, err) + continue + } + if int64(got) != tt.want { + t.Errorf("unmarshal %s = %d, want %d", tt.input, got, tt.want) + } + } +} + +func TestHexBytes(t *testing.T) { + tests := []struct { + input string + want []byte + }{ + {`"AABB"`, []byte{0xAA, 0xBB}}, + {`"aabb"`, []byte{0xAA, 0xBB}}, + {`""`, nil}, + } + for _, tt := range tests { + var got hexBytes + if err := json.Unmarshal([]byte(tt.input), &got); err != nil { + 
t.Errorf("unmarshal %s: %v", tt.input, err) + continue + } + if len(got) != len(tt.want) { + t.Errorf("unmarshal %s: len = %d, want %d", tt.input, len(got), len(tt.want)) + continue + } + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("unmarshal %s: byte %d = %02x, want %02x", tt.input, i, got[i], tt.want[i]) + } + } + } +} diff --git a/pkg/store/migrations/001_init.sql b/pkg/store/migrations/001_init.sql new file mode 100644 index 0000000..ff00fed --- /dev/null +++ b/pkg/store/migrations/001_init.sql @@ -0,0 +1,33 @@ +CREATE TABLE IF NOT EXISTS sync_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + state INTEGER NOT NULL DEFAULT 0, + latest_height INTEGER NOT NULL DEFAULT 0, + network_height INTEGER NOT NULL DEFAULT 0, + updated_at INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS headers ( + height INTEGER PRIMARY KEY, + hash BLOB NOT NULL, + data_hash BLOB NOT NULL, + time_ns INTEGER NOT NULL, + raw_header BLOB NOT NULL +); + +CREATE TABLE IF NOT EXISTS blobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + height INTEGER NOT NULL, + namespace BLOB NOT NULL, + commitment BLOB NOT NULL, + data BLOB NOT NULL, + share_version INTEGER NOT NULL DEFAULT 0, + signer BLOB, + blob_index INTEGER NOT NULL, + UNIQUE(height, namespace, commitment) +); + +CREATE INDEX IF NOT EXISTS idx_blobs_ns_height ON blobs(namespace, height); + +CREATE TABLE IF NOT EXISTS namespaces ( + namespace BLOB PRIMARY KEY +); diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go new file mode 100644 index 0000000..60aea2e --- /dev/null +++ b/pkg/store/sqlite.go @@ -0,0 +1,275 @@ +package store + +import ( + "context" + "database/sql" + "embed" + "errors" + "fmt" + "time" + + "github.com/evstack/apex/pkg/types" + + _ "modernc.org/sqlite" +) + +//go:embed migrations/*.sql +var migrations embed.FS + +// SQLiteStore implements Store using modernc.org/sqlite (CGo-free). +type SQLiteStore struct { + db *sql.DB +} + +// Open creates or opens a SQLite database at the given path. 
+// The database is configured with WAL journal mode, a single connection, +// and a 5-second busy timeout. +func Open(path string) (*SQLiteStore, error) { + db, err := sql.Open("sqlite", path) + if err != nil { + return nil, fmt.Errorf("open sqlite: %w", err) + } + + // TODO(phase2): split into a write pool (max 1 conn) and a read pool + // (max N conns) so API reads don't block behind sync writes. WAL mode + // supports concurrent readers alongside a single writer. + db.SetMaxOpenConns(1) + + if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil { + _ = db.Close() + return nil, fmt.Errorf("set WAL mode: %w", err) + } + if _, err := db.Exec("PRAGMA busy_timeout=5000"); err != nil { + _ = db.Close() + return nil, fmt.Errorf("set busy_timeout: %w", err) + } + if _, err := db.Exec("PRAGMA foreign_keys=ON"); err != nil { + _ = db.Close() + return nil, fmt.Errorf("set foreign_keys: %w", err) + } + + s := &SQLiteStore{db: db} + if err := s.migrate(); err != nil { + _ = db.Close() + return nil, fmt.Errorf("migrate: %w", err) + } + return s, nil +} + +func (s *SQLiteStore) migrate() error { + var version int + if err := s.db.QueryRow("PRAGMA user_version").Scan(&version); err != nil { + return fmt.Errorf("read user_version: %w", err) + } + + if version >= 1 { + return nil + } + + ddl, err := migrations.ReadFile("migrations/001_init.sql") + if err != nil { + return fmt.Errorf("read migration: %w", err) + } + + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("begin migration tx: %w", err) + } + defer tx.Rollback() //nolint:errcheck + + if _, err := tx.Exec(string(ddl)); err != nil { + return fmt.Errorf("exec migration: %w", err) + } + if _, err := tx.Exec("PRAGMA user_version = 1"); err != nil { + return fmt.Errorf("set user_version: %w", err) + } + + return tx.Commit() +} + +func (s *SQLiteStore) PutBlobs(ctx context.Context, blobs []types.Blob) error { + if len(blobs) == 0 { + return nil + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + 
return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback() //nolint:errcheck + + stmt, err := tx.PrepareContext(ctx, + `INSERT OR IGNORE INTO blobs (height, namespace, commitment, data, share_version, signer, blob_index) + VALUES (?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return fmt.Errorf("prepare insert blob: %w", err) + } + defer stmt.Close() //nolint:errcheck + + for i := range blobs { + b := &blobs[i] + if _, err := stmt.ExecContext(ctx, + b.Height, b.Namespace[:], b.Commitment, b.Data, b.ShareVersion, b.Signer, b.Index, + ); err != nil { + return fmt.Errorf("insert blob at height %d index %d: %w", b.Height, b.Index, err) + } + } + + return tx.Commit() +} + +func (s *SQLiteStore) GetBlob(ctx context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) { + row := s.db.QueryRowContext(ctx, + `SELECT height, namespace, commitment, data, share_version, signer, blob_index + FROM blobs WHERE namespace = ? AND height = ? AND blob_index = ?`, + ns[:], height, index) + + return scanBlob(row) +} + +func (s *SQLiteStore) GetBlobs(ctx context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) { + rows, err := s.db.QueryContext(ctx, + `SELECT height, namespace, commitment, data, share_version, signer, blob_index + FROM blobs WHERE namespace = ? AND height >= ? AND height <= ? 
+ ORDER BY height, blob_index`, + ns[:], startHeight, endHeight) + if err != nil { + return nil, fmt.Errorf("query blobs: %w", err) + } + defer rows.Close() //nolint:errcheck + + var blobs []types.Blob + for rows.Next() { + b, err := scanBlobRow(rows) + if err != nil { + return nil, err + } + blobs = append(blobs, b) + } + return blobs, rows.Err() +} + +func (s *SQLiteStore) PutHeader(ctx context.Context, header *types.Header) error { + _, err := s.db.ExecContext(ctx, + `INSERT OR IGNORE INTO headers (height, hash, data_hash, time_ns, raw_header) + VALUES (?, ?, ?, ?, ?)`, + header.Height, header.Hash, header.DataHash, header.Time.UnixNano(), header.RawHeader) + if err != nil { + return fmt.Errorf("insert header at height %d: %w", header.Height, err) + } + return nil +} + +func (s *SQLiteStore) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + var h types.Header + var timeNs int64 + err := s.db.QueryRowContext(ctx, + `SELECT height, hash, data_hash, time_ns, raw_header FROM headers WHERE height = ?`, + height).Scan(&h.Height, &h.Hash, &h.DataHash, &timeNs, &h.RawHeader) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("query header at height %d: %w", height, err) + } + h.Time = time.Unix(0, timeNs) + return &h, nil +} + +func (s *SQLiteStore) PutNamespace(ctx context.Context, ns types.Namespace) error { + _, err := s.db.ExecContext(ctx, + `INSERT OR IGNORE INTO namespaces (namespace) VALUES (?)`, ns[:]) + if err != nil { + return fmt.Errorf("insert namespace: %w", err) + } + return nil +} + +func (s *SQLiteStore) GetNamespaces(ctx context.Context) ([]types.Namespace, error) { + rows, err := s.db.QueryContext(ctx, `SELECT namespace FROM namespaces`) + if err != nil { + return nil, fmt.Errorf("query namespaces: %w", err) + } + defer rows.Close() //nolint:errcheck + + var namespaces []types.Namespace + for rows.Next() { + var nsBytes []byte + if err := rows.Scan(&nsBytes); err != nil { 
			return nil, fmt.Errorf("scan namespace: %w", err)
		}
		// Reject corrupt rows rather than silently zero-padding or
		// truncating into the fixed-size array below.
		if len(nsBytes) != types.NamespaceSize {
			return nil, fmt.Errorf("invalid namespace size: got %d, want %d", len(nsBytes), types.NamespaceSize)
		}
		var ns types.Namespace
		copy(ns[:], nsBytes)
		namespaces = append(namespaces, ns)
	}
	return namespaces, rows.Err()
}

// GetSyncState returns the persisted sync checkpoint, or ErrNotFound when
// no checkpoint has been written yet (fresh database).
func (s *SQLiteStore) GetSyncState(ctx context.Context) (*types.SyncStatus, error) {
	var state int
	var latestHeight, networkHeight uint64
	err := s.db.QueryRowContext(ctx,
		`SELECT state, latest_height, network_height FROM sync_state WHERE id = 1`).
		Scan(&state, &latestHeight, &networkHeight)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrNotFound
		}
		return nil, fmt.Errorf("query sync_state: %w", err)
	}
	return &types.SyncStatus{
		State:         types.SyncState(state),
		LatestHeight:  latestHeight,
		NetworkHeight: networkHeight,
	}, nil
}

// SetSyncState upserts the singleton checkpoint row (id = 1, enforced by
// the table's CHECK constraint). updated_at records wall-clock nanoseconds
// at write time.
func (s *SQLiteStore) SetSyncState(ctx context.Context, status types.SyncStatus) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO sync_state (id, state, latest_height, network_height, updated_at)
		 VALUES (1, ?, ?, ?, ?)
		 ON CONFLICT(id) DO UPDATE SET
		   state = excluded.state,
		   latest_height = excluded.latest_height,
		   network_height = excluded.network_height,
		   updated_at = excluded.updated_at`,
		int(status.State), status.LatestHeight, status.NetworkHeight, time.Now().UnixNano())
	if err != nil {
		return fmt.Errorf("upsert sync_state: %w", err)
	}
	return nil
}

// Close releases the underlying database handle.
func (s *SQLiteStore) Close() error {
	return s.db.Close()
}

// scanBlob scans a single blob from a *sql.Row.
+func scanBlob(row *sql.Row) (*types.Blob, error) { + var b types.Blob + var nsBytes []byte + err := row.Scan(&b.Height, &nsBytes, &b.Commitment, &b.Data, &b.ShareVersion, &b.Signer, &b.Index) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + return nil, fmt.Errorf("scan blob: %w", err) + } + copy(b.Namespace[:], nsBytes) + return &b, nil +} + +// scanBlobRow scans a single blob from *sql.Rows. +func scanBlobRow(rows *sql.Rows) (types.Blob, error) { + var b types.Blob + var nsBytes []byte + err := rows.Scan(&b.Height, &nsBytes, &b.Commitment, &b.Data, &b.ShareVersion, &b.Signer, &b.Index) + if err != nil { + return types.Blob{}, fmt.Errorf("scan blob row: %w", err) + } + copy(b.Namespace[:], nsBytes) + return b, nil +} diff --git a/pkg/store/sqlite_test.go b/pkg/store/sqlite_test.go new file mode 100644 index 0000000..1c69872 --- /dev/null +++ b/pkg/store/sqlite_test.go @@ -0,0 +1,282 @@ +package store + +import ( + "context" + "errors" + "path/filepath" + "testing" + "time" + + "github.com/evstack/apex/pkg/types" +) + +func openTestDB(t *testing.T) *SQLiteStore { + t.Helper() + path := filepath.Join(t.TempDir(), "test.db") + s, err := Open(path) + if err != nil { + t.Fatalf("Open(%q): %v", path, err) + } + t.Cleanup(func() { _ = s.Close() }) + return s +} + +func testNamespace(b byte) types.Namespace { + var ns types.Namespace + ns[types.NamespaceSize-1] = b + return ns +} + +func TestPutGetHeader(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + want := &types.Header{ + Height: 42, + Hash: []byte("hash42"), + DataHash: []byte("datahash42"), + Time: time.Date(2025, 1, 15, 12, 0, 0, 0, time.UTC), + RawHeader: []byte("raw42"), + } + + if err := s.PutHeader(ctx, want); err != nil { + t.Fatalf("PutHeader: %v", err) + } + + got, err := s.GetHeader(ctx, 42) + if err != nil { + t.Fatalf("GetHeader: %v", err) + } + + if got.Height != want.Height { + t.Errorf("Height = %d, want %d", got.Height, want.Height) + } + if 
string(got.Hash) != string(want.Hash) { + t.Errorf("Hash = %q, want %q", got.Hash, want.Hash) + } + if string(got.DataHash) != string(want.DataHash) { + t.Errorf("DataHash = %q, want %q", got.DataHash, want.DataHash) + } + if !got.Time.Equal(want.Time) { + t.Errorf("Time = %v, want %v", got.Time, want.Time) + } + if string(got.RawHeader) != string(want.RawHeader) { + t.Errorf("RawHeader = %q, want %q", got.RawHeader, want.RawHeader) + } +} + +func TestGetHeaderNotFound(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + _, err := s.GetHeader(ctx, 999) + if !errors.Is(err, ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestPutGetBlobs(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + ns := testNamespace(1) + + blobs := []types.Blob{ + {Height: 10, Namespace: ns, Commitment: []byte("c1"), Data: []byte("d1"), ShareVersion: 0, Index: 0}, + {Height: 10, Namespace: ns, Commitment: []byte("c2"), Data: []byte("d2"), ShareVersion: 0, Index: 1}, + {Height: 11, Namespace: ns, Commitment: []byte("c3"), Data: []byte("d3"), ShareVersion: 0, Index: 0}, + } + if err := s.PutBlobs(ctx, blobs); err != nil { + t.Fatalf("PutBlobs: %v", err) + } + + // GetBlob single + got, err := s.GetBlob(ctx, ns, 10, 0) + if err != nil { + t.Fatalf("GetBlob: %v", err) + } + if string(got.Data) != "d1" { + t.Errorf("GetBlob data = %q, want %q", got.Data, "d1") + } + if got.Height != 10 { + t.Errorf("GetBlob height = %d, want 10", got.Height) + } + + // GetBlobs range + all, err := s.GetBlobs(ctx, ns, 10, 11) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if len(all) != 3 { + t.Fatalf("GetBlobs returned %d blobs, want 3", len(all)) + } + // Verify ordering + if all[0].Index != 0 || all[0].Height != 10 { + t.Errorf("first blob: height=%d index=%d", all[0].Height, all[0].Index) + } + if all[1].Index != 1 || all[1].Height != 10 { + t.Errorf("second blob: height=%d index=%d", all[1].Height, all[1].Index) + } + if all[2].Index != 
0 || all[2].Height != 11 { + t.Errorf("third blob: height=%d index=%d", all[2].Height, all[2].Index) + } +} + +func TestGetBlobNotFound(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + _, err := s.GetBlob(ctx, testNamespace(1), 999, 0) + if !errors.Is(err, ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestPutBlobsIdempotent(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + ns := testNamespace(1) + + blob := types.Blob{ + Height: 10, Namespace: ns, Commitment: []byte("c1"), + Data: []byte("d1"), ShareVersion: 0, Index: 0, + } + + // Insert twice — second should be a no-op. + if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil { + t.Fatalf("PutBlobs (first): %v", err) + } + if err := s.PutBlobs(ctx, []types.Blob{blob}); err != nil { + t.Fatalf("PutBlobs (second): %v", err) + } + + all, err := s.GetBlobs(ctx, ns, 10, 10) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if len(all) != 1 { + t.Fatalf("expected 1 blob after idempotent insert, got %d", len(all)) + } +} + +func TestPutHeaderIdempotent(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + hdr := &types.Header{ + Height: 5, Hash: []byte("h"), DataHash: []byte("dh"), + Time: time.Now(), RawHeader: []byte("r"), + } + if err := s.PutHeader(ctx, hdr); err != nil { + t.Fatalf("PutHeader (first): %v", err) + } + if err := s.PutHeader(ctx, hdr); err != nil { + t.Fatalf("PutHeader (second): %v", err) + } +} + +func TestPutGetNamespaces(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + ns1 := testNamespace(1) + ns2 := testNamespace(2) + + if err := s.PutNamespace(ctx, ns1); err != nil { + t.Fatalf("PutNamespace: %v", err) + } + if err := s.PutNamespace(ctx, ns2); err != nil { + t.Fatalf("PutNamespace: %v", err) + } + // Idempotent + if err := s.PutNamespace(ctx, ns1); err != nil { + t.Fatalf("PutNamespace (dup): %v", err) + } + + nss, err := s.GetNamespaces(ctx) + if err != nil { + 
t.Fatalf("GetNamespaces: %v", err) + } + if len(nss) != 2 { + t.Fatalf("expected 2 namespaces, got %d", len(nss)) + } +} + +func TestSyncStateUpsertAndRead(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + // Fresh DB returns ErrNotFound. + _, err := s.GetSyncState(ctx) + if !errors.Is(err, ErrNotFound) { + t.Fatalf("expected ErrNotFound on fresh DB, got %v", err) + } + + // Set initial state. + want := types.SyncStatus{State: types.Backfilling, LatestHeight: 100, NetworkHeight: 200} + if err := s.SetSyncState(ctx, want); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + got, err := s.GetSyncState(ctx) + if err != nil { + t.Fatalf("GetSyncState: %v", err) + } + if *got != want { + t.Errorf("SyncState = %+v, want %+v", *got, want) + } + + // Update (upsert). + want2 := types.SyncStatus{State: types.Streaming, LatestHeight: 200, NetworkHeight: 300} + if err := s.SetSyncState(ctx, want2); err != nil { + t.Fatalf("SetSyncState (update): %v", err) + } + got2, err := s.GetSyncState(ctx) + if err != nil { + t.Fatalf("GetSyncState (after update): %v", err) + } + if *got2 != want2 { + t.Errorf("SyncState = %+v, want %+v", *got2, want2) + } +} + +func TestMigrationsIdempotent(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.db") + + s1, err := Open(path) + if err != nil { + t.Fatalf("Open (first): %v", err) + } + _ = s1.Close() + + s2, err := Open(path) + if err != nil { + t.Fatalf("Open (second): %v", err) + } + _ = s2.Close() +} + +func TestPutBlobsEmpty(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + if err := s.PutBlobs(ctx, nil); err != nil { + t.Fatalf("PutBlobs(nil): %v", err) + } + if err := s.PutBlobs(ctx, []types.Blob{}); err != nil { + t.Fatalf("PutBlobs([]): %v", err) + } +} + +func TestGetBlobsEmptyRange(t *testing.T) { + s := openTestDB(t) + ctx := context.Background() + + blobs, err := s.GetBlobs(ctx, testNamespace(1), 1, 100) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + if 
len(blobs) != 0 { + t.Fatalf("expected 0 blobs, got %d", len(blobs)) + } +} diff --git a/pkg/sync/backfill.go b/pkg/sync/backfill.go new file mode 100644 index 0000000..1138d5a --- /dev/null +++ b/pkg/sync/backfill.go @@ -0,0 +1,137 @@ +package syncer + +import ( + "context" + "fmt" + "sync" + + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/store" + "github.com/evstack/apex/pkg/types" +) + +// Backfiller fetches historical blocks in batched, concurrent chunks. +type Backfiller struct { + store store.Store + fetcher fetch.DataFetcher + batchSize int + concurrency int + log zerolog.Logger +} + +// Run backfills from fromHeight to toHeight (inclusive). +func (b *Backfiller) Run(ctx context.Context, fromHeight, toHeight uint64) error { + namespaces, err := b.store.GetNamespaces(ctx) + if err != nil { + return fmt.Errorf("get namespaces: %w", err) + } + + for batchStart := fromHeight; batchStart <= toHeight; batchStart += uint64(b.batchSize) { + batchEnd := batchStart + uint64(b.batchSize) - 1 + if batchEnd > toHeight { + batchEnd = toHeight + } + + if err := b.processBatch(ctx, batchStart, batchEnd, namespaces); err != nil { + return err + } + + // Checkpoint after each batch. + if err := b.store.SetSyncState(ctx, types.SyncStatus{ + State: types.Backfilling, + LatestHeight: batchEnd, + NetworkHeight: toHeight, + }); err != nil { + return fmt.Errorf("checkpoint at height %d: %w", batchEnd, err) + } + + b.log.Debug(). + Uint64("batch_start", batchStart). + Uint64("batch_end", batchEnd). 
+ Msg("batch complete") + } + + return nil +} + +func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error { + heights := make(chan uint64, to-from+1) + for h := from; h <= to; h++ { + heights <- h + } + close(heights) + + workers := b.concurrency + if int(to-from+1) < workers { + workers = int(to - from + 1) + } + + var ( + wg sync.WaitGroup + mu sync.Mutex + firstErr error + ) + + for range workers { + wg.Add(1) + go func() { + defer wg.Done() + for height := range heights { + // Check for prior error or context cancellation. + mu.Lock() + failed := firstErr != nil + mu.Unlock() + if failed { + return + } + + if err := ctx.Err(); err != nil { + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() + return + } + + if err := b.processHeight(ctx, height, namespaces); err != nil { + mu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("height %d: %w", height, err) + } + mu.Unlock() + return + } + } + }() + } + + wg.Wait() + return firstErr +} + +func (b *Backfiller) processHeight(ctx context.Context, height uint64, namespaces []types.Namespace) error { + hdr, err := b.fetcher.GetHeader(ctx, height) + if err != nil { + return fmt.Errorf("get header: %w", err) + } + if err := b.store.PutHeader(ctx, hdr); err != nil { + return fmt.Errorf("put header: %w", err) + } + + if len(namespaces) > 0 { + blobs, err := b.fetcher.GetBlobs(ctx, height, namespaces) + if err != nil { + return fmt.Errorf("get blobs: %w", err) + } + if len(blobs) > 0 { + if err := b.store.PutBlobs(ctx, blobs); err != nil { + return fmt.Errorf("put blobs: %w", err) + } + } + } + + return nil +} diff --git a/pkg/sync/backfill_test.go b/pkg/sync/backfill_test.go new file mode 100644 index 0000000..0c54323 --- /dev/null +++ b/pkg/sync/backfill_test.go @@ -0,0 +1,129 @@ +package syncer + +import ( + "context" + "fmt" + "testing" + + "github.com/evstack/apex/pkg/types" +) + +func TestBackfiller(t *testing.T) { + t.Run("BatchProcessing", 
func(t *testing.T) { + t.Parallel() + st := newMockStore() + ft := newMockFetcher(10) + + ns := types.Namespace{0: 1} + if err := st.PutNamespace(context.Background(), ns); err != nil { + t.Fatalf("PutNamespace: %v", err) + } + + for h := uint64(1); h <= 10; h++ { + ft.addHeader(makeHeader(h)) + ft.addBlobs(h, []types.Blob{ + {Height: h, Namespace: ns, Data: []byte("data"), Commitment: []byte(fmt.Sprintf("c%d", h)), Index: 0}, + }) + } + + bf := &Backfiller{ + store: st, + fetcher: ft, + batchSize: 3, + concurrency: 2, + } + + if err := bf.Run(context.Background(), 1, 10); err != nil { + t.Fatalf("Run: %v", err) + } + + for h := uint64(1); h <= 10; h++ { + if _, err := st.GetHeader(context.Background(), h); err != nil { + t.Errorf("header %d not stored: %v", h, err) + } + } + + st.mu.Lock() + blobCount := len(st.blobs) + st.mu.Unlock() + if blobCount != 10 { + t.Errorf("stored %d blobs, want 10", blobCount) + } + + ss, err := st.GetSyncState(context.Background()) + if err != nil { + t.Fatalf("GetSyncState: %v", err) + } + if ss.LatestHeight != 10 { + t.Errorf("checkpoint LatestHeight = %d, want 10", ss.LatestHeight) + } + }) + + t.Run("ContextCancellation", func(t *testing.T) { + t.Parallel() + st := newMockStore() + ft := newMockFetcher(100) + + for h := uint64(1); h <= 100; h++ { + ft.addHeader(makeHeader(h)) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + bf := &Backfiller{ + store: st, + fetcher: ft, + batchSize: 10, + concurrency: 4, + } + + if err := bf.Run(ctx, 1, 100); err == nil { + t.Fatal("expected error from cancelled context") + } + }) + + t.Run("FetchError", func(t *testing.T) { + t.Parallel() + st := newMockStore() + ft := newMockFetcher(5) + + // Only add headers 1-3, so 4 and 5 will fail. 
+ for h := uint64(1); h <= 3; h++ { + ft.addHeader(makeHeader(h)) + } + + bf := &Backfiller{ + store: st, + fetcher: ft, + batchSize: 5, + concurrency: 1, + } + + if err := bf.Run(context.Background(), 1, 5); err == nil { + t.Fatal("expected error when fetcher returns not found") + } + }) + + t.Run("SingleHeight", func(t *testing.T) { + t.Parallel() + st := newMockStore() + ft := newMockFetcher(1) + ft.addHeader(makeHeader(1)) + + bf := &Backfiller{ + store: st, + fetcher: ft, + batchSize: 10, + concurrency: 4, + } + + if err := bf.Run(context.Background(), 1, 1); err != nil { + t.Fatalf("Run: %v", err) + } + + if _, err := st.GetHeader(context.Background(), 1); err != nil { + t.Errorf("header 1 not stored: %v", err) + } + }) +} diff --git a/pkg/sync/coordinator.go b/pkg/sync/coordinator.go index de6685b..a36739c 100644 --- a/pkg/sync/coordinator.go +++ b/pkg/sync/coordinator.go @@ -1,19 +1,31 @@ package syncer import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/rs/zerolog" + "github.com/evstack/apex/pkg/fetch" "github.com/evstack/apex/pkg/store" "github.com/evstack/apex/pkg/types" ) +// ErrGapDetected is returned by SubscriptionManager when a height gap is found. +var ErrGapDetected = errors.New("gap detected") + // Coordinator manages the sync lifecycle between a data fetcher and a store. type Coordinator struct { store store.Store fetcher fetch.DataFetcher state types.SyncState + stateMu sync.RWMutex batchSize int concurrency int startHeight uint64 + log zerolog.Logger } // Option configures a Coordinator. @@ -44,6 +56,11 @@ func WithStartHeight(h uint64) Option { return func(c *Coordinator) { c.startHeight = h } } +// WithLogger sets the logger for the coordinator and its sub-components. +func WithLogger(log zerolog.Logger) Option { + return func(c *Coordinator) { c.log = log } +} + // New creates a Coordinator with the given store, fetcher, and options. 
func New(s store.Store, f fetch.DataFetcher, opts ...Option) *Coordinator { coord := &Coordinator{ @@ -52,9 +69,106 @@ func New(s store.Store, f fetch.DataFetcher, opts ...Option) *Coordinator { state: types.Initializing, batchSize: 64, concurrency: 4, + log: zerolog.Nop(), } for _, opt := range opts { opt(coord) } return coord } + +// Status returns the current sync status (concurrent-safe). +func (c *Coordinator) Status() types.SyncStatus { + c.stateMu.RLock() + defer c.stateMu.RUnlock() + return types.SyncStatus{ + State: c.state, + } +} + +func (c *Coordinator) setState(s types.SyncState) { + c.stateMu.Lock() + c.state = s + c.stateMu.Unlock() +} + +// Run executes the sync lifecycle: initialize -> backfill -> stream. +// It blocks until ctx is cancelled or a fatal error occurs. +// On ErrGapDetected during streaming, it re-backfills and re-enters streaming. +func (c *Coordinator) Run(ctx context.Context) error { + for { + fromHeight, networkHeight, err := c.initialize(ctx) + if err != nil { + return fmt.Errorf("initialize: %w", err) + } + + if fromHeight <= networkHeight { + c.setState(types.Backfilling) + c.log.Info(). + Uint64("from", fromHeight). + Uint64("to", networkHeight). 
				Msg("starting backfill")

			// Delegate historical catch-up to a Backfiller sharing the
			// coordinator's store/fetcher and tuning knobs; it checkpoints
			// progress to the store as it goes.
			bf := &Backfiller{
				store:       c.store,
				fetcher:     c.fetcher,
				batchSize:   c.batchSize,
				concurrency: c.concurrency,
				log:         c.log.With().Str("component", "backfiller").Logger(),
			}
			if err := bf.Run(ctx, fromHeight, networkHeight); err != nil {
				return fmt.Errorf("backfill: %w", err)
			}
			c.log.Info().Uint64("height", networkHeight).Msg("backfill complete")
		}

		c.setState(types.Streaming)
		c.log.Info().Msg("entering streaming mode")

		// Stream live headers until cancellation or a detected height gap.
		sm := &SubscriptionManager{
			store:   c.store,
			fetcher: c.fetcher,
			log:     c.log.With().Str("component", "subscription").Logger(),
		}
		err = sm.Run(ctx)
		// ErrGapDetected means heights were missed while streaming; loop
		// back to re-initialize (resume from checkpoint) and backfill the
		// missing range before streaming again.
		if errors.Is(err, ErrGapDetected) {
			c.log.Warn().Msg("gap detected, re-entering backfill")
			continue
		}
		if err != nil {
			return fmt.Errorf("streaming: %w", err)
		}
		// ctx was cancelled; clean exit.
		return nil
	}
}

// initialize determines the starting height and network head.
+ ss, err := c.store.GetSyncState(ctx) + if err != nil && !errors.Is(err, store.ErrNotFound) { + return 0, 0, fmt.Errorf("get sync state: %w", err) + } + if ss != nil && ss.LatestHeight > 0 { + fromHeight = ss.LatestHeight + 1 + c.log.Info().Uint64("resumed_from", fromHeight).Msg("resuming from checkpoint") + } else { + fromHeight = c.startHeight + if fromHeight == 0 { + fromHeight = 1 + } + c.log.Info().Uint64("start_height", fromHeight).Msg("starting fresh") + } + + head, err := c.fetcher.GetNetworkHead(ctx) + if err != nil { + return 0, 0, fmt.Errorf("get network head: %w", err) + } + networkHeight = head.Height + c.log.Info().Uint64("network_height", networkHeight).Msg("network head") + + return fromHeight, networkHeight, nil +} diff --git a/pkg/sync/coordinator_test.go b/pkg/sync/coordinator_test.go new file mode 100644 index 0000000..bc77179 --- /dev/null +++ b/pkg/sync/coordinator_test.go @@ -0,0 +1,244 @@ +package syncer + +import ( + "context" + "testing" + "time" + + "github.com/evstack/apex/pkg/types" +) + +func makeHeader(height uint64) *types.Header { + return &types.Header{ + Height: height, + Hash: []byte("hash"), + DataHash: []byte("datahash"), + Time: time.Now(), + RawHeader: []byte("raw"), + } +} + +func TestCoordinatorFullCycle(t *testing.T) { + st := newMockStore() + ft := newMockFetcher(5) + + // Populate fetcher with headers 1..5. + for h := uint64(1); h <= 5; h++ { + ft.addHeader(makeHeader(h)) + } + + coord := New(st, ft, + WithStartHeight(1), + WithBatchSize(3), + WithConcurrency(2), + ) + + ctx, cancel := context.WithCancel(context.Background()) + + // Set up subscription channel that sends headers 6..8 then triggers cancel. + subCh := make(chan *types.Header, 10) + ft.mu.Lock() + ft.subCh = subCh + ft.mu.Unlock() + + go func() { + // Wait for backfill to finish before sending subscription headers. 
+ for { + time.Sleep(10 * time.Millisecond) + s := coord.Status() + if s.State == types.Streaming { + break + } + } + for h := uint64(6); h <= 8; h++ { + ft.addHeader(makeHeader(h)) + subCh <- makeHeader(h) + } + // Give time for processing then cancel. + time.Sleep(50 * time.Millisecond) + cancel() + }() + + err := coord.Run(ctx) + if err != nil { + t.Fatalf("Run: %v", err) + } + + // Verify headers were stored. + for h := uint64(1); h <= 8; h++ { + if _, err := st.GetHeader(context.Background(), h); err != nil { + t.Errorf("header %d not stored: %v", h, err) + } + } + + // Verify sync state was checkpointed. + ss, err := st.GetSyncState(context.Background()) + if err != nil { + t.Fatalf("GetSyncState: %v", err) + } + if ss.LatestHeight != 8 { + t.Errorf("LatestHeight = %d, want 8", ss.LatestHeight) + } +} + +func TestCoordinatorResumeFromCheckpoint(t *testing.T) { + st := newMockStore() + + // Simulate prior run that synced to height 5. + if err := st.SetSyncState(context.Background(), types.SyncStatus{ + State: types.Streaming, + LatestHeight: 5, + }); err != nil { + t.Fatalf("SetSyncState: %v", err) + } + + ft := newMockFetcher(8) + for h := uint64(6); h <= 8; h++ { + ft.addHeader(makeHeader(h)) + } + + ctx, cancel := context.WithCancel(context.Background()) + + subCh := make(chan *types.Header, 10) + ft.mu.Lock() + ft.subCh = subCh + ft.mu.Unlock() + + coord := New(st, ft, WithBatchSize(10), WithConcurrency(2)) + + go func() { + for { + time.Sleep(10 * time.Millisecond) + s := coord.Status() + if s.State == types.Streaming { + break + } + } + time.Sleep(20 * time.Millisecond) + cancel() + }() + + err := coord.Run(ctx) + if err != nil { + t.Fatalf("Run: %v", err) + } + + // Should have backfilled 6..8. 
+ for h := uint64(6); h <= 8; h++ { + if _, err := st.GetHeader(context.Background(), h); err != nil { + t.Errorf("header %d not stored after resume: %v", h, err) + } + } +} + +func TestCoordinatorGapTriggersRebackfill(t *testing.T) { + st := newMockStore() + ft := newMockFetcher(3) + + for h := uint64(1); h <= 3; h++ { + ft.addHeader(makeHeader(h)) + } + + // After first streaming, fetcher will report head=6 on re-init. + callCount := 0 + origGetNetworkHead := ft.headH + + ctx, cancel := context.WithCancel(context.Background()) + coord := New(st, ft, WithStartHeight(1), WithBatchSize(10), WithConcurrency(2)) + + subCh := make(chan *types.Header, 10) + ft.mu.Lock() + ft.subCh = subCh + ft.mu.Unlock() + + go func() { + // Wait for streaming mode. + for { + time.Sleep(10 * time.Millisecond) + if coord.Status().State == types.Streaming { + break + } + } + + if callCount == 0 { + callCount++ + // Send header with gap (skip 4, send 5). + ft.addHeader(makeHeader(4)) + ft.addHeader(makeHeader(5)) + ft.addHeader(makeHeader(6)) + ft.mu.Lock() + ft.headH = 6 + ft.mu.Unlock() + subCh <- makeHeader(5) // gap: expected 4, got 5 + + // Wait for re-backfill to complete and new streaming. + for { + time.Sleep(10 * time.Millisecond) + ss, err := st.GetSyncState(context.Background()) + if err == nil && ss.LatestHeight >= 6 { + break + } + } + + // Create new sub channel for re-subscribe. + newSubCh := make(chan *types.Header, 10) + ft.mu.Lock() + ft.subCh = newSubCh + ft.mu.Unlock() + + // Wait for streaming again, then cancel. + for { + time.Sleep(10 * time.Millisecond) + if coord.Status().State == types.Streaming { + break + } + } + time.Sleep(20 * time.Millisecond) + cancel() + } + }() + + _ = origGetNetworkHead + err := coord.Run(ctx) + if err != nil { + t.Fatalf("Run: %v", err) + } + + // Headers 4-6 should have been backfilled after gap. 
+ for h := uint64(4); h <= 6; h++ { + if _, err := st.GetHeader(context.Background(), h); err != nil { + t.Errorf("header %d not stored after gap recovery: %v", h, err) + } + } +} + +func TestCoordinatorContextCancellation(t *testing.T) { + st := newMockStore() + ft := newMockFetcher(0) // head=0, so fromHeight > networkHeight, skip backfill + + ctx, cancel := context.WithCancel(context.Background()) + + subCh := make(chan *types.Header, 10) + ft.mu.Lock() + ft.subCh = subCh + ft.mu.Unlock() + + done := make(chan error, 1) + coord := New(st, ft, WithStartHeight(1)) + go func() { + done <- coord.Run(ctx) + }() + + // Wait for streaming, then cancel. + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case err := <-done: + if err != nil { + t.Fatalf("Run: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after cancellation") + } +} diff --git a/pkg/sync/mock_test.go b/pkg/sync/mock_test.go new file mode 100644 index 0000000..35cc415 --- /dev/null +++ b/pkg/sync/mock_test.go @@ -0,0 +1,188 @@ +package syncer + +import ( + "context" + "sync" + + "github.com/evstack/apex/pkg/store" + "github.com/evstack/apex/pkg/types" +) + +// mockStore is an in-memory Store for testing. 
+type mockStore struct { + mu sync.Mutex + headers map[uint64]*types.Header + blobs []types.Blob + namespaces []types.Namespace + syncState *types.SyncStatus + + putHeaderErr error + putBlobsErr error +} + +func newMockStore() *mockStore { + return &mockStore{ + headers: make(map[uint64]*types.Header), + } +} + +func (m *mockStore) PutHeader(_ context.Context, h *types.Header) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.putHeaderErr != nil { + return m.putHeaderErr + } + m.headers[h.Height] = h + return nil +} + +func (m *mockStore) GetHeader(_ context.Context, height uint64) (*types.Header, error) { + m.mu.Lock() + defer m.mu.Unlock() + h, ok := m.headers[height] + if !ok { + return nil, store.ErrNotFound + } + return h, nil +} + +func (m *mockStore) PutBlobs(_ context.Context, blobs []types.Blob) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.putBlobsErr != nil { + return m.putBlobsErr + } + m.blobs = append(m.blobs, blobs...) + return nil +} + +func (m *mockStore) GetBlob(_ context.Context, ns types.Namespace, height uint64, index int) (*types.Blob, error) { + m.mu.Lock() + defer m.mu.Unlock() + for i := range m.blobs { + b := &m.blobs[i] + if b.Namespace == ns && b.Height == height && b.Index == index { + return b, nil + } + } + return nil, store.ErrNotFound +} + +func (m *mockStore) GetBlobs(_ context.Context, ns types.Namespace, startHeight, endHeight uint64) ([]types.Blob, error) { + m.mu.Lock() + defer m.mu.Unlock() + var result []types.Blob + for _, b := range m.blobs { + if b.Namespace == ns && b.Height >= startHeight && b.Height <= endHeight { + result = append(result, b) + } + } + return result, nil +} + +func (m *mockStore) PutNamespace(_ context.Context, ns types.Namespace) error { + m.mu.Lock() + defer m.mu.Unlock() + for _, existing := range m.namespaces { + if existing == ns { + return nil + } + } + m.namespaces = append(m.namespaces, ns) + return nil +} + +func (m *mockStore) GetNamespaces(_ context.Context) ([]types.Namespace, error) { 
+ m.mu.Lock() + defer m.mu.Unlock() + out := make([]types.Namespace, len(m.namespaces)) + copy(out, m.namespaces) + return out, nil +} + +func (m *mockStore) GetSyncState(_ context.Context) (*types.SyncStatus, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.syncState == nil { + return nil, store.ErrNotFound + } + cp := *m.syncState + return &cp, nil +} + +func (m *mockStore) SetSyncState(_ context.Context, status types.SyncStatus) error { + m.mu.Lock() + defer m.mu.Unlock() + m.syncState = &status + return nil +} + +func (m *mockStore) Close() error { return nil } + +// mockFetcher is a test DataFetcher that serves data from maps. +type mockFetcher struct { + mu sync.Mutex + headers map[uint64]*types.Header + blobs map[uint64][]types.Blob + headH uint64 + + subCh chan *types.Header + getHdrFn func(ctx context.Context, height uint64) (*types.Header, error) +} + +func newMockFetcher(headHeight uint64) *mockFetcher { + return &mockFetcher{ + headers: make(map[uint64]*types.Header), + blobs: make(map[uint64][]types.Blob), + headH: headHeight, + } +} + +func (f *mockFetcher) addHeader(h *types.Header) { + f.mu.Lock() + defer f.mu.Unlock() + f.headers[h.Height] = h +} + +func (f *mockFetcher) addBlobs(height uint64, blobs []types.Blob) { + f.mu.Lock() + defer f.mu.Unlock() + f.blobs[height] = blobs +} + +func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { + f.mu.Lock() + fn := f.getHdrFn + h, ok := f.headers[height] + f.mu.Unlock() + if fn != nil { + return fn(ctx, height) + } + if !ok { + return nil, store.ErrNotFound + } + return h, nil +} + +func (f *mockFetcher) GetBlobs(_ context.Context, height uint64, _ []types.Namespace) ([]types.Blob, error) { + f.mu.Lock() + defer f.mu.Unlock() + return f.blobs[height], nil +} + +func (f *mockFetcher) GetNetworkHead(_ context.Context) (*types.Header, error) { + f.mu.Lock() + defer f.mu.Unlock() + return &types.Header{Height: f.headH}, nil +} + +func (f *mockFetcher) 
SubscribeHeaders(_ context.Context) (<-chan *types.Header, error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.subCh == nil { + f.subCh = make(chan *types.Header, 64) + } + return f.subCh, nil +} + +func (f *mockFetcher) Close() error { return nil } diff --git a/pkg/sync/subscription.go b/pkg/sync/subscription.go new file mode 100644 index 0000000..84eda98 --- /dev/null +++ b/pkg/sync/subscription.go @@ -0,0 +1,105 @@ +package syncer + +import ( + "context" + "errors" + "fmt" + + "github.com/rs/zerolog" + + "github.com/evstack/apex/pkg/fetch" + "github.com/evstack/apex/pkg/store" + "github.com/evstack/apex/pkg/types" +) + +// SubscriptionManager processes new headers from a live subscription. +type SubscriptionManager struct { + store store.Store + fetcher fetch.DataFetcher + log zerolog.Logger +} + +// Run subscribes to new headers and processes them sequentially. +// Returns ErrGapDetected if a height discontinuity is found. +// Returns nil when ctx is cancelled. +func (sm *SubscriptionManager) Run(ctx context.Context) error { + ch, err := sm.fetcher.SubscribeHeaders(ctx) + if err != nil { + return fmt.Errorf("subscribe headers: %w", err) + } + + namespaces, err := sm.store.GetNamespaces(ctx) + if err != nil { + return fmt.Errorf("get namespaces: %w", err) + } + + // Determine the last processed height and network height from the store. + var lastHeight, networkHeight uint64 + ss, err := sm.store.GetSyncState(ctx) + if err != nil && !errors.Is(err, store.ErrNotFound) { + return fmt.Errorf("get sync state: %w", err) + } + if ss != nil { + lastHeight = ss.LatestHeight + networkHeight = ss.NetworkHeight + } + + for { + select { + case <-ctx.Done(): + return nil + case hdr, ok := <-ch: + if !ok { + // Channel closed (disconnect or ctx cancelled). + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("header subscription closed unexpectedly") + } + + // Check contiguity. + if lastHeight > 0 && hdr.Height != lastHeight+1 { + sm.log.Warn(). 
+ Uint64("expected", lastHeight+1). + Uint64("got", hdr.Height). + Msg("gap detected") + return ErrGapDetected + } + + if err := sm.processHeader(ctx, hdr, namespaces, networkHeight); err != nil { + return fmt.Errorf("process height %d: %w", hdr.Height, err) + } + + lastHeight = hdr.Height + } + } +} + +func (sm *SubscriptionManager) processHeader(ctx context.Context, hdr *types.Header, namespaces []types.Namespace, networkHeight uint64) error { + if err := sm.store.PutHeader(ctx, hdr); err != nil { + return fmt.Errorf("put header: %w", err) + } + + if len(namespaces) > 0 { + blobs, err := sm.fetcher.GetBlobs(ctx, hdr.Height, namespaces) + if err != nil { + return fmt.Errorf("get blobs: %w", err) + } + if len(blobs) > 0 { + if err := sm.store.PutBlobs(ctx, blobs); err != nil { + return fmt.Errorf("put blobs: %w", err) + } + } + } + + if err := sm.store.SetSyncState(ctx, types.SyncStatus{ + State: types.Streaming, + LatestHeight: hdr.Height, + NetworkHeight: networkHeight, + }); err != nil { + return fmt.Errorf("set sync state: %w", err) + } + + sm.log.Debug().Uint64("height", hdr.Height).Msg("processed header") + return nil +} diff --git a/pkg/types/types.go b/pkg/types/types.go index dd6f658..b901f91 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -33,6 +33,7 @@ func (n Namespace) String() string { // Blob represents a blob submitted to a Celestia namespace. type Blob struct { + Height uint64 Namespace Namespace Data []byte Commitment []byte