diff --git a/sei-db/db_engine/rocksdb/mvcc/db.go b/sei-db/db_engine/rocksdb/mvcc/db.go index a6aeddc517..c3e0ac23d6 100644 --- a/sei-db/db_engine/rocksdb/mvcc/db.go +++ b/sei-db/db_engine/rocksdb/mvcc/db.go @@ -127,8 +127,8 @@ func OpenDB(dataDir string, config config.StateStoreConfig) (*Database, error) { } func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grocksdb.Slice, error) { - readOpts := newTSReadOptions(version) - defer readOpts.Destroy() + readOpts, cleanup := newTSReadOptions(version) + defer cleanup() return db.storage.GetCF( readOpts, db.cfHandle, @@ -324,9 +324,9 @@ func (db *Database) Iterator(storeKey string, version int64, start, end []byte) prefix := storePrefix(storeKey) start, end = util.IterateWithPrefix(prefix, start, end) - readOpts := newTSReadOptions(version) + readOpts, cleanup := newTSReadOptions(version) itr := db.storage.NewIteratorCF(readOpts, db.cfHandle) - return NewRocksDBIterator(itr, readOpts, prefix, start, end, version, db.earliestVersion, false), nil + return NewRocksDBIterator(itr, cleanup, prefix, start, end, version, db.earliestVersion, false), nil } func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { @@ -341,9 +341,9 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [ prefix := storePrefix(storeKey) start, end = util.IterateWithPrefix(prefix, start, end) - readOpts := newTSReadOptions(version) + readOpts, cleanup := newTSReadOptions(version) itr := db.storage.NewIteratorCF(readOpts, db.cfHandle) - return NewRocksDBIterator(itr, readOpts, prefix, start, end, version, db.earliestVersion, true), nil + return NewRocksDBIterator(itr, cleanup, prefix, start, end, version, db.earliestVersion, true), nil } // Import loads the initial version of the state in parallel with numWorkers goroutines @@ -416,7 +416,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte readOpts.SetTimestamp(endTs[:]) itr := db.storage.NewIteratorCF(readOpts, db.cfHandle) - rocksItr := NewRocksDBIterator(itr, readOpts, prefix, start, end, latestVersion, 1, false) + rocksItr := NewRocksDBIterator(itr, func() { readOpts.Destroy() }, prefix, start, end, latestVersion, 1, false) defer func() { _ = rocksItr.Close() }() for rocksItr.Valid() { @@ -443,15 +443,17 @@ func (db *Database) GetLatestMigratedModule() (string, error) { panic("not implemented") } -// newTSReadOptions returns ReadOptions used in the RocksDB column family read. -func newTSReadOptions(version int64) *grocksdb.ReadOptions { +// newTSReadOptions returns ReadOptions used in the RocksDB column family read +// and a cleanup function that destroys them. The caller must ensure cleanup is +// called when the ReadOptions are no longer needed. +func newTSReadOptions(version int64) (*grocksdb.ReadOptions, func()) { var ts [TimestampSize]byte binary.LittleEndian.PutUint64(ts[:], uint64(version)) readOpts := grocksdb.NewDefaultReadOptions() readOpts.SetTimestamp(ts[:]) - return readOpts + return readOpts, func() { readOpts.Destroy() } } func storePrefix(storeKey string) []byte { diff --git a/sei-db/db_engine/rocksdb/mvcc/iterator.go b/sei-db/db_engine/rocksdb/mvcc/iterator.go index 3b1cd231cc..8aa245ecf0 100644 --- a/sei-db/db_engine/rocksdb/mvcc/iterator.go +++ b/sei-db/db_engine/rocksdb/mvcc/iterator.go @@ -14,25 +14,25 @@ var _ types.DBIterator = (*iterator)(nil) type iterator struct { source *grocksdb.Iterator - readOpts *grocksdb.ReadOptions + cleanup func() prefix, start, end []byte version int64 reverse bool invalid bool } -func NewRocksDBIterator(source *grocksdb.Iterator, readOpts *grocksdb.ReadOptions, prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator { +func NewRocksDBIterator(source *grocksdb.Iterator, cleanup func(), prefix, start, end []byte, version int64, earliestVersion int64, reverse bool) *iterator { // Return invalid iterator if requested iterator height is lower than earliest version after pruning if version < earliestVersion { return &iterator{ - source: source, - readOpts: readOpts, - prefix: prefix, - start: start, - end: end, - version: version, - reverse: reverse, - invalid: true, + source: source, + cleanup: cleanup, + prefix: prefix, + start: start, + end: end, + version: version, + reverse: reverse, + invalid: true, } } @@ -60,14 +60,14 @@ func NewRocksDBIterator(source *grocksdb.Iterator, readOpts *grocksdb.ReadOption } return &iterator{ - source: source, - readOpts: readOpts, - prefix: prefix, - start: start, - end: end, - version: version, - reverse: reverse, - invalid: !source.Valid(), + source: source, + cleanup: cleanup, + prefix: prefix, + start: start, + end: end, + version: version, + reverse: reverse, + invalid: !source.Valid(), } } @@ -160,9 +160,9 @@ func (itr *iterator) Error() error { func (itr *iterator) Close() error { itr.source.Close() itr.source = nil - if itr.readOpts != nil { - itr.readOpts.Destroy() - itr.readOpts = nil + if itr.cleanup != nil { + itr.cleanup() + itr.cleanup = nil } itr.invalid = true return nil