Skip to content

Commit 039ddc7

Browse files
mvdanmasih
authored andcommitted
v2/blockstore: add ReadWrite.Discard
This allows closing a read-write blockstore without doing the extra work to finalize its header and index. Can be useful if an entire piece of work is cancelled, and also simplifies the tests. Also make ReadOnly error in a straightforward way if it is used after being closed. Before, this could lead to panics, as we'd attempt to read the CARv1 file when it's closed. Both mechanisms now use a "closed" boolean, which is consistent and simpler than checking a header field. Finally, add tests that ensure both ReadOnly and ReadWrite behave as intended once they have been closed. The tests also uncovered that AllKeysChan would not release the mutex if it encountered an error early on. Fix that, too. While at it, fix some now-obsolete references to panics on unsupported or after-close method calls. Fixes #205.
1 parent 6c87996 commit 039ddc7

File tree

6 files changed

+175
-79
lines changed

6 files changed

+175
-79
lines changed

v2/blockstore/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// The ReadWrite blockstore allows writing and reading of the blocks concurrently. The user of this
1212
// blockstore is responsible for calling ReadWrite.Finalize when finished writing blocks.
1313
// Upon finalization, the instance can no longer be used for reading or writing blocks and will
14-
// panic if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore
14+
// error if used. To continue reading the blocks users are encouraged to use ReadOnly blockstore
1515
// instantiated from the same file path using OpenReadOnly.
1616
// A user may resume reading/writing from files produced by an instance of ReadWrite blockstore. The
1717
// resumption is attempted automatically, if the path passed to OpenReadWrite exists.

v2/blockstore/export_test.go

Lines changed: 0 additions & 11 deletions
This file was deleted.

v2/blockstore/readonly.go

Lines changed: 54 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,36 @@ var _ blockstore.Blockstore = (*ReadOnly)(nil)
2727
var (
2828
errZeroLengthSection = fmt.Errorf("zero-length carv2 section not allowed by default; see WithZeroLengthSectionAsEOF option")
2929
errReadOnly = fmt.Errorf("called write method on a read-only carv2 blockstore")
30+
errClosed = fmt.Errorf("cannot use a carv2 blockstore after closing")
3031
)
3132

32-
type (
33-
// ReadOnly provides a read-only CAR Block Store.
34-
ReadOnly struct {
35-
// mu allows ReadWrite to be safe for concurrent use.
36-
// It's in ReadOnly so that read operations also grab read locks,
37-
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
38-
//
39-
// The main fields guarded by the mutex are the index and the underlying writers.
40-
// For simplicity, the entirety of the blockstore methods grab the mutex.
41-
mu sync.RWMutex
42-
43-
// The backing containing the data payload in CARv1 format.
44-
backing io.ReaderAt
45-
// The CARv1 content index.
46-
idx index.Index
47-
48-
// If we called carv2.NewReaderMmap, remember to close it too.
49-
carv2Closer io.Closer
50-
51-
ropts carv2.ReadOptions
52-
}
33+
// ReadOnly provides a read-only CAR Block Store.
34+
type ReadOnly struct {
35+
// mu allows ReadWrite to be safe for concurrent use.
36+
// It's in ReadOnly so that read operations also grab read locks,
37+
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
38+
//
39+
// The main fields guarded by the mutex are the index and the underlying writers.
40+
// For simplicity, the entirety of the blockstore methods grab the mutex.
41+
mu sync.RWMutex
42+
43+
// When true, the blockstore has been closed via Close, Discard, or
44+
// Finalize, and must not be used. Any further blockstore method calls
45+
// will return errClosed to avoid panics or broken behavior.
46+
closed bool
47+
48+
// The backing containing the data payload in CARv1 format.
49+
backing io.ReaderAt
50+
// The CARv1 content index.
51+
idx index.Index
52+
53+
// If we called carv2.NewReaderMmap, remember to close it too.
54+
carv2Closer io.Closer
55+
56+
ropts carv2.ReadOptions
57+
}
5358

54-
contextKey string
55-
)
59+
type contextKey string
5660

5761
const asyncErrHandlerKey contextKey = "asyncErrorHandlerKey"
5862

@@ -177,7 +181,7 @@ func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
177181
return bcid, data, err
178182
}
179183

180-
// DeleteBlock is unsupported and always panics.
184+
// DeleteBlock is unsupported and always errors.
181185
func (b *ReadOnly) DeleteBlock(_ cid.Cid) error {
182186
return errReadOnly
183187
}
@@ -187,6 +191,10 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
187191
b.mu.RLock()
188192
defer b.mu.RUnlock()
189193

194+
if b.closed {
195+
return false, errClosed
196+
}
197+
190198
var fnFound bool
191199
var fnErr error
192200
err := b.idx.GetAll(key, func(offset uint64) bool {
@@ -223,6 +231,10 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
223231
b.mu.RLock()
224232
defer b.mu.RUnlock()
225233

234+
if b.closed {
235+
return nil, errClosed
236+
}
237+
226238
var fnData []byte
227239
var fnErr error
228240
err := b.idx.GetAll(key, func(offset uint64) bool {
@@ -263,6 +275,10 @@ func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
263275
b.mu.RLock()
264276
defer b.mu.RUnlock()
265277

278+
if b.closed {
279+
return 0, errClosed
280+
}
281+
266282
fnSize := -1
267283
var fnErr error
268284
err := b.idx.GetAll(key, func(offset uint64) bool {
@@ -329,16 +345,26 @@ func WithAsyncErrorHandler(ctx context.Context, errHandler func(error)) context.
329345
// See WithAsyncErrorHandler
330346
func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
331347
// We release the lock when the channel-sending goroutine stops.
348+
// Note that we can't use a deferred unlock here,
349+
// because if we return a nil error,
350+
// we only want to unlock once the async goroutine has stopped.
332351
b.mu.RLock()
333352

353+
if b.closed {
354+
b.mu.RUnlock() // don't hold the mutex forever
355+
return nil, errClosed
356+
}
357+
334358
// TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car.
335359
rdr := internalio.NewOffsetReadSeeker(b.backing, 0)
336360
header, err := carv1.ReadHeader(rdr)
337361
if err != nil {
362+
b.mu.RUnlock() // don't hold the mutex forever
338363
return nil, fmt.Errorf("error reading car header: %w", err)
339364
}
340365
headerSize, err := carv1.HeaderSize(header)
341366
if err != nil {
367+
b.mu.RUnlock() // don't hold the mutex forever
342368
return nil, err
343369
}
344370

@@ -347,6 +373,7 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
347373

348374
// Seek to the end of header.
349375
if _, err = rdr.Seek(int64(headerSize), io.SeekStart); err != nil {
376+
b.mu.RUnlock() // don't hold the mutex forever
350377
return nil, err
351378
}
352379

@@ -424,10 +451,10 @@ func (b *ReadOnly) Roots() ([]cid.Cid, error) {
424451
}
425452

426453
// Close closes the underlying reader if it was opened by OpenReadOnly.
454+
// After this call, the blockstore can no longer be used.
427455
//
428456
// Note that this call may block if any blockstore operations are currently in
429-
// progress, including an AllKeysChan that hasn't been fully consumed or
430-
// cancelled.
457+
// progress, including an AllKeysChan that hasn't been fully consumed or cancelled.
431458
func (b *ReadOnly) Close() error {
432459
b.mu.Lock()
433460
defer b.mu.Unlock()
@@ -436,6 +463,7 @@ func (b *ReadOnly) Close() error {
436463
}
437464

438465
func (b *ReadOnly) closeWithoutMutex() error {
466+
b.closed = true
439467
if b.carv2Closer != nil {
440468
return b.carv2Closer.Close()
441469
}

v2/blockstore/readonly_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestReadOnly(t *testing.T) {
101101
require.NoError(t, err)
102102
require.Equal(t, wantBlock, gotBlock)
103103

104-
// Assert write operations panic
104+
// Assert write operations error
105105
require.Error(t, subject.Put(wantBlock))
106106
require.Error(t, subject.PutMany([]blocks.Block{wantBlock}))
107107
require.Error(t, subject.DeleteBlock(key))
@@ -235,3 +235,38 @@ func newV1Reader(r io.Reader, zeroLenSectionAsEOF bool) (*carv1.CarReader, error
235235
}
236236
return carv1.NewCarReader(r)
237237
}
238+
239+
func TestReadOnlyErrorAfterClose(t *testing.T) {
240+
bs, err := OpenReadOnly("../testdata/sample-v1.car")
241+
require.NoError(t, err)
242+
243+
roots, err := bs.Roots()
244+
require.NoError(t, err)
245+
_, err = bs.Has(roots[0])
246+
require.NoError(t, err)
247+
_, err = bs.Get(roots[0])
248+
require.NoError(t, err)
249+
_, err = bs.GetSize(roots[0])
250+
require.NoError(t, err)
251+
252+
ctx, cancel := context.WithCancel(context.Background())
253+
_, err = bs.AllKeysChan(ctx)
254+
require.NoError(t, err)
255+
cancel() // to stop the AllKeysChan goroutine
256+
257+
bs.Close()
258+
259+
_, err = bs.Roots()
260+
require.Error(t, err)
261+
_, err = bs.Has(roots[0])
262+
require.Error(t, err)
263+
_, err = bs.Get(roots[0])
264+
require.Error(t, err)
265+
_, err = bs.GetSize(roots[0])
266+
require.Error(t, err)
267+
_, err = bs.AllKeysChan(ctx)
268+
require.Error(t, err)
269+
270+
// TODO: test that closing blocks if an AllKeysChan operation is
271+
// in progress.
272+
}

v2/blockstore/readwrite.go

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323

2424
var _ blockstore.Blockstore = (*ReadWrite)(nil)
2525

26-
var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after finalizing")
27-
2826
// ReadWrite implements a blockstore that stores blocks in CARv2 format.
2927
// Blocks put into the blockstore can be read back once they are successfully written.
3028
// This implementation is preferable for a write-heavy workload.
@@ -33,7 +31,7 @@ var errFinalized = fmt.Errorf("cannot use a read-write carv2 blockstore after fi
3331
//
3432
// The Finalize function must be called once the putting blocks are finished.
3533
// Upon calling Finalize header is finalized and index is written out.
36-
// Once finalized, all read and write calls to this blockstore will result in panics.
34+
// Once finalized, all read and write calls to this blockstore will result in errors.
3735
type ReadWrite struct {
3836
ronly ReadOnly
3937

@@ -62,7 +60,7 @@ func AllowDuplicatePuts(allow bool) carv2.WriteOption {
6260
// ReadWrite.Finalize must be called once putting and reading blocks are no longer needed.
6361
// Upon calling ReadWrite.Finalize the CARv2 header and index are written out onto the file and the
6462
// backing file is closed. Once finalized, all read and write calls to this blockstore will result
65-
// in panics. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any
63+
// in errors. Note, ReadWrite.Finalize must be called on an open instance regardless of whether any
6664
// blocks were put or not.
6765
//
6866
// If a file at given path does not exist, the instantiation will write car.Pragma and data payload
@@ -287,26 +285,22 @@ func (b *ReadWrite) unfinalize() error {
287285
return err
288286
}
289287

290-
func (b *ReadWrite) finalized() bool {
291-
return b.header.DataSize != 0
292-
}
293-
294288
// Put puts a given block to the underlying datastore
295289
func (b *ReadWrite) Put(blk blocks.Block) error {
296-
// PutMany already checks b.finalized.
290+
// PutMany already checks b.ronly.closed.
297291
return b.PutMany([]blocks.Block{blk})
298292
}
299293

300294
// PutMany puts a slice of blocks at the same time using batching
301295
// capabilities of the underlying datastore whenever possible.
302296
func (b *ReadWrite) PutMany(blks []blocks.Block) error {
303-
if b.finalized() {
304-
return errFinalized
305-
}
306-
307297
b.ronly.mu.Lock()
308298
defer b.ronly.mu.Unlock()
309299

300+
if b.ronly.closed {
301+
return errClosed
302+
}
303+
310304
for _, bl := range blks {
311305
c := bl.Cid()
312306

@@ -331,25 +325,37 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
331325
return nil
332326
}
333327

328+
// Discard closes this blockstore without finalizing its header and index.
329+
// After this call, the blockstore can no longer be used.
330+
//
331+
// Note that this call may block if any blockstore operations are currently in
332+
// progress, including an AllKeysChan that hasn't been fully consumed or cancelled.
333+
func (b *ReadWrite) Discard() {
334+
// Same semantics as ReadOnly.Close, including allowing duplicate calls.
335+
// The only difference is that our method is called Discard,
336+
// to further clarify that we're not properly finalizing and writing a
337+
// CARv2 file.
338+
b.ronly.Close()
339+
}
340+
334341
// Finalize finalizes this blockstore by writing the CARv2 header, along with flattened index
335342
// for more efficient subsequent read.
336-
// After this call, this blockstore can no longer be used for read or write.
343+
// After this call, the blockstore can no longer be used.
337344
func (b *ReadWrite) Finalize() error {
338-
if b.header.DataSize != 0 {
345+
b.ronly.mu.Lock()
346+
defer b.ronly.mu.Unlock()
347+
348+
if b.ronly.closed {
339349
// Allow duplicate Finalize calls, just like Close.
340350
// Still error, just like ReadOnly.Close; it should be discarded.
341-
return fmt.Errorf("called Finalize twice")
351+
return fmt.Errorf("called Finalize on a closed blockstore")
342352
}
343353

344-
b.ronly.mu.Lock()
345-
defer b.ronly.mu.Unlock()
346354
// TODO check if add index option is set and don't write the index then set index offset to zero.
347355
b.header = b.header.WithDataSize(uint64(b.dataWriter.Position()))
348356

349357
// Note that we can't use b.Close here, as that tries to grab the same
350358
// mutex we're holding here.
351-
// TODO: should we check the error here? especially with OpenReadWrite,
352-
// we should care about close errors.
353359
defer b.ronly.closeWithoutMutex()
354360

355361
// TODO if index not needed don't bother flattening it.
@@ -360,39 +366,29 @@ func (b *ReadWrite) Finalize() error {
360366
if err := index.WriteTo(fi, internalio.NewOffsetWriter(b.f, int64(b.header.IndexOffset))); err != nil {
361367
return err
362368
}
363-
_, err = b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize))
364-
return err
365-
}
369+
if _, err := b.header.WriteTo(internalio.NewOffsetWriter(b.f, carv2.PragmaSize)); err != nil {
370+
return err
371+
}
366372

367-
func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
368-
if b.finalized() {
369-
return nil, errFinalized
373+
if err := b.ronly.closeWithoutMutex(); err != nil {
374+
return err
370375
}
376+
return nil
377+
}
371378

379+
func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
372380
return b.ronly.AllKeysChan(ctx)
373381
}
374382

375383
func (b *ReadWrite) Has(key cid.Cid) (bool, error) {
376-
if b.finalized() {
377-
return false, errFinalized
378-
}
379-
380384
return b.ronly.Has(key)
381385
}
382386

383387
func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) {
384-
if b.finalized() {
385-
return nil, errFinalized
386-
}
387-
388388
return b.ronly.Get(key)
389389
}
390390

391391
func (b *ReadWrite) GetSize(key cid.Cid) (int, error) {
392-
if b.finalized() {
393-
return 0, errFinalized
394-
}
395-
396392
return b.ronly.GetSize(key)
397393
}
398394

0 commit comments

Comments
 (0)