Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ type EtcdServer struct {
kv mvcc.WatchableKV
lessor lease.Lessor
bemu sync.RWMutex
defragMu sync.Mutex
be backend.Backend
beHooks *serverstorage.BackendHooks
authStore auth.AuthStore
Expand Down Expand Up @@ -975,8 +976,10 @@ func (s *EtcdServer) Cleanup() {
}

// Defragment defragments the storage backend and returns the error reported
// by the backend's Defrag call.
//
// Locking:
//   - defragMu is held for the whole call, so only one Defragment runs at a
//     time.
//   - bemu is taken in read mode only (presumably it guards s.be — confirm
//     against the other users of bemu). The exclusive bemu.Lock must not be
//     taken here as well: acquiring RLock on a sync.RWMutex that the same
//     goroutine already holds exclusively blocks forever.
func (s *EtcdServer) Defragment() error {
	s.defragMu.Lock()
	defer s.defragMu.Unlock()

	s.bemu.RLock()
	defer s.bemu.RUnlock()

	return s.be.Defrag()
}

Expand Down
253 changes: 178 additions & 75 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package backend

import (
"errors"
"fmt"
"hash/crc32"
"io"
Expand All @@ -28,6 +29,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
bolterrors "go.etcd.io/bbolt/errors"
"go.etcd.io/etcd/client/pkg/v3/verify"
)

Expand Down Expand Up @@ -469,20 +471,6 @@ func (b *backend) defrag() error {
isDefragActive.Set(1)
defer isDefragActive.Set(0)

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
dir := filepath.Dir(b.db.Path())
Expand All @@ -500,77 +488,156 @@ func (b *backend) defrag() error {
// return nil, fmt.Errorf(defragOpenFileError)
return temp, nil
}
// Don't load tmp db into memory regardless of opening options
options.Mlock = false
tdbp := temp.Name()
tmpdb, err := bolt.Open(tdbp, 0o600, &options)
if err != nil {
temp.Close()
if rmErr := os.Remove(temp.Name()); rmErr != nil {
b.lg.Error(
"failed to remove temporary file",
b.lg.Error("failed to remove temporary file",
zap.String("path", temp.Name()),
zap.Error(rmErr),
)
}

return err
}

dbp := b.db.Path()
size1, sizeInUse1 := b.Size(), b.SizeInUse()
b.lg.Info(
"defragmenting",
b.lg.Info("defragmenting",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes", size1),
zap.String("current-db-size", humanize.Bytes(uint64(size1))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)

defer func() {
// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()
journal, snapTx, err := b.defragSetupSnapshot()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please improve the name?

  • snapshot already has a meaning in etcd
  • this only initializes the transaction, no? the real data is read in defragFromTx?

if err != nil {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we call defragCancelJournal for completeness if we fail here? Or in defer?

tmpdb.Close()
os.Remove(tdbp)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do the os.Remove error handling in a consistent way as above

return err
}

// Commit/stop and then reset current transactions (including the readTx)

@atiratree atiratree Jun 17, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this comment no longer apply?

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
b.lg.Info("defrag: copying data (writes unlocked)")

// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
err = defragFromTx(snapTx, tmpdb, defragLimit)
snapTx.Rollback()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the journal? Aren't the writes disabled up until this point?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we need the journal because we're defragging from a read snapshot transaction, so we need something to catch any writes so they can be replayed. This is what allows this to be "less blocking": before, we had to block writes while we copied the entire database; with this, we allow writes while we copy the database by saving the incoming writes and replaying them later.

The journal replay and the database switchover are the only times we need to stop both reads and writes, which we do at the same time.


if err != nil {
b.defragCancelJournal(journal)
tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
}

// restore the bbolt transactions if defragmentation fails
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)
Comment on lines -553 to -555

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the purpose behind this restore?

os.RemoveAll(tdbp)
return err
}

b.lg.Info("defrag: replaying journal")
err = b.defragReplayAndSwap(journal, tmpdb, dbp, tdbp)
if err != nil {
return err
}

err = b.db.Close()
took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info("finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
zap.String("current-db-size", humanize.Bytes(uint64(size2))),
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", took),
)
return nil
}

// defragSetupSnapshot prepares the copy phase of a defragmentation.
//
// While holding the batch transaction lock it:
//  1. commits the pending batch,
//  2. opens a read-only transaction on the current database to serve as the
//     snapshot the copy reads from, and
//  3. attaches a fresh journal to the batch transaction so that writes made
//     during the copy are captured.
//
// It returns the installed journal and the snapshot transaction. If the
// snapshot transaction cannot be opened, a wrapped error is returned and no
// journal is installed.
func (b *backend) defragSetupSnapshot() (*defragJournal, *bolt.Tx, error) {
	b.batchTx.LockOutsideApply()
	defer b.batchTx.Unlock()

	b.batchTx.commit(false)

	var (
		roTx     *bolt.Tx
		beginErr error
	)
	// b.mu is only needed while the transaction is being opened.
	b.mu.RLock()
	roTx, beginErr = b.db.Begin(false)
	b.mu.RUnlock()

	if beginErr != nil {
		return nil, nil, fmt.Errorf("failed to begin snapshot tx for defrag: %w", beginErr)
	}

	j := newDefragJournal()
	b.batchTx.defragJournal = j

	return j, roTx, nil
}

// defragCancelJournal detaches the defrag journal from the batch transaction
// and then closes the given journal, all under the batch transaction lock.
// It is used when the copy phase fails.
func (b *backend) defragCancelJournal(journal *defragJournal) {
	b.batchTx.LockOutsideApply()
	defer func() {
		b.batchTx.Unlock()
	}()

	// Detach before closing so the batch transaction no longer references
	// the journal once it is closed.
	b.batchTx.defragJournal = nil
	journal.close()
}

// defragReplayAndSwap drains the journal, replays it into the temp
// database, then atomically swaps the old database for the new one.
// batchTx is held for the entire operation to prevent writes between
// journal drain and database swap.
func (b *backend) defragReplayAndSwap(journal *defragJournal, tmpdb *bolt.DB, dbp, tdbp string) error {
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

b.batchTx.defragJournal = nil
journal.close()
ops := journal.drain()
Comment on lines +595 to +597

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could use defragJournal instead of journal arg


if len(ops) > 0 {
b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops)))
}

if err := replayJournal(tmpdb, ops, defragLimit); err != nil {
tmpdb.Close()
os.RemoveAll(tdbp)
return err
}

b.lg.Info("defrag: switching database")

b.mu.Lock()
defer b.mu.Unlock()
b.readTx.Lock()
defer b.readTx.Unlock()

// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
defer func() {
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

if err := b.db.Close(); err != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
}
err = tmpdb.Close()
if err != nil {
if err := tmpdb.Close(); err != nil {
b.lg.Fatal("failed to close tmp database", zap.Error(err))
}
// gofail: var defragBeforeRename struct{}
err = os.Rename(tdbp, dbp)
if err != nil {
if err := os.Rename(tdbp, dbp); err != nil {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}

var err error
b.db, err = bolt.Open(dbp, 0o600, b.bopts)
if err != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
Expand All @@ -585,29 +652,10 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info(
"finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
zap.String("current-db-size", humanize.Bytes(uint64(size2))),
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", took),
)
return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// gofail: var defragdbFail string
// return fmt.Errorf(defragdbFail)

// open a tx on tmpdb for writes
func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error {
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
Expand All @@ -618,18 +666,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
}()

// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()

c := tx.Cursor()

c := srcTx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
b := srcTx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", next)
}
Expand Down Expand Up @@ -665,6 +705,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return tmptx.Commit()
}

// replayJournal applies ops, in order, to tmpdb.
//
// Operations are written in batches: once more than limit operations have
// been applied in the current write transaction, that transaction is
// committed and a new one is started. The final transaction is committed
// before returning.
//
// Per-operation behavior:
//   - opCreateBucket creates the bucket if it does not exist yet.
//   - opDeleteBucket deletes the bucket; a missing bucket is not an error.
//   - opPut writes key/value into an existing bucket (a missing bucket is an
//     error); when op.seq is set the bucket's FillPercent is raised to 0.9.
//   - opDelete removes the key; a missing bucket is silently skipped.
//
// On any error the currently open write transaction is rolled back before
// returning, so the caller can safely close tmpdb. The named result err is
// what the deferred cleanup inspects, so every error return path triggers the
// rollback, including the ones that build their error inline.
func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) (err error) {
	if len(ops) == 0 {
		return nil
	}

	tx, err := tmpdb.Begin(true)
	if err != nil {
		return err
	}
	defer func() {
		// tx is nil when starting a follow-up batch transaction failed;
		// there is nothing to roll back in that case.
		if err != nil && tx != nil {
			// Best effort: the transaction may already have been closed
			// by a failed Commit, in which case Rollback just reports
			// that and there is nothing more to do.
			_ = tx.Rollback()
		}
	}()

	count := 0
	for _, op := range ops {
		count++
		if count > limit {
			if err = tx.Commit(); err != nil {
				return err
			}
			tx, err = tmpdb.Begin(true)
			if err != nil {
				return err
			}
			count = 0
		}

		switch op.opType {
		case opCreateBucket:
			if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil {
				return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err)
			}
		case opDeleteBucket:
			// The bucket may never have existed in tmpdb; that is fine.
			if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) {
				return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr)
			}
		case opPut:
			b := tx.Bucket(op.bucketName)
			if b == nil {
				return fmt.Errorf("replay: bucket %s not found for put", op.bucketName)
			}
			if op.seq {
				b.FillPercent = 0.9
			}
			if err = b.Put(op.key, op.value); err != nil {
				return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err)
			}
		case opDelete:
			b := tx.Bucket(op.bucketName)
			if b == nil {
				continue
			}
			if err = b.Delete(op.key); err != nil {
				return fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err)
			}
		}
	}

	return tx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
Expand Down
Loading