Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions zfs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,49 @@ package zfs

import (
"fmt"
"sync"
"sync/atomic"
"time"

zfs "github.com/mistifyio/go-zfs"
"k8s.io/klog/v2"
)

// usageCache is a typed wrapper around atomic.Value that eliminates the need
// for type assertions at every call site. It stores filesystem name strings
// mapped to usage values (uint64).
type usageCache struct {
v atomic.Value
}

// Load retrieves the current cache map.
func (c *usageCache) Load() map[string]uint64 {
return c.v.Load().(map[string]uint64)
}

// Store saves a new cache map.
func (c *usageCache) Store(m map[string]uint64) {
c.v.Store(m)
}

// zfsWatcher maintains a cache of filesystem -> usage stats for a
// zfs filesystem
type ZfsWatcher struct {
filesystem string
lock *sync.RWMutex
cache map[string]uint64
cache usageCache
period time.Duration
stopChan chan struct{}
}

// NewThinPoolWatcher returns a new ThinPoolWatcher for the given devicemapper
// thin pool name and metadata device or an error.
func NewZfsWatcher(filesystem string) (*ZfsWatcher, error) {

return &ZfsWatcher{
w := &ZfsWatcher{
filesystem: filesystem,
lock: &sync.RWMutex{},
cache: make(map[string]uint64),
period: 15 * time.Second,
stopChan: make(chan struct{}),
}, nil
}
w.cache.Store(map[string]uint64{})
return w, nil
}

// Start starts the ZfsWatcher.
Expand Down Expand Up @@ -78,22 +93,16 @@ func (w *ZfsWatcher) Stop() {

// GetUsage gets the cached usage value of the given filesystem.
func (w *ZfsWatcher) GetUsage(filesystem string) (uint64, error) {
w.lock.RLock()
defer w.lock.RUnlock()

v, ok := w.cache[filesystem]
cache := w.cache.Load()
v, ok := cache[filesystem]
if !ok {
return 0, fmt.Errorf("no cached value for usage of filesystem %v", filesystem)
}

return v, nil
}

// Refresh performs a zfs get
func (w *ZfsWatcher) Refresh() error {
w.lock.Lock()
defer w.lock.Unlock()
newCache := make(map[string]uint64)
parent, err := zfs.GetDataset(w.filesystem)
if err != nil {
klog.Errorf("encountered error getting zfs filesystem: %s: %v", w.filesystem, err)
Expand All @@ -105,10 +114,11 @@ func (w *ZfsWatcher) Refresh() error {
return err
}

newCache := make(map[string]uint64)
for _, ds := range children {
newCache[ds.Name] = ds.Used
}

w.cache = newCache
w.cache.Store(newCache)
return nil
}
Loading