@@ -2,6 +2,7 @@ package receiver
22
33import (
44 "bytes"
5+ "context"
56 "fmt"
67 "io"
78 "io/fs"
@@ -13,6 +14,7 @@ import (
1314 "strings"
1415 "sync"
1516 "text/template"
17+ "time"
1618
1719 "github.com/go-kit/log"
1820 "github.com/go-sourcemap/sourcemap"
@@ -28,6 +30,8 @@ import (
2830// transforming minified source locations to the original source location.
2931type sourceMapsStore interface {
3032 GetSourceMap (sourceURL string , release string ) (* sourcemap.Consumer , error )
33+ Start ()
34+ Stop ()
3135}
3236
3337// Stub interfaces for easier mocking.
@@ -67,14 +71,14 @@ func (fs osFileService) ReadFile(name string) ([]byte, error) {
6771}
6872
6973type sourceMapMetrics struct {
70- cacheSize * prometheus.CounterVec
74+ cacheSize * prometheus.GaugeVec
7175 downloads * prometheus.CounterVec
7276 fileReads * prometheus.CounterVec
7377}
7478
7579func newSourceMapMetrics (reg prometheus.Registerer ) * sourceMapMetrics {
7680 m := & sourceMapMetrics {
77- cacheSize : prometheus .NewCounterVec (prometheus.CounterOpts {
81+ cacheSize : prometheus .NewGaugeVec (prometheus.GaugeOpts {
7882 Name : "faro_receiver_sourcemap_cache_size" ,
7983 Help : "number of items in source map cache, per origin" ,
8084 }, []string {"origin" }),
@@ -88,7 +92,7 @@ func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics {
8892 }, []string {"origin" , "status" }),
8993 }
9094
91- m .cacheSize = util .MustRegisterOrGet (reg , m .cacheSize ).(* prometheus.CounterVec )
95+ m .cacheSize = util .MustRegisterOrGet (reg , m .cacheSize ).(* prometheus.GaugeVec )
9296 m .downloads = util .MustRegisterOrGet (reg , m .downloads ).(* prometheus.CounterVec )
9397 m .fileReads = util .MustRegisterOrGet (reg , m .fileReads ).(* prometheus.CounterVec )
9498 return m
@@ -99,6 +103,16 @@ type sourcemapFileLocation struct {
99103 pathTemplate * template.Template
100104}
101105
106+ type timeSource interface {
107+ Now () time.Time
108+ }
109+
110+ type realTimeSource struct {}
111+
112+ func (realTimeSource ) Now () time.Time {
113+ return time .Now ()
114+ }
115+
102116type sourceMapsStoreImpl struct {
103117 log log.Logger
104118 cli httpClient
@@ -107,8 +121,18 @@ type sourceMapsStoreImpl struct {
107121 metrics * sourceMapMetrics
108122 locs []* sourcemapFileLocation
109123
110- cacheMut sync.Mutex
111- cache map [string ]* sourcemap.Consumer
124+ cacheMut sync.Mutex
125+ cache map [string ]* cachedSourceMap
126+ timeSource timeSource
127+ cleanupCtx context.Context
128+ cleanupCancel context.CancelFunc
129+ cleanupWg sync.WaitGroup
130+ isStarted bool
131+ }
132+
133+ type cachedSourceMap struct {
134+ consumer * sourcemap.Consumer
135+ lastUsed time.Time
112136}
113137
114138// newSourceMapStore creates an implementation of sourceMapsStore. The returned
@@ -141,27 +165,28 @@ func newSourceMapsStore(log log.Logger, args SourceMapsArguments, metrics *sourc
141165 }
142166
143167 return & sourceMapsStoreImpl {
144- log : log ,
145- cli : cli ,
146- fs : fs ,
147- args : args ,
148- cache : make (map [string ]* sourcemap.Consumer ),
149- metrics : metrics ,
150- locs : locs ,
168+ log : log ,
169+ cli : cli ,
170+ fs : fs ,
171+ args : args ,
172+ cache : make (map [string ]* cachedSourceMap ),
173+ metrics : metrics ,
174+ locs : locs ,
175+ timeSource : realTimeSource {},
151176 }
152177}
153178
154179func (store * sourceMapsStoreImpl ) GetSourceMap (sourceURL string , release string ) (* sourcemap.Consumer , error ) {
155- // TODO(rfratto): GetSourceMap is weak to transient errors, since it always
156- // caches the result, even when there's an error. This means that transient
157- // errors will be cached forever, preventing source maps from being retrieved.
158-
159180 store .cacheMut .Lock ()
160181 defer store .cacheMut .Unlock ()
161182
162183 cacheKey := fmt .Sprintf ("%s__%s" , sourceURL , release )
163- if sm , ok := store .cache [cacheKey ]; ok {
164- return sm , nil
184+ if cached , ok := store .cache [cacheKey ]; ok {
185+ if cached != nil {
186+ cached .lastUsed = store .timeSource .Now ()
187+ return cached .consumer , nil
188+ }
189+ return nil , nil
165190 }
166191
167192 content , sourceMapURL , err := store .getSourceMapContent (sourceURL , release )
@@ -177,11 +202,113 @@ func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string)
177202 return nil , err
178203 }
179204 level .Info (store .log ).Log ("msg" , "successfully parsed source map" , "url" , sourceMapURL , "release" , release )
180- store .cache [cacheKey ] = consumer
205+ store .cache [cacheKey ] = & cachedSourceMap {
206+ consumer : consumer ,
207+ lastUsed : store .timeSource .Now (),
208+ }
181209 store .metrics .cacheSize .WithLabelValues (getOrigin (sourceURL )).Inc ()
182210 return consumer , nil
183211}
184212
213+ func (store * sourceMapsStoreImpl ) CleanOldCacheEntries () {
214+ store .cacheMut .Lock ()
215+ defer store .cacheMut .Unlock ()
216+
217+ ttl := store .args .Cache .TTL
218+ for key , cached := range store .cache {
219+ if cached != nil && cached .lastUsed .Before (store .timeSource .Now ().Add (- ttl )) {
220+ srcUrl := strings .SplitN (key , "__" , 2 )[0 ]
221+ origin := getOrigin (srcUrl )
222+ store .metrics .cacheSize .WithLabelValues (origin ).Dec ()
223+ delete (store .cache , key )
224+ }
225+ }
226+ }
227+
228+ func (store * sourceMapsStoreImpl ) CleanCachedErrors () {
229+ store .cacheMut .Lock ()
230+ defer store .cacheMut .Unlock ()
231+
232+ for key , cached := range store .cache {
233+ if cached == nil {
234+ delete (store .cache , key )
235+ }
236+ }
237+ }
238+
239+ // Start begins the cleanup routines based on configured cache intervals.
240+ func (store * sourceMapsStoreImpl ) Start () {
241+ store .cacheMut .Lock ()
242+ defer store .cacheMut .Unlock ()
243+
244+ if store .isStarted {
245+ return
246+ }
247+ store .isStarted = true
248+
249+ cacheConfig := store .args .Cache
250+ if cacheConfig == nil {
251+ return
252+ }
253+
254+ store .cleanupCtx , store .cleanupCancel = context .WithCancel (context .Background ())
255+
256+ if d := cacheConfig .CleanupCheckInterval ; d > 0 {
257+ store .cleanupWg .Add (1 )
258+ go func (interval time.Duration ) {
259+ defer store .cleanupWg .Done ()
260+ store .CleanOldCacheEntries ()
261+ ticker := time .NewTicker (interval )
262+ defer ticker .Stop ()
263+ for {
264+ select {
265+ case <- store .cleanupCtx .Done ():
266+ return
267+ case <- ticker .C :
268+ store .CleanOldCacheEntries ()
269+ }
270+ }
271+ }(d )
272+ }
273+
274+ if d := cacheConfig .ErrorCleanupInterval ; d > 0 {
275+ store .cleanupWg .Add (1 )
276+ go func (interval time.Duration ) {
277+ defer store .cleanupWg .Done ()
278+ store .CleanCachedErrors ()
279+ ticker := time .NewTicker (interval )
280+ defer ticker .Stop ()
281+ for {
282+ select {
283+ case <- store .cleanupCtx .Done ():
284+ return
285+ case <- ticker .C :
286+ store .CleanCachedErrors ()
287+ }
288+ }
289+ }(d )
290+ }
291+ }
292+
293+ // Stop terminates all cleanup goroutines and waits for them to finish.
294+ func (store * sourceMapsStoreImpl ) Stop () {
295+ store .cacheMut .Lock ()
296+ defer store .cacheMut .Unlock ()
297+
298+ if ! store .isStarted {
299+ return
300+ }
301+ store .isStarted = false
302+
303+ if store .cleanupCancel != nil {
304+ store .cleanupCancel ()
305+ store .cleanupCancel = nil
306+ }
307+
308+ store .cleanupWg .Wait ()
309+ store .cleanupCtx = nil
310+ }
311+
185312func (store * sourceMapsStoreImpl ) getSourceMapContent (sourceURL string , release string ) (content []byte , sourceMapURL string , err error ) {
186313 // Attempt to find the source map in the filesystem first.
187314 for _ , loc := range store .locs {
0 commit comments