Skip to content

Commit 0731582

Browse files
authored
fix: overflow slices (#1595)
In the unusual case where a sharding stream has to create a new shard — because adding the root CID to the CAR header pushes it over the shard limit — the slice indexing info for the new shard was set for the blocks in the previous shard, not the blocks in the new shard 🤦. This means that one or more blocks in the DAG were not indexed at all.
1 parent b737073 commit 0731582

File tree

2 files changed

+43
-13
lines changed

2 files changed

+43
-13
lines changed

packages/upload-client/src/sharding.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export class ShardingStream extends TransformStream {
105105
overflowCurrentLength = 0
106106
/** @type {Map<import('./types.js').SliceDigest, import('./types.js').Position>} */
107107
const overflowSlices = new DigestMap()
108-
for (const block of blocks) {
108+
for (const block of overflowBlocks) {
109109
const overflowBlockHeaderLength = blockHeaderEncodingLength(block)
110110
overflowSlices.set(block.cid.multihash, [
111111
headerLength + overflowCurrentLength + overflowBlockHeaderLength,

packages/upload-client/test/sharding.test.js

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,24 +78,22 @@ describe('ShardingStream', () => {
7878
await randomBlock(32), // encoded block length = 70
7979
]
8080

81-
/** @type {import('../src/types.js').CARFile[]} */
81+
/** @type {import('../src/types.js').IndexedCARFile[]} */
8282
const shards = []
83+
let i = 0
8384
await new ReadableStream({
8485
pull(controller) {
85-
const block = blocks.shift()
86+
const block = blocks[i]
8687
if (!block) return controller.close()
8788
controller.enqueue(block)
89+
i++
8890
},
8991
})
90-
// shard with no roots = encoded block (166) + CAR header (18) = 183
91-
// shard with no roots = encoded block (102) + CAR header (18) = 120
92-
// shard with 1 root = encoded block (70) + CAR header (18) = 88
93-
// shard with 1 root = encoded block (70) + CAR header (59) = 155
94-
// i.e. shard size of 208 (120 + 88) should allow us 1 shard with 0 roots
95-
// and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root
96-
// will actually exceed the shard size. It must then be refactored into
97-
// 2 shards.
98-
.pipeThrough(new ShardingStream({ shardSize: 208 }))
92+
// 166 + 102 + 70 + 18 (0 root CAR header) = 356
93+
// 166 + 102 + 70 + 59 (1 root CAR header) = 397
94+
// Choose 360 as shard size so when CAR header with a root is added, the
95+
// 3rd block is moved into a new shard.
96+
.pipeThrough(new ShardingStream({ shardSize: 360 }))
9997
.pipeTo(
10098
new WritableStream({
10199
write: (s) => {
@@ -104,7 +102,39 @@ describe('ShardingStream', () => {
104102
})
105103
)
106104

107-
assert.equal(shards.length, 3)
105+
assert.equal(shards.length, 2)
106+
107+
const shard0Bytes = new Uint8Array(await shards[0].arrayBuffer())
108+
const shard1Bytes = new Uint8Array(await shards[1].arrayBuffer())
109+
110+
// block 0 and 1 should be in shard 0
111+
const slice0 = shards[0].slices.get(blocks[0].cid.multihash)
112+
assert.ok(slice0)
113+
assert(
114+
equals(
115+
blocks[0].bytes,
116+
shard0Bytes.slice(slice0[0], slice0[0] + slice0[1])
117+
)
118+
)
119+
120+
const slice1 = shards[0].slices.get(blocks[1].cid.multihash)
121+
assert.ok(slice1)
122+
assert(
123+
equals(
124+
blocks[1].bytes,
125+
shard0Bytes.slice(slice1[0], slice1[0] + slice1[1])
126+
)
127+
)
128+
129+
// block 2 should be in shard 1
130+
const slice2 = shards[1].slices.get(blocks[2].cid.multihash)
131+
assert.ok(slice2)
132+
assert(
133+
equals(
134+
blocks[2].bytes,
135+
shard1Bytes.slice(slice2[0], slice2[0] + slice2[1])
136+
)
137+
)
108138
})
109139

110140
it('exceeds shard size when block is encoded with root CID', async () => {

0 commit comments

Comments (0)