Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 46 additions & 45 deletions cmd/deal/send-manual-pdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (

var SendManualPDPCmd = &cli.Command{
Name: "send-manual-pdp",
Usage: "Send a manual PDP deal on-chain",
Description: `Create/reuse a proof set and add a piece to it on-chain via PDPVerifier.
Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1`,
Usage: "Send a manual PDP deal via the FWSS-pull flow",
Description: `Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the
SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces
into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path.
Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "client",
Expand All @@ -34,12 +36,12 @@ var SendManualPDPCmd = &cli.Command{
},
&cli.StringFlag{
Name: "piece-cid",
Usage: "Piece CID (commp)",
Usage: "Piece CID (commp v1)",
Required: true,
},
&cli.Int64Flag{
Name: "piece-size",
Usage: "Piece size in bytes",
Usage: "Padded piece size in bytes",
Required: true,
},
&cli.StringFlag{
Expand All @@ -48,10 +50,21 @@ var SendManualPDPCmd = &cli.Command{
EnvVars: []string{"ETH_RPC_URL"},
Required: true,
},
&cli.Uint64Flag{
Name: "confirmation-depth",
Usage: "Blocks to wait for tx confirmation",
Value: 5,
&cli.StringFlag{
Name: "source-url-base",
Usage: "HTTPS base where Curio fetches the piece (sourceUrl = <base>/piece/<pieceCidV2>)",
EnvVars: []string{"PDP_SOURCE_URL_BASE"},
Required: true,
},
&cli.StringFlag{
Name: "record-keeper",
Usage: "FWSS contract address. Defaults to network FWSS from go-synapse.",
EnvVars: []string{"PDP_RECORD_KEEPER"},
},
&cli.DurationFlag{
Name: "pull-timeout",
Usage: "How long to wait for Curio to finish each phase",
Value: 5 * time.Minute,
},
},
Action: func(c *cli.Context) error {
Expand All @@ -61,13 +74,17 @@ var SendManualPDPCmd = &cli.Command{
}
defer closer.Close()

pdp, err := dealpusher.NewOnChainPDP(c.Context, db, c.String("eth-rpc"))
pdp, err := dealpusher.NewOnChainPDP(c.Context, dealpusher.OnChainPDPConfig{
DB: db,
RPCURL: c.String("eth-rpc"),
SourceURLBase: c.String("source-url-base"),
RecordKeeper: c.String("record-keeper"),
})
if err != nil {
return errors.WithStack(err)
}
defer pdp.Close()

// load wallet
var walletObj model.Wallet
err = db.WithContext(c.Context).Where("address = ?", c.String("client")).First(&walletObj).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand All @@ -86,56 +103,40 @@ var SendManualPDPCmd = &cli.Command{
return errors.WithStack(err)
}

provider := c.String("provider")

cfg := dealpusher.PDPSchedulingConfig{
BatchSize: 1,
MaxPiecesPerProofSet: 1024,
ConfirmationDepth: c.Uint64("confirmation-depth"),
PollingInterval: 5 * time.Second,
}

// ensure proof set exists (or create one)
fmt.Println("ensuring proof set...")
proofSetID, err := pdp.EnsureProofSet(c.Context, evmSigner, provider, cfg)
if err != nil {
return errors.Wrap(err, "failed to ensure proof set")
}
fmt.Printf("proof set ID: %d\n", proofSetID)

// parse piece cid
pieceCID, err := cid.Parse(c.String("piece-cid"))
if err != nil {
return errors.Wrap(err, "invalid piece CID")
}

// add piece to proof set
fmt.Println("submitting add-roots tx...")
pieceSize := c.Int64("piece-size")
queuedTx, err := pdp.QueueAddRoots(c.Context, evmSigner, proofSetID, []cid.Cid{pieceCID}, []int64{pieceSize}, cfg)
if err != nil {
return errors.Wrap(err, "failed to add roots")

cfg := dealpusher.PDPSchedulingConfig{
BatchSize: 1,
MaxPiecesPerProofSet: 1024,
PullTimeout: c.Duration("pull-timeout"),
}
fmt.Printf("tx: %s\n", queuedTx.Hash)

// wait for confirmation
fmt.Println("waiting for confirmation...")
receipt, err := pdp.WaitForConfirmations(c.Context, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval)
fmt.Println("pushing piece to SP via /pdp/piece/pull + on-chain commit...")
result, err := pdp.PullPiecesToFWSS(
c.Context,
evmSigner,
c.String("provider"),
[]dealpusher.PDPPieceInput{{PieceCID: pieceCID, PieceSize: pieceSize}},
cfg,
)
if err != nil {
return errors.Wrap(err, "tx failed")
return errors.Wrap(err, "FWSS pull push failed")
}
fmt.Printf("confirmed at block %d (gas: %d)\n", receipt.BlockNumber, receipt.GasUsed)
fmt.Printf("data set ID: %d\n", result.DataSetID)

// save deal record
proofSetIDCopy := proofSetID
dataSetIDCopy := result.DataSetID
dealModel := &model.Deal{
State: model.DealProposed,
DealType: model.DealTypePDP,
Provider: provider,
Provider: c.String("provider"),
PieceCID: model.CID(pieceCID),
PieceSize: pieceSize,
WalletID: &walletObj.ID,
ProofSetID: &proofSetIDCopy,
ProofSetID: &dataSetIDCopy,
}
if err := db.WithContext(c.Context).Create(dealModel).Error; err != nil {
return errors.Wrap(err, "failed to save deal")
Expand Down
41 changes: 24 additions & 17 deletions cmd/run/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,28 @@ var DealPusherCmd = &cli.Command{
},
&cli.IntFlag{
Name: "pdp-batch-size",
Usage: "Number of roots to include in each PDP add-roots transaction",
Usage: "Number of pieces to include in each /pdp/piece/pull request",
Value: 128,
},
&cli.IntFlag{
Name: "pdp-max-pieces-per-proofset",
Usage: "Maximum pieces per proof set before handing off to the storage provider",
Usage: "Maximum pieces per proof set before starting a new one",
Value: 1024,
},
&cli.Uint64Flag{
Name: "pdp-confirmation-depth",
Usage: "Number of block confirmations required for PDP transactions",
Value: 5,
},
&cli.DurationFlag{
Name: "pdp-poll-interval",
Usage: "Polling interval for PDP transaction confirmation checks",
Value: 30 * time.Second,
Name: "pdp-pull-timeout",
Usage: "How long to wait for Curio to finish pulling a batch (per request)",
Value: 5 * time.Minute,
},
&cli.StringFlag{
Name: "pdp-source-url-base",
Usage: "HTTPS base URL where Curio fetches pieces from; sourceUrl is built as <base>/piece/<pieceCid>",
EnvVars: []string{"PDP_SOURCE_URL_BASE"},
},
&cli.StringFlag{
Name: "pdp-record-keeper",
Usage: "FWSS contract address (recordKeeper). Defaults to the network default from go-synapse.",
EnvVars: []string{"PDP_RECORD_KEEPER"},
},
&cli.StringFlag{
Name: "eth-rpc",
Expand Down Expand Up @@ -114,8 +119,7 @@ var DealPusherCmd = &cli.Command{
pdpCfg := dealpusher.PDPSchedulingConfig{
BatchSize: c.Int("pdp-batch-size"),
MaxPiecesPerProofSet: c.Int("pdp-max-pieces-per-proofset"),
ConfirmationDepth: c.Uint64("pdp-confirmation-depth"),
PollingInterval: c.Duration("pdp-poll-interval"),
PullTimeout: c.Duration("pdp-pull-timeout"),
}
if err := pdpCfg.Validate(); err != nil {
return errors.WithStack(err)
Expand All @@ -125,16 +129,19 @@ var DealPusherCmd = &cli.Command{
dealpusher.WithPDPSchedulingConfig(pdpCfg),
}
if rpcURL := c.String("eth-rpc"); rpcURL != "" {
pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, db, rpcURL)
adapterCfg := dealpusher.OnChainPDPConfig{
DB: db,
RPCURL: rpcURL,
SourceURLBase: c.String("pdp-source-url-base"),
RecordKeeper: c.String("pdp-record-keeper"),
}
pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, adapterCfg)
if err != nil {
return errors.Wrap(err, "failed to initialize PDP on-chain adapter")
}
defer pdpAdapter.Close()

opts = append(opts,
dealpusher.WithPDPProofSetManager(pdpAdapter),
dealpusher.WithPDPTransactionConfirmer(pdpAdapter),
)
opts = append(opts, dealpusher.WithPDPProofSetManager(pdpAdapter))
}

if ddoContract := c.String("ddo-contract"); ddoContract != "" {
Expand Down
2 changes: 1 addition & 1 deletion docs/en/cli-reference/deal/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 15 additions & 11 deletions docs/en/cli-reference/deal/send-manual-pdp.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions docs/en/cli-reference/run/deal-pusher.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/bcicen/jstream v1.0.1
github.com/brianvoe/gofakeit/v6 v6.23.2
github.com/cockroachdb/errors v1.11.3
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1
github.com/data-preservation-programs/table v0.0.3
github.com/dustin/go-humanize v1.0.1
github.com/ethereum/go-ethereum v1.14.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+
github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17 h1:pTOIdr5W7pHrxFaQdONxHWiCtIOgdc6zfKL82SS4xhI=
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1 h1:SGQs5b7eyjycfxKDRHmzZW613BjjRkDT0XITafKK50A=
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
github.com/data-preservation-programs/table v0.0.3 h1:hboeauxPXybE8KlMA+RjDXz/J4xaG5CAFCcxyOm8yWo=
github.com/data-preservation-programs/table v0.0.3/go.mod h1:sRGP/IuuqFc/y9QfmDyb5h6Q2wrnhhnBofEOj9aDRJg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
28 changes: 18 additions & 10 deletions model/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,22 @@ const (
// This is a materialized view built from Shovel-indexed events, replacing
// the per-cycle RPC scans of GetProofSets/GetProofSetsForClient.
type PDPProofSet struct {
SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
ClientAddress string `gorm:"not null;index" json:"clientAddress"`
Provider string `gorm:"not null" json:"provider"`
IsLive bool `gorm:"default:false" json:"isLive"`
ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
Deleted bool `gorm:"default:false" json:"deleted"`
HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
ProposedProviderEVM string ` json:"proposedProviderEVM,omitempty"`
PieceCount int `gorm:"default:0" json:"pieceCount"`
SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
ClientAddress string `gorm:"not null;index" json:"clientAddress"`
Provider string `gorm:"not null" json:"provider"`
IsLive bool `gorm:"default:false" json:"isLive"`
ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
Deleted bool `gorm:"default:false" json:"deleted"`
HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
PieceCount int `gorm:"default:0" json:"pieceCount"`
// ClientDataSetID is the per-(payer, set) nonce the client signs into
// CreateDataSet/AddPieces extraData. FWSS rejects reused IDs via its
// clientNonces[payer][id] check, so we persist before signing to make
// retries idempotent. Stored decimal because uint256 doesn't fit any
// native int.
ClientDataSetID string ` json:"clientDataSetId,omitempty"`
// ServiceURL caches the SP's PDP HTTP endpoint, fetched from
// ServiceProviderRegistry the first time we push to this provider.
ServiceURL string ` json:"serviceUrl,omitempty"`
}
17 changes: 8 additions & 9 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ var waitPendingInterval = time.Minute

// DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process.
type DealPusher struct {
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
keyStore keystore.KeyStore // Keystore for loading private keys
lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for root batching and tx confirmation.
ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
keyStore keystore.KeyStore // Keystore for loading private keys
lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for batch sizing and pull timeouts.
ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
// Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage.
scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType
workerID uuid.UUID // UUID identifying the associated worker.
Expand Down
6 changes: 0 additions & 6 deletions service/dealpusher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ func WithPDPProofSetManager(manager PDPProofSetManager) Option {
}
}

func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option {
return func(d *DealPusher) {
d.pdpTxConfirmer = confirmer
}
}

func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option {
return func(d *DealPusher) {
d.pdpSchedulingConfig = cfg
Expand Down
Loading
Loading