Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions enterprise/server/backends/distributed/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,10 @@ func (c *Cache) getLookasideEntry(ctx context.Context, r *rspb.ResourceName) ([]
func (c *Cache) lookasideWriter(r *rspb.ResourceName, lookasideKey string) (interfaces.CommittedWriteCloser, error) {
buffer := bytes.NewBuffer(make([]byte, 0, r.GetDigest().GetSizeBytes()))
wc := ioutil.NewCustomCommitWriteCloser(buffer)
wc.CommitFn = func(int64) error {
wc.SetCommitFn(func(int64) error {
c.setLookasideEntry(lookasideKey, buffer.Bytes())
return nil
}
})
return wc, nil
}

Expand All @@ -527,13 +527,13 @@ func combineCommittedWriteClosers(a, b interfaces.CommittedWriteCloser) interfac

c := io.MultiWriter(a, b)
cwc := ioutil.NewCustomCommitWriteCloser(c)
cwc.CommitFn = func(n int64) error {
cwc.SetCommitFn(func(n int64) error {
if err := a.Commit(); err != nil {
return err
}
return b.Commit()
}
cwc.CloseFn = func() error {
})
cwc.SetCloseFn(func() error {
var firstErr error
if err := a.Close(); err != nil {
firstErr = err
Expand All @@ -542,7 +542,7 @@ func combineCommittedWriteClosers(a, b interfaces.CommittedWriteCloser) interfac
firstErr = err
}
return firstErr
}
})
return cwc
}

Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/backends/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ func (c *Cache) Writer(ctx context.Context, r *rspb.ResourceName) (interfaces.Co
}
var buffer bytes.Buffer
wc := ioutil.NewCustomCommitWriteCloser(&buffer)
wc.CommitFn = func(int64) error {
wc.SetCommitFn(func(int64) error {
// Locking and key prefixing are handled in Set.
return c.mcSet(k, buffer.Bytes())
}
})
return wc, nil

}
Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/backends/metacache/metacache.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (c *Cache) newWrappedWriter(ctx context.Context, fileRecord *sgpb.FileRecor

var encryptionMetadata *sgpb.EncryptionMetadata
cwc := ioutil.NewCustomCommitWriteCloser(wcm)
cwc.CommitFn = func(bytesWritten int64) error {
cwc.SetCommitFn(func(bytesWritten int64) error {
now := c.opts.Clock.Now().UnixMicro()
md := &sgpb.FileMetadata{
FileRecord: fileRecord,
Expand All @@ -410,7 +410,7 @@ func (c *Cache) newWrappedWriter(ctx context.Context, fileRecord *sgpb.FileRecor
return status.UnavailableError("zero-length writes are not allowed")
}
return fn(md)
}
})

wc := interfaces.CommittedWriteCloser(cwc)
shouldEncrypt, err := c.encryptionEnabled(ctx)
Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/backends/migration_cache/migration_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,12 +915,12 @@ func (mc *MigrationCache) Writer(ctx context.Context, r *rspb.ResourceName) (int
}
if conf.asyncDestWrites {
w := ioutil.NewCustomCommitWriteCloser(srcWriter)
w.CommitFn = func(int64) error {
w.SetCommitFn(func(int64) error {
// This is only called when the source writer is successfully committed.
// We will force a write to the destination cache in the background.
mc.sendNonBlockingCopy(ctx, r, false /*=onlyCopyMissing*/, conf)
return nil
}
})
return w, nil
}

Expand Down
12 changes: 6 additions & 6 deletions enterprise/server/backends/pebble_cache/pebble_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2079,8 +2079,8 @@ func (p *PebbleCache) newCDCCommitedWriteCloser(ctx context.Context, fileRecord
cdcw.chunker = chunker

cwc := ioutil.NewCustomCommitWriteCloser(wc)
cwc.CloseFn = db.Close
cwc.CommitFn = func(bytesWritten int64) error {
cwc.SetCloseFn(db.Close)
cwc.SetCommitFn(func(bytesWritten int64) error {
if decompressor != nil {
if err := decompressor.Close(); err != nil {
return status.InternalErrorf("failed to close decompressor: %s", err)
Expand Down Expand Up @@ -2122,7 +2122,7 @@ func (p *PebbleCache) newCDCCommitedWriteCloser(ctx context.Context, fileRecord
return status.InternalErrorf("invalid number of chunks (%d)", numChunks)
}
return p.writeMetadata(ctx, db, key, md)
}
})
return cwc, nil
}

Expand Down Expand Up @@ -2326,8 +2326,8 @@ func (p *PebbleCache) newWrappedWriter(ctx context.Context, fileRecord *sgpb.Fil

var encryptionMetadata *sgpb.EncryptionMetadata
cwc := ioutil.NewCustomCommitWriteCloser(wcm)
cwc.CloseFn = db.Close
cwc.CommitFn = func(bytesWritten int64) error {
cwc.SetCloseFn(db.Close)
cwc.SetCommitFn(func(bytesWritten int64) error {
now := p.clock.Now().UnixMicro()
md := &sgpb.FileMetadata{
FileRecord: fileRecord,
Expand All @@ -2344,7 +2344,7 @@ func (p *PebbleCache) newWrappedWriter(ctx context.Context, fileRecord *sgpb.Fil
}

return p.writeMetadata(ctx, db, key, md)
}
})

wc := interfaces.CommittedWriteCloser(cwc)
shouldEncrypt, err := p.encryptionEnabled(ctx)
Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/backends/redis_cache/redis_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,12 @@ func (c *Cache) Writer(ctx context.Context, r *rspb.ResourceName) (interfaces.Co
timer := cache_metrics.NewCacheTimer(cacheLabels)
var buffer bytes.Buffer
wc := ioutil.NewCustomCommitWriteCloser(&buffer)
wc.CommitFn = func(int64) error {
wc.SetCommitFn(func(int64) error {
err := c.rdbSet(ctx, k, buffer.Bytes())
timer.ObserveWrite(int64(buffer.Len()), err)
// Locking and key prefixing are handled in Set.
return err
}
})
return wc, nil
}

Expand Down
4 changes: 2 additions & 2 deletions enterprise/server/raft/replica/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func writer(t *testing.T, em *entryMaker, r *replica.Replica, h *rfpb.Header, fi
writeCloserMetadata := fs.InlineWriter(context.TODO(), fileRecord.GetDigest().GetSizeBytes())

wc := ioutil.NewCustomCommitWriteCloser(writeCloserMetadata)
wc.CommitFn = func(bytesWritten int64) error {
wc.SetCommitFn(func(bytesWritten int64) error {
now := time.Now()
md := &sgpb.FileMetadata{
FileRecord: fileRecord,
Expand All @@ -162,7 +162,7 @@ func writer(t *testing.T, em *entryMaker, r *replica.Replica, h *rfpb.Header, fi
require.Equal(t, 1, len(writeRsp))

return rbuilder.NewBatchResponse(writeRsp[0].Result.Data).AnyError()
}
})
return wc
}

Expand Down
7 changes: 6 additions & 1 deletion enterprise/server/remote_execution/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,12 @@ func TestExecuteTaskAndStreamResults_InternalInputDownloadTimeout(t *testing.T)
require.NoError(t, err)
retry, err := exec.ExecuteTaskAndStreamResults(ctx, task, publisher)
require.True(t, status.IsUnavailableError(err), "expected Unavailable error, got: %v", err)
require.ErrorContains(t, err, "timed out waiting for Read response")
require.True(
t,
strings.Contains(err.Error(), "timed out waiting for Read response") || strings.Contains(err.Error(), "context deadline exceeded"),
"expected read timeout error, got: %v",
err,
)
require.False(t, retry, "bazel will retry Unavailable errors, so we should not retry internally")

<-mockServer.finished
Expand Down
8 changes: 4 additions & 4 deletions server/backends/blobstore/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (a *AwsS3BlobStore) Writer(ctx context.Context, blobName string) (interface

zw := util.NewCompressWriter(pw)
cwc := ioutil.NewCustomCommitWriteCloser(zw)
cwc.CommitFn = func(int64) error {
cwc.SetCommitFn(func(int64) error {
if compresserCloseErr := zw.Close(); compresserCloseErr != nil {
cancel() // Don't try to finish the commit op if Close() failed.
if pipeCloseErr := pw.Close(); pipeCloseErr != nil {
Expand All @@ -328,10 +328,10 @@ func (a *AwsS3BlobStore) Writer(ctx context.Context, blobName string) (interface
return writerCloseErr
}
return <-errch
}
cwc.CloseFn = func() error {
})
cwc.SetCloseFn(func() error {
cancel()
return pw.Close()
}
})
return cwc, nil
}
8 changes: 4 additions & 4 deletions server/backends/blobstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (z *AzureBlobStore) Writer(ctx context.Context, blobName string) (interface

zw := util.NewCompressWriter(pw)
cwc := ioutil.NewCustomCommitWriteCloser(zw)
cwc.CommitFn = func(int64) error {
cwc.SetCommitFn(func(int64) error {
if compresserCloseErr := zw.Close(); compresserCloseErr != nil {
cancel() // Don't try to finish the commit op if Close() failed.
if pipeCloseErr := pw.Close(); pipeCloseErr != nil {
Expand All @@ -200,10 +200,10 @@ func (z *AzureBlobStore) Writer(ctx context.Context, blobName string) (interface
return writerCloseErr
}
return <-errch
}
cwc.CloseFn = func() error {
})
cwc.SetCloseFn(func() error {
cancel()
return nil
}
})
return cwc, nil
}
6 changes: 3 additions & 3 deletions server/backends/blobstore/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (d *DiskBlobStore) Writer(ctx context.Context, blobName string) (interfaces
}
zw := util.NewCompressWriter(fw)
cwc := ioutil.NewCustomCommitWriteCloser(zw)
cwc.CommitFn = func(int64) error {
cwc.SetCommitFn(func(int64) error {
if compresserCloseErr := zw.Close(); compresserCloseErr != nil {
if writerCloseErr := fw.Close(); writerCloseErr != nil {
log.Errorf("Error encountered when closing FileWriter for %s: %s", blobName, err)
}
return compresserCloseErr
}
return fw.Commit()
}
cwc.CloseFn = fw.Close
})
cwc.SetCloseFn(fw.Close)
return cwc, nil
}
16 changes: 8 additions & 8 deletions server/backends/blobstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (g *GCSBlobStore) ConditionalWriter(ctx context.Context, blobName string, o
ow.ObjectAttrs.CustomTime = customTime

cwc := ioutil.NewCustomCommitWriteCloser(ow)
cwc.CommitFn = func(n int64) error {
cwc.SetCommitFn(func(n int64) error {
err := ow.Close()
if gerr, ok := err.(*googleapi.Error); ok {
if gerr.Code == http.StatusPreconditionFailed {
Expand All @@ -283,11 +283,11 @@ func (g *GCSBlobStore) ConditionalWriter(ctx context.Context, blobName string, o
}
util.RecordWriteMetrics(g.metricLabel, start, int(n), err)
return err
}
cwc.CloseFn = func() error {
})
cwc.SetCloseFn(func() error {
cancel()
return nil
}
})
return cwc, nil
}

Expand All @@ -307,7 +307,7 @@ func (g *GCSBlobStore) Writer(ctx context.Context, blobName string) (interfaces.
zw = ow
}
cwc := ioutil.NewCustomCommitWriteCloser(zw)
cwc.CommitFn = func(n int64) error {
cwc.SetCommitFn(func(n int64) error {
err := zw.Close()
if err != nil {
cancel() // Don't try to finish the commit op if Close() failed.
Expand All @@ -317,11 +317,11 @@ func (g *GCSBlobStore) Writer(ctx context.Context, blobName string) (interfaces.
}
util.RecordWriteMetrics(g.metricLabel, start, int(n), err)
return err
}
cwc.CloseFn = func() error {
})
cwc.SetCloseFn(func() error {
cancel()
return nil
}
})
return cwc, nil
}

Expand Down
4 changes: 2 additions & 2 deletions server/backends/disk_cache/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func (p *partition) writer(ctx context.Context, r *rspb.ResourceName) (interface
return nil, err
}
cwc := ioutil.NewCustomCommitWriteCloser(fw)
cwc.CommitFn = func(totalBytesWritten int64) error {
cwc.SetCommitFn(func(totalBytesWritten int64) error {
record, err := makeRecordFromPath(k, k.FullPath())
if err != nil {
return err
Expand All @@ -1222,7 +1222,7 @@ func (p *partition) writer(ctx context.Context, r *rspb.ResourceName) (interface
p.lruAdd(record)
metrics.DiskCacheAddedFileSizeBytes.With(prometheus.Labels{metrics.CacheNameLabel: cacheName}).Observe(float64(totalBytesWritten))
return nil
}
})
return cwc, nil
}

Expand Down
4 changes: 2 additions & 2 deletions server/backends/memory_cache/memory_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ func (m *MemoryCache) Reader(ctx context.Context, rn *rspb.ResourceName, uncompr
func (m *MemoryCache) Writer(ctx context.Context, r *rspb.ResourceName) (interfaces.CommittedWriteCloser, error) {
var buffer bytes.Buffer
wc := ioutil.NewCustomCommitWriteCloser(&buffer)
wc.CommitFn = func(int64) error {
wc.SetCommitFn(func(int64) error {
// Locking and key prefixing are handled in SetDeprecated.
return m.Set(ctx, r, buffer.Bytes())
}
})
return wc, nil
}

Expand Down
8 changes: 4 additions & 4 deletions server/testutil/mockgcs/mockgcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,22 @@ func (m *mockGCS) ConditionalWriter(ctx context.Context, blobName string, overwr
exists = exists && !m.expired(blobName)
if exists && !overwriteExisting {
cwc := ioutil.NewCustomCommitWriteCloser(ioutil.DiscardWriteCloser())
cwc.CommitFn = func(int64) error {
cwc.SetCommitFn(func(int64) error {
return status.AlreadyExistsError("mock gcs blob already exists")
}
})
return cwc, nil
}
var buf bytes.Buffer
cwc := ioutil.NewCustomCommitWriteCloser(&buf)
cwc.CommitFn = func(int64) error {
cwc.SetCommitFn(func(int64) error {
m.mu.Lock()
defer m.mu.Unlock()
m.items[blobName] = &timestampedBlob{
data: buf.Bytes(),
customTime: customTime,
}
return nil
}
})
return cwc, nil
}

Expand Down
6 changes: 3 additions & 3 deletions server/util/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewZstdCompressingWriteCommiter(wc interfaces.CommittedWriteCloser, bufferP
return nil, err
}
compressWC := ioutil.NewCustomCommitWriteCloser(compressor)
compressWC.CommitFn = func(inputBytes int64) error {
compressWC.SetCommitFn(func(inputBytes int64) error {
// Close the compressor to flush the buffer and return it to the pool.
if err := compressor.Close(); err != nil {
return err
Expand All @@ -205,8 +205,8 @@ func NewZstdCompressingWriteCommiter(wc interfaces.CommittedWriteCloser, bufferP
With(prometheus.Labels{metrics.CompressionType: "zstd", metrics.CacheNameLabel: cacheName}).
Observe(float64(compressor.CompressedBytesWritten) / float64(inputBytes))
return wc.Commit()
}
compressWC.CloseFn = wc.Close
})
compressWC.SetCloseFn(wc.Close)
return compressWC, nil
}

Expand Down
8 changes: 4 additions & 4 deletions server/util/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,15 @@ func TestNewZstdCompressingWriteCommiter(t *testing.T) {
counter := &ioutil.Counter{}
wc := ioutil.NewCustomCommitWriteCloser(counter)
var commited int64
wc.CommitFn = func(bytesWritten int64) error {
wc.SetCommitFn(func(bytesWritten int64) error {
commited = bytesWritten
return nil
}
})
closed := false
wc.CloseFn = func() error {
wc.SetCloseFn(func() error {
closed = true
return nil
}
})
src := []byte("hello worldddddddddddddddddddddddddddddddddddddddddd")
compressWC, err := compression.NewZstdCompressingWriteCommiter(wc, bufPool, int64(len(src)), "test_cache")
require.NoError(t, err)
Expand Down
Loading
Loading