Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions devicemapper/thin_pool_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,35 @@ package devicemapper
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"k8s.io/klog/v2"
)

// usageCache is a typed wrapper around atomic.Value that eliminates the need
// for type assertions at every call site. It stores device ID strings mapped
// to usage values (uint64).
type usageCache struct {
v atomic.Value
}

// Load retrieves the current cache map.
func (c *usageCache) Load() map[string]uint64 {
return c.v.Load().(map[string]uint64)
}

// Store saves a new cache map.
func (c *usageCache) Store(m map[string]uint64) {
c.v.Store(m)
}

// ThinPoolWatcher maintains a cache of device name -> usage stats for a
// devicemapper thin-pool using thin_ls.
type ThinPoolWatcher struct {
poolName string
metadataDevice string
lock *sync.RWMutex
cache map[string]uint64
cache usageCache
period time.Duration
stopChan chan struct{}
dmsetup DmsetupClient
Expand All @@ -44,15 +60,16 @@ func NewThinPoolWatcher(poolName, metadataDevice string) (*ThinPoolWatcher, erro
return nil, fmt.Errorf("encountered error creating thin_ls client: %v", err)
}

return &ThinPoolWatcher{poolName: poolName,
w := &ThinPoolWatcher{
poolName: poolName,
metadataDevice: metadataDevice,
lock: &sync.RWMutex{},
cache: make(map[string]uint64),
period: 15 * time.Second,
stopChan: make(chan struct{}),
dmsetup: NewDmsetupClient(),
thinLsClient: thinLsClient,
}, nil
}
w.cache.Store(map[string]uint64{})
return w, nil
}

// Start starts the ThinPoolWatcher.
Expand Down Expand Up @@ -87,14 +104,11 @@ func (w *ThinPoolWatcher) Stop() {

// GetUsage gets the cached usage value of the given device.
func (w *ThinPoolWatcher) GetUsage(deviceID string) (uint64, error) {
w.lock.RLock()
defer w.lock.RUnlock()

v, ok := w.cache[deviceID]
cache := w.cache.Load()
v, ok := cache[deviceID]
if !ok {
return 0, fmt.Errorf("no cached value for usage of device %v", deviceID)
}

return v, nil
}

Expand All @@ -106,9 +120,6 @@ const (
// Refresh performs a `thin_ls` of the pool being watched and refreshes the
// cached data with the result.
func (w *ThinPoolWatcher) Refresh() error {
w.lock.Lock()
defer w.lock.Unlock()

currentlyReserved, err := w.checkReservation(w.poolName)
if err != nil {
err = fmt.Errorf("error determining whether snapshot is reserved: %v", err)
Expand Down Expand Up @@ -148,7 +159,7 @@ func (w *ThinPoolWatcher) Refresh() error {
return err
}

w.cache = newCache
w.cache.Store(newCache)
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions devicemapper/thin_pool_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package devicemapper

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -132,7 +131,6 @@ func TestRefresh(t *testing.T) {
watcher := &ThinPoolWatcher{
poolName: "test pool name",
metadataDevice: "/dev/mapper/metadata-device",
lock: &sync.RWMutex{},
period: 15 * time.Second,
stopChan: make(chan struct{}),
dmsetup: dmsetup,
Expand Down
Loading