From ffcf10499833b4768342756d0e7398884588fe5a Mon Sep 17 00:00:00 2001 From: Vladimir Erdman Date: Sun, 23 Nov 2025 16:45:12 +0400 Subject: [PATCH 1/3] Add fetched node size to progress tracker Signed-off-by: Vladimir Erdman --- ipld/merkledag/merkledag.go | 53 +++++++++++++++++++++++--------- ipld/merkledag/merkledag_test.go | 21 +++++++++++-- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index 7e21383df..0c4db7a57 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -132,6 +132,20 @@ func GetLinksDirect(serv format.NodeGetter) GetLinks { } } +// GetLinksDirectWithProgressTracker creates a function as GetLinksDirect, but +// updates the ProgressTracker with the block size of the retrieved node. +func GetLinksDirectWithProgressTracker(serv format.NodeGetter, tracker *ProgressTracker) GetLinks { + return func(ctx context.Context, c cid.Cid) ([]*format.Link, error) { + nd, err := serv.Get(ctx, c) + if err != nil { + return nil, err + } + // We don't use Size() as it returns cumulative size including linked nodes. + tracker.Update(uint64(len(nd.RawData()))) + return nd.Links(), nil + } +} + type sesGetter struct { bs *bserv.Session decoder *legacy.Decoder @@ -208,20 +222,13 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s // We default to Concurrent() walk. opts = append([]WalkOption{Concurrent()}, opts...) - // If we have a ProgressTracker, we wrap the visit function to handle it. + // If we have a ProgressTracker, we wrap the get links function to handle it. v, _ := ctx.Value(progressContextKey).(*ProgressTracker) if v == nil { return WalkDepth(ctx, GetLinksDirect(ng), root, visit, opts...) } - visitProgress := func(c cid.Cid, depth int) bool { - if visit(c, depth) { - v.Increment() - return true - } - return false - } - return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, opts...) 
+ return WalkDepth(ctx, GetLinksDirectWithProgressTracker(ng, v), root, visit, opts...) } // GetMany gets many nodes from the DAG at once. @@ -457,10 +464,18 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d return nil } +// ProgressStat represents the progress of a fetch operation. +type ProgressStat struct { + // Total is the total number of nodes fetched. + Total int + // TotalSize is the total size of the nodes fetched. + TotalSize uint64 +} + // ProgressTracker is used to show progress when fetching nodes. type ProgressTracker struct { - Total int - lk sync.Mutex + stat ProgressStat + lk sync.Mutex } // DeriveContext returns a new context with value "progress" derived from the @@ -469,18 +484,26 @@ func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context { return context.WithValue(ctx, progressContextKey, p) } -// Increment adds one to the total progress. -func (p *ProgressTracker) Increment() { +// Update adds one to the total and updates the total size. +func (p *ProgressTracker) Update(size uint64) { p.lk.Lock() defer p.lk.Unlock() - p.Total++ + p.stat.Total++ + p.stat.TotalSize += size } // Value returns the current progress. func (p *ProgressTracker) Value() int { p.lk.Lock() defer p.lk.Unlock() - return p.Total + return p.stat.Total +} + +// ProgressStat returns the current progress stat. 
+func (p *ProgressTracker) ProgressStat() ProgressStat { + p.lk.Lock() + defer p.lk.Unlock() + return p.stat } func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error { diff --git a/ipld/merkledag/merkledag_test.go b/ipld/merkledag/merkledag_test.go index 08b240d47..6f3f210a9 100644 --- a/ipld/merkledag/merkledag_test.go +++ b/ipld/merkledag/merkledag_test.go @@ -1161,7 +1161,7 @@ func TestProgressIndicatorNoChildren(t *testing.T) { func testProgressIndicator(t *testing.T, depth int) { ds := dstest.Mock() - top, numChildren := mkDag(ds, depth) + top, numChildren, totalSize := mkDag(ds, depth) v := new(ProgressTracker) ctx := v.DeriveContext(context.Background()) @@ -1175,9 +1175,19 @@ func testProgressIndicator(t *testing.T, depth int) { t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d", numChildren+1, v.Value()) } + + if v.ProgressStat().Total != numChildren+1 { + t.Errorf("wrong number of children reported in progress stat indicator, expected %d, got %d", + numChildren+1, v.ProgressStat().Total) + } + + if v.ProgressStat().TotalSize != totalSize { + t.Errorf("wrong total size reported in progress stat indicator, expected %d, got %d", + totalSize, v.ProgressStat().TotalSize) + } } -func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int) { +func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int, uint64) { ctx := context.Background() totalChildren := 0 @@ -1213,7 +1223,12 @@ func mkDag(ds ipld.DAGService, depth int) (cid.Cid, int) { panic(err) } - return nd.Cid(), totalChildren + totalSize, err := nd.Size() + if err != nil { + panic(err) + } + + return nd.Cid(), totalChildren, totalSize } func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode { From dfa7a6d42f99249c59c0881c0ce4c676aead5136 Mon Sep 17 00:00:00 2001 From: Vladimir Erdman Date: Sun, 23 Nov 2025 17:01:29 +0400 Subject: [PATCH 2/3] Update changelog --- CHANGELOG.md | 
 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c76d622d4..ec8bc2872 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes: ### Added - `routing/http`: `GET /routing/v1/dht/closest/peers/{key}` per [IPIP-476](https://github.com/ipfs/specs/pull/476) +- `ipld/merkledag`: Added fetched node size reporting to the progress tracker. See [kubo#8915](https://github.com/ipfs/kubo/issues/8915) ### Changed From 467065c7af0bc30f6002f3d44902483537c032b7 Mon Sep 17 00:00:00 2001 From: Vladimir Erdman Date: Mon, 24 Nov 2025 12:08:08 +0400 Subject: [PATCH 3/3] fix naming --- ipld/merkledag/merkledag.go | 20 ++++++++++---------- ipld/merkledag/merkledag_test.go | 10 +++++----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index 0c4db7a57..590b4b1fd 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -133,7 +133,7 @@ func GetLinksDirect(serv format.NodeGetter) GetLinks { } // GetLinksDirectWithProgressTracker creates a function as GetLinksDirect, but -// updates the ProgressTracker with the block size of the retrieved node. +// updates the ProgressTracker with the raw block data size of the retrieved node. func GetLinksDirectWithProgressTracker(serv format.NodeGetter, tracker *ProgressTracker) GetLinks { return func(ctx context.Context, c cid.Cid) ([]*format.Link, error) { nd, err := serv.Get(ctx, c) @@ -466,10 +466,10 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d // ProgressStat represents the progress of a fetch operation. type ProgressStat struct { - // Total is the total number of nodes fetched. - Total int - // TotalSize is the total size of the nodes fetched. - TotalSize uint64 + // Nodes is the total number of nodes fetched. + Nodes int + // Bytes is the total number of bytes of raw block data fetched. 
+ Bytes uint64 } // ProgressTracker is used to show progress when fetching nodes. @@ -484,19 +484,19 @@ func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context { return context.WithValue(ctx, progressContextKey, p) } -// Update adds one to the total and updates the total size. -func (p *ProgressTracker) Update(size uint64) { +// Update adds one to the total nodes and updates the total bytes. +func (p *ProgressTracker) Update(bytes uint64) { p.lk.Lock() defer p.lk.Unlock() - p.stat.Total++ - p.stat.TotalSize += size + p.stat.Nodes++ + p.stat.Bytes += bytes } // Value returns the current progress. func (p *ProgressTracker) Value() int { p.lk.Lock() defer p.lk.Unlock() - return p.stat.Total + return p.stat.Nodes } // ProgressStat returns the current progress stat. diff --git a/ipld/merkledag/merkledag_test.go b/ipld/merkledag/merkledag_test.go index 6f3f210a9..6ac926a61 100644 --- a/ipld/merkledag/merkledag_test.go +++ b/ipld/merkledag/merkledag_test.go @@ -1176,14 +1176,14 @@ func testProgressIndicator(t *testing.T, depth int) { numChildren+1, v.Value()) } - if v.ProgressStat().Total != numChildren+1 { + if v.ProgressStat().Nodes != numChildren+1 { t.Errorf("wrong number of children reported in progress stat indicator, expected %d, got %d", - numChildren+1, v.ProgressStat().Total) + numChildren+1, v.ProgressStat().Nodes) } - if v.ProgressStat().TotalSize != totalSize { - t.Errorf("wrong total size reported in progress stat indicator, expected %d, got %d", - totalSize, v.ProgressStat().TotalSize) + if v.ProgressStat().Bytes != totalSize { + t.Errorf("wrong bytes reported in progress stat indicator, expected %d, got %d", + totalSize, v.ProgressStat().Bytes) } }